1use mina_core::log::system_time;
2use rand::prelude::*;
3
4use crate::{
5 block_producer::BlockProducerAction,
6 block_producer_effectful::block_producer_effects,
7 event_source::event_source_effects,
8 external_snark_worker_effectful::external_snark_worker_effectful_effects,
9 ledger::read::LedgerReadAction,
10 ledger_effectful::ledger_effectful_effects,
11 logger::logger_effects,
12 p2p::node_p2p_effects,
13 p2p_ready,
14 rpc_effectful::rpc_effects,
15 snark::snark_effects,
16 snark_pool::{candidate::SnarkPoolCandidateAction, snark_pool_effects, SnarkPoolAction},
17 transaction_pool::candidate::TransactionPoolCandidateAction,
18 transition_frontier::{genesis::TransitionFrontierGenesisAction, transition_frontier_effects},
19 Action, ActionWithMeta, CheckInvalidPeersAction, ExternalSnarkWorkerAction, Service, Store,
20 TransactionPoolAction,
21};
22
23use crate::p2p::channels::rpc::{P2pChannelsRpcAction, P2pRpcRequest};
24
25pub fn effects<S: Service>(store: &mut Store<S>, action: ActionWithMeta) {
26 store.service.recorder().action(&action);
27
28 let (action, meta) = action.split();
29
30 if let Some(stats) = store.service.stats() {
31 stats.new_action(action.kind(), meta.clone());
32 }
33
34 logger_effects(store, meta.clone().with_action(&action));
35 match action {
36 Action::CheckTimeouts(_) => {
39 store.dispatch(TransitionFrontierGenesisAction::LedgerLoadInit);
41 store.dispatch(ExternalSnarkWorkerAction::Start);
42
43 store.dispatch(TransitionFrontierGenesisAction::ProveInit);
44
45 if store.state().p2p.ready().is_some() {
46 p2p_request_best_tip_if_needed(store);
47 p2p_request_transactions_if_needed(store);
48 p2p_request_snarks_if_needed(store);
49 }
50
51 store.dispatch(TransactionPoolAction::P2pSendAll);
52 store.dispatch(TransactionPoolCandidateAction::FetchAll);
53 store.dispatch(TransactionPoolCandidateAction::VerifyNext);
54
55 store.dispatch(SnarkPoolAction::CheckTimeouts);
56 store.dispatch(SnarkPoolAction::P2pSendAll);
57
58 store.dispatch(SnarkPoolCandidateAction::WorkFetchAll);
59 store.dispatch(SnarkPoolCandidateAction::WorkVerifyNext);
60
61 store.dispatch(ExternalSnarkWorkerAction::StartTimeout { now: meta.time() });
62 store.dispatch(ExternalSnarkWorkerAction::WorkTimeout { now: meta.time() });
63
64 store.dispatch(BlockProducerAction::WonSlotProduceInit);
65 store.dispatch(BlockProducerAction::BlockInject);
66 store.dispatch(LedgerReadAction::FindTodos);
67
68 store.dispatch(CheckInvalidPeersAction {});
69 }
70 Action::EventSource(action) => {
71 event_source_effects(store, meta.with_action(action));
72 }
73 Action::Snark(action) => {
74 snark_effects(store, meta.with_action(action));
75 }
76 Action::TransactionPoolEffect(action) => {
77 action.effects(store);
78 }
79 Action::TransitionFrontier(action) => {
80 transition_frontier_effects(store, meta.with_action(action));
81 }
82 Action::P2pEffectful(action) => {
83 node_p2p_effects(store, meta.with_action(action));
84 }
85 Action::LedgerEffects(action) => {
86 ledger_effectful_effects(store, meta.with_action(action));
87 }
88 Action::SnarkPoolEffect(action) => {
89 snark_pool_effects(store, meta.with_action(action));
90 }
91 Action::BlockProducerEffectful(action) => {
92 block_producer_effects(store, meta.with_action(action));
93 }
94 Action::ExternalSnarkWorkerEffects(action) => {
95 external_snark_worker_effectful_effects(store, meta.with_action(action));
96 }
97 Action::RpcEffectful(action) => {
98 rpc_effects(store, meta.with_action(action));
99 }
100 Action::CheckInvalidPeersAction(_) => {
101 let state = store.state();
102 let initial_peers = state
103 .p2p
104 .config()
105 .initial_peers
106 .iter()
107 .map(|opts| opts.to_string())
108 .collect::<Vec<_>>()
109 .join(", ");
110
111 crate::core::error!(
112 crate::core::log::system_time();
113 error = "Invalid initial peers",
114 initial_peers = initial_peers
115 );
116
117 #[cfg(feature = "exit-on-invalid-peers")]
118 {
119 crate::core::error!(
120 crate::core::log::system_time();
121 summary = "Exiting due to invalid peers",
122 );
123 std::process::exit(1);
124 }
125 }
126 Action::BlockProducer(_)
127 | Action::SnarkPool(_)
128 | Action::ExternalSnarkWorker(_)
129 | Action::TransactionPool(_)
130 | Action::Ledger(_)
131 | Action::Rpc(_)
132 | Action::WatchedAccounts(_)
133 | Action::P2pCallbacks(_)
134 | Action::P2p(_) => {
135 }
137 }
138}
139
140fn p2p_request_best_tip_if_needed<S: Service>(store: &mut Store<S>) {
141 let state = store.state();
143 let best_candidate = state.transition_frontier.candidates.best_verified();
144 let best_candidate_hash = best_candidate.map(|s| s.block.hash());
145 let best_tip_hash = state.transition_frontier.best_tip().map(|v| &v.hash);
146 let syncing_best_tip_hash = state.transition_frontier.sync.best_tip().map(|v| &v.hash);
147
148 if best_candidate.is_some()
149 && best_candidate_hash != best_tip_hash
150 && best_candidate_hash != syncing_best_tip_hash
151 && best_candidate.is_some_and(|s| s.chain_proof.is_none())
152 {
153 request_best_tip(store, best_candidate_hash.cloned());
154 }
155}
156use mina_p2p_messages::v2::StateHash;
157
158fn request_best_tip<S: Service>(store: &mut Store<S>, consensus_best_tip_hash: Option<StateHash>) {
159 let p2p = p2p_ready!(store.state().p2p, "request_best_tip", system_time());
160
161 let (ready_peers, ready_peers_matching_best_tip) = p2p.ready_rpc_peers_iter().fold(
162 (Vec::new(), Vec::new()),
163 |(mut all, mut matching), (peer_id, peer)| {
164 let rpc_id = peer.channels.next_local_rpc_id();
165 if peer.best_tip.as_ref().map(|b| b.hash()) == consensus_best_tip_hash.as_ref() {
166 matching.push((*peer_id, rpc_id));
167 } else if matching.is_empty() {
168 all.push((*peer_id, rpc_id));
169 }
170 (all, matching)
171 },
172 );
173
174 let peers = if !ready_peers_matching_best_tip.is_empty() {
175 ready_peers_matching_best_tip
176 } else {
177 ready_peers
178 };
179
180 if let Some((peer_id, id)) = peers.choose(&mut store.state().pseudo_rng()) {
181 store.dispatch(P2pChannelsRpcAction::RequestSend {
182 peer_id: *peer_id,
183 id: *id,
184 request: Box::new(P2pRpcRequest::BestTipWithProof),
185 on_init: None,
186 });
187 }
188}
189
190fn p2p_request_transactions_if_needed<S: Service>(store: &mut Store<S>) {
191 use crate::p2p::channels::transaction::P2pChannelsTransactionAction;
192
193 const MAX_PEER_PENDING_TXS: usize = 32;
194
195 let state = store.state();
196 let p2p = p2p_ready!(
197 state.p2p,
198 "p2p_request_transactions_if_needed",
199 system_time()
200 );
201 let transaction_reqs = p2p
202 .ready_peers_iter()
203 .filter(|(_, p)| p.channels.transaction.can_send_request())
204 .map(|(peer_id, _)| {
205 let pending_txs = state
206 .transaction_pool
207 .candidates
208 .peer_transaction_count(peer_id);
209 (peer_id, MAX_PEER_PENDING_TXS.saturating_sub(pending_txs))
210 })
211 .filter(|(_, limit)| *limit > 0)
212 .map(|(peer_id, limit)| (*peer_id, limit.min(u8::MAX as usize) as u8))
213 .collect::<Vec<_>>();
214
215 for (peer_id, limit) in transaction_reqs {
216 store.dispatch(P2pChannelsTransactionAction::RequestSend { peer_id, limit });
217 }
218}
219
220fn p2p_request_snarks_if_needed<S: Service>(store: &mut Store<S>) {
221 use crate::p2p::channels::snark::P2pChannelsSnarkAction;
222
223 const MAX_PEER_PENDING_SNARKS: usize = 32;
224
225 let state = store.state();
226 let p2p = p2p_ready!(state.p2p, "p2p_request_snarks_if_needed", system_time());
227 let snark_reqs = p2p
228 .ready_peers_iter()
229 .filter(|(_, p)| p.channels.snark.can_send_request())
230 .map(|(peer_id, _)| {
231 let pending_snarks = state.snark_pool.candidates.peer_work_count(peer_id);
232 (
233 peer_id,
234 MAX_PEER_PENDING_SNARKS.saturating_sub(pending_snarks),
235 )
236 })
237 .filter(|(_, limit)| *limit > 0)
238 .map(|(peer_id, limit)| (*peer_id, limit.min(u8::MAX as usize) as u8))
239 .collect::<Vec<_>>();
240
241 for (peer_id, limit) in snark_reqs {
242 store.dispatch(P2pChannelsSnarkAction::RequestSend { peer_id, limit });
243 }
244}