mina_node/
effects.rs

1use mina_core::log::system_time;
2use rand::prelude::*;
3
4use crate::{
5    block_producer::BlockProducerAction,
6    block_producer_effectful::block_producer_effects,
7    event_source::event_source_effects,
8    external_snark_worker_effectful::external_snark_worker_effectful_effects,
9    ledger::read::LedgerReadAction,
10    ledger_effectful::ledger_effectful_effects,
11    logger::logger_effects,
12    p2p::node_p2p_effects,
13    p2p_ready,
14    rpc_effectful::rpc_effects,
15    snark::snark_effects,
16    snark_pool::{candidate::SnarkPoolCandidateAction, snark_pool_effects, SnarkPoolAction},
17    transaction_pool::candidate::TransactionPoolCandidateAction,
18    transition_frontier::{genesis::TransitionFrontierGenesisAction, transition_frontier_effects},
19    Action, ActionWithMeta, CheckInvalidPeersAction, ExternalSnarkWorkerAction, Service, Store,
20    TransactionPoolAction,
21};
22
23use crate::p2p::channels::rpc::{P2pChannelsRpcAction, P2pRpcRequest};
24
25pub fn effects<S: Service>(store: &mut Store<S>, action: ActionWithMeta) {
26    store.service.recorder().action(&action);
27
28    let (action, meta) = action.split();
29
30    if let Some(stats) = store.service.stats() {
31        stats.new_action(action.kind(), meta.clone());
32    }
33
34    logger_effects(store, meta.clone().with_action(&action));
35    match action {
36        // Following action gets dispatched very often, so ideally this
37        // effect execution should be as light as possible.
38        Action::CheckTimeouts(_) => {
39            // TODO(binier): create init action and dispatch these there.
40            store.dispatch(TransitionFrontierGenesisAction::LedgerLoadInit);
41            store.dispatch(ExternalSnarkWorkerAction::Start);
42
43            store.dispatch(TransitionFrontierGenesisAction::ProveInit);
44
45            if store.state().p2p.ready().is_some() {
46                p2p_request_best_tip_if_needed(store);
47                p2p_request_transactions_if_needed(store);
48                p2p_request_snarks_if_needed(store);
49            }
50
51            store.dispatch(TransactionPoolAction::P2pSendAll);
52            store.dispatch(TransactionPoolCandidateAction::FetchAll);
53            store.dispatch(TransactionPoolCandidateAction::VerifyNext);
54
55            store.dispatch(SnarkPoolAction::CheckTimeouts);
56            store.dispatch(SnarkPoolAction::P2pSendAll);
57
58            store.dispatch(SnarkPoolCandidateAction::WorkFetchAll);
59            store.dispatch(SnarkPoolCandidateAction::WorkVerifyNext);
60
61            store.dispatch(ExternalSnarkWorkerAction::StartTimeout { now: meta.time() });
62            store.dispatch(ExternalSnarkWorkerAction::WorkTimeout { now: meta.time() });
63
64            store.dispatch(BlockProducerAction::WonSlotProduceInit);
65            store.dispatch(BlockProducerAction::BlockInject);
66            store.dispatch(LedgerReadAction::FindTodos);
67
68            store.dispatch(CheckInvalidPeersAction {});
69        }
70        Action::EventSource(action) => {
71            event_source_effects(store, meta.with_action(action));
72        }
73        Action::Snark(action) => {
74            snark_effects(store, meta.with_action(action));
75        }
76        Action::TransactionPoolEffect(action) => {
77            action.effects(store);
78        }
79        Action::TransitionFrontier(action) => {
80            transition_frontier_effects(store, meta.with_action(action));
81        }
82        Action::P2pEffectful(action) => {
83            node_p2p_effects(store, meta.with_action(action));
84        }
85        Action::LedgerEffects(action) => {
86            ledger_effectful_effects(store, meta.with_action(action));
87        }
88        Action::SnarkPoolEffect(action) => {
89            snark_pool_effects(store, meta.with_action(action));
90        }
91        Action::BlockProducerEffectful(action) => {
92            block_producer_effects(store, meta.with_action(action));
93        }
94        Action::ExternalSnarkWorkerEffects(action) => {
95            external_snark_worker_effectful_effects(store, meta.with_action(action));
96        }
97        Action::RpcEffectful(action) => {
98            rpc_effects(store, meta.with_action(action));
99        }
100        Action::CheckInvalidPeersAction(_) => {
101            let state = store.state();
102            let initial_peers = state
103                .p2p
104                .config()
105                .initial_peers
106                .iter()
107                .map(|opts| opts.to_string())
108                .collect::<Vec<_>>()
109                .join(", ");
110
111            crate::core::error!(
112                crate::core::log::system_time();
113                error = "Invalid initial peers",
114                initial_peers = initial_peers
115            );
116
117            #[cfg(feature = "exit-on-invalid-peers")]
118            {
119                crate::core::error!(
120                    crate::core::log::system_time();
121                    summary = "Exiting due to invalid peers",
122                );
123                std::process::exit(1);
124            }
125        }
126        Action::BlockProducer(_)
127        | Action::SnarkPool(_)
128        | Action::ExternalSnarkWorker(_)
129        | Action::TransactionPool(_)
130        | Action::Ledger(_)
131        | Action::Rpc(_)
132        | Action::WatchedAccounts(_)
133        | Action::P2pCallbacks(_)
134        | Action::P2p(_) => {
135            // Handled by reducer
136        }
137    }
138}
139
140fn p2p_request_best_tip_if_needed<S: Service>(store: &mut Store<S>) {
141    // TODO(binier): refactor
142    let state = store.state();
143    let best_candidate = state.transition_frontier.candidates.best_verified();
144    let best_candidate_hash = best_candidate.map(|s| s.block.hash());
145    let best_tip_hash = state.transition_frontier.best_tip().map(|v| &v.hash);
146    let syncing_best_tip_hash = state.transition_frontier.sync.best_tip().map(|v| &v.hash);
147
148    if best_candidate.is_some()
149        && best_candidate_hash != best_tip_hash
150        && best_candidate_hash != syncing_best_tip_hash
151        && best_candidate.is_some_and(|s| s.chain_proof.is_none())
152    {
153        request_best_tip(store, best_candidate_hash.cloned());
154    }
155}
156use mina_p2p_messages::v2::StateHash;
157
158fn request_best_tip<S: Service>(store: &mut Store<S>, consensus_best_tip_hash: Option<StateHash>) {
159    let p2p = p2p_ready!(store.state().p2p, "request_best_tip", system_time());
160
161    let (ready_peers, ready_peers_matching_best_tip) = p2p.ready_rpc_peers_iter().fold(
162        (Vec::new(), Vec::new()),
163        |(mut all, mut matching), (peer_id, peer)| {
164            let rpc_id = peer.channels.next_local_rpc_id();
165            if peer.best_tip.as_ref().map(|b| b.hash()) == consensus_best_tip_hash.as_ref() {
166                matching.push((*peer_id, rpc_id));
167            } else if matching.is_empty() {
168                all.push((*peer_id, rpc_id));
169            }
170            (all, matching)
171        },
172    );
173
174    let peers = if !ready_peers_matching_best_tip.is_empty() {
175        ready_peers_matching_best_tip
176    } else {
177        ready_peers
178    };
179
180    if let Some((peer_id, id)) = peers.choose(&mut store.state().pseudo_rng()) {
181        store.dispatch(P2pChannelsRpcAction::RequestSend {
182            peer_id: *peer_id,
183            id: *id,
184            request: Box::new(P2pRpcRequest::BestTipWithProof),
185            on_init: None,
186        });
187    }
188}
189
190fn p2p_request_transactions_if_needed<S: Service>(store: &mut Store<S>) {
191    use crate::p2p::channels::transaction::P2pChannelsTransactionAction;
192
193    const MAX_PEER_PENDING_TXS: usize = 32;
194
195    let state = store.state();
196    let p2p = p2p_ready!(
197        state.p2p,
198        "p2p_request_transactions_if_needed",
199        system_time()
200    );
201    let transaction_reqs = p2p
202        .ready_peers_iter()
203        .filter(|(_, p)| p.channels.transaction.can_send_request())
204        .map(|(peer_id, _)| {
205            let pending_txs = state
206                .transaction_pool
207                .candidates
208                .peer_transaction_count(peer_id);
209            (peer_id, MAX_PEER_PENDING_TXS.saturating_sub(pending_txs))
210        })
211        .filter(|(_, limit)| *limit > 0)
212        .map(|(peer_id, limit)| (*peer_id, limit.min(u8::MAX as usize) as u8))
213        .collect::<Vec<_>>();
214
215    for (peer_id, limit) in transaction_reqs {
216        store.dispatch(P2pChannelsTransactionAction::RequestSend { peer_id, limit });
217    }
218}
219
220fn p2p_request_snarks_if_needed<S: Service>(store: &mut Store<S>) {
221    use crate::p2p::channels::snark::P2pChannelsSnarkAction;
222
223    const MAX_PEER_PENDING_SNARKS: usize = 32;
224
225    let state = store.state();
226    let p2p = p2p_ready!(state.p2p, "p2p_request_snarks_if_needed", system_time());
227    let snark_reqs = p2p
228        .ready_peers_iter()
229        .filter(|(_, p)| p.channels.snark.can_send_request())
230        .map(|(peer_id, _)| {
231            let pending_snarks = state.snark_pool.candidates.peer_work_count(peer_id);
232            (
233                peer_id,
234                MAX_PEER_PENDING_SNARKS.saturating_sub(pending_snarks),
235            )
236        })
237        .filter(|(_, limit)| *limit > 0)
238        .map(|(peer_id, limit)| (*peer_id, limit.min(u8::MAX as usize) as u8))
239        .collect::<Vec<_>>();
240
241    for (peer_id, limit) in snark_reqs {
242        store.dispatch(P2pChannelsSnarkAction::RequestSend { peer_id, limit });
243    }
244}