node/
effects.rs

1use openmina_core::log::system_time;
2use rand::prelude::*;
3
4use crate::{
5    block_producer::BlockProducerAction,
6    block_producer_effectful::block_producer_effects,
7    event_source::event_source_effects,
8    external_snark_worker_effectful::external_snark_worker_effectful_effects,
9    ledger::read::LedgerReadAction,
10    ledger_effectful::ledger_effectful_effects,
11    logger::logger_effects,
12    p2p::node_p2p_effects,
13    p2p_ready,
14    rpc_effectful::rpc_effects,
15    snark::snark_effects,
16    snark_pool::{candidate::SnarkPoolCandidateAction, snark_pool_effects, SnarkPoolAction},
17    transaction_pool::candidate::TransactionPoolCandidateAction,
18    transition_frontier::{genesis::TransitionFrontierGenesisAction, transition_frontier_effects},
19    Action, ActionWithMeta, ExternalSnarkWorkerAction, Service, Store, TransactionPoolAction,
20};
21
22use crate::p2p::channels::rpc::{P2pChannelsRpcAction, P2pRpcRequest};
23
24pub fn effects<S: Service>(store: &mut Store<S>, action: ActionWithMeta) {
25    store.service.recorder().action(&action);
26
27    let (action, meta) = action.split();
28
29    if let Some(stats) = store.service.stats() {
30        stats.new_action(action.kind(), meta.clone());
31    }
32
33    logger_effects(store, meta.clone().with_action(&action));
34    match action {
35        // Following action gets dispatched very often, so ideally this
36        // effect execution should be as light as possible.
37        Action::CheckTimeouts(_) => {
38            // TODO(binier): create init action and dispatch these there.
39            store.dispatch(TransitionFrontierGenesisAction::LedgerLoadInit);
40            store.dispatch(ExternalSnarkWorkerAction::Start);
41
42            store.dispatch(TransitionFrontierGenesisAction::ProveInit);
43
44            if store.state().p2p.ready().is_some() {
45                p2p_request_best_tip_if_needed(store);
46                p2p_request_transactions_if_needed(store);
47                p2p_request_snarks_if_needed(store);
48            }
49
50            store.dispatch(TransactionPoolAction::P2pSendAll);
51            store.dispatch(TransactionPoolCandidateAction::FetchAll);
52            store.dispatch(TransactionPoolCandidateAction::VerifyNext);
53
54            store.dispatch(SnarkPoolAction::CheckTimeouts);
55            store.dispatch(SnarkPoolAction::P2pSendAll);
56
57            store.dispatch(SnarkPoolCandidateAction::WorkFetchAll);
58            store.dispatch(SnarkPoolCandidateAction::WorkVerifyNext);
59
60            store.dispatch(ExternalSnarkWorkerAction::StartTimeout { now: meta.time() });
61            store.dispatch(ExternalSnarkWorkerAction::WorkTimeout { now: meta.time() });
62
63            store.dispatch(BlockProducerAction::WonSlotProduceInit);
64            store.dispatch(BlockProducerAction::BlockInject);
65            store.dispatch(LedgerReadAction::FindTodos);
66        }
67        Action::EventSource(action) => {
68            event_source_effects(store, meta.with_action(action));
69        }
70        Action::Snark(action) => {
71            snark_effects(store, meta.with_action(action));
72        }
73        Action::TransactionPoolEffect(action) => {
74            action.effects(store);
75        }
76        Action::TransitionFrontier(action) => {
77            transition_frontier_effects(store, meta.with_action(action));
78        }
79        Action::P2pEffectful(action) => {
80            node_p2p_effects(store, meta.with_action(action));
81        }
82        Action::LedgerEffects(action) => {
83            ledger_effectful_effects(store, meta.with_action(action));
84        }
85        Action::SnarkPoolEffect(action) => {
86            snark_pool_effects(store, meta.with_action(action));
87        }
88        Action::BlockProducerEffectful(action) => {
89            block_producer_effects(store, meta.with_action(action));
90        }
91        Action::ExternalSnarkWorkerEffects(action) => {
92            external_snark_worker_effectful_effects(store, meta.with_action(action));
93        }
94        Action::RpcEffectful(action) => {
95            rpc_effects(store, meta.with_action(action));
96        }
97        Action::BlockProducer(_)
98        | Action::SnarkPool(_)
99        | Action::ExternalSnarkWorker(_)
100        | Action::TransactionPool(_)
101        | Action::Ledger(_)
102        | Action::Rpc(_)
103        | Action::WatchedAccounts(_)
104        | Action::P2pCallbacks(_)
105        | Action::P2p(_) => {
106            // Handled by reducer
107        }
108    }
109}
110
111fn p2p_request_best_tip_if_needed<S: Service>(store: &mut Store<S>) {
112    // TODO(binier): refactor
113    let state = store.state();
114    let best_candidate = state.transition_frontier.candidates.best_verified();
115    let best_candidate_hash = best_candidate.map(|s| s.block.hash());
116    let best_tip_hash = state.transition_frontier.best_tip().map(|v| &v.hash);
117    let syncing_best_tip_hash = state.transition_frontier.sync.best_tip().map(|v| &v.hash);
118
119    if best_candidate.is_some()
120        && best_candidate_hash != best_tip_hash
121        && best_candidate_hash != syncing_best_tip_hash
122        && best_candidate.is_some_and(|s| s.chain_proof.is_none())
123    {
124        request_best_tip(store, best_candidate_hash.cloned());
125    }
126}
127use mina_p2p_messages::v2::StateHash;
128
129fn request_best_tip<S: Service>(store: &mut Store<S>, consensus_best_tip_hash: Option<StateHash>) {
130    let p2p = p2p_ready!(store.state().p2p, "request_best_tip", system_time());
131
132    let (ready_peers, ready_peers_matching_best_tip) = p2p.ready_rpc_peers_iter().fold(
133        (Vec::new(), Vec::new()),
134        |(mut all, mut matching), (peer_id, peer)| {
135            let rpc_id = peer.channels.next_local_rpc_id();
136            if peer.best_tip.as_ref().map(|b| b.hash()) == consensus_best_tip_hash.as_ref() {
137                matching.push((*peer_id, rpc_id));
138            } else if matching.is_empty() {
139                all.push((*peer_id, rpc_id));
140            }
141            (all, matching)
142        },
143    );
144
145    let peers = if !ready_peers_matching_best_tip.is_empty() {
146        ready_peers_matching_best_tip
147    } else {
148        ready_peers
149    };
150
151    if let Some((peer_id, id)) = peers.choose(&mut store.state().pseudo_rng()) {
152        store.dispatch(P2pChannelsRpcAction::RequestSend {
153            peer_id: *peer_id,
154            id: *id,
155            request: Box::new(P2pRpcRequest::BestTipWithProof),
156            on_init: None,
157        });
158    }
159}
160
161fn p2p_request_transactions_if_needed<S: Service>(store: &mut Store<S>) {
162    use p2p::channels::transaction::P2pChannelsTransactionAction;
163
164    const MAX_PEER_PENDING_TXS: usize = 32;
165
166    let state = store.state();
167    let p2p = p2p_ready!(
168        state.p2p,
169        "p2p_request_transactions_if_needed",
170        system_time()
171    );
172    let transaction_reqs = p2p
173        .ready_peers_iter()
174        .filter(|(_, p)| p.channels.transaction.can_send_request())
175        .map(|(peer_id, _)| {
176            let pending_txs = state.snark_pool.candidates.peer_work_count(peer_id);
177            (peer_id, MAX_PEER_PENDING_TXS.saturating_sub(pending_txs))
178        })
179        .filter(|(_, limit)| *limit > 0)
180        .map(|(peer_id, limit)| (*peer_id, limit.min(u8::MAX as usize) as u8))
181        .collect::<Vec<_>>();
182
183    for (peer_id, limit) in transaction_reqs {
184        store.dispatch(P2pChannelsTransactionAction::RequestSend { peer_id, limit });
185    }
186}
187
188fn p2p_request_snarks_if_needed<S: Service>(store: &mut Store<S>) {
189    use p2p::channels::snark::P2pChannelsSnarkAction;
190
191    const MAX_PEER_PENDING_SNARKS: usize = 32;
192
193    let state = store.state();
194    let p2p = p2p_ready!(state.p2p, "p2p_request_snarks_if_needed", system_time());
195    let snark_reqs = p2p
196        .ready_peers_iter()
197        .filter(|(_, p)| p.channels.snark.can_send_request())
198        .map(|(peer_id, _)| {
199            let pending_snarks = state.snark_pool.candidates.peer_work_count(peer_id);
200            (
201                peer_id,
202                MAX_PEER_PENDING_SNARKS.saturating_sub(pending_snarks),
203            )
204        })
205        .filter(|(_, limit)| *limit > 0)
206        .map(|(peer_id, limit)| (*peer_id, limit.min(u8::MAX as usize) as u8))
207        .collect::<Vec<_>>();
208
209    for (peer_id, limit) in snark_reqs {
210        store.dispatch(P2pChannelsSnarkAction::RequestSend { peer_id, limit });
211    }
212}