1use openmina_core::log::system_time;
2use rand::prelude::*;
3
4use crate::{
5 block_producer::BlockProducerAction,
6 block_producer_effectful::block_producer_effects,
7 event_source::event_source_effects,
8 external_snark_worker_effectful::external_snark_worker_effectful_effects,
9 ledger::read::LedgerReadAction,
10 ledger_effectful::ledger_effectful_effects,
11 logger::logger_effects,
12 p2p::node_p2p_effects,
13 p2p_ready,
14 rpc_effectful::rpc_effects,
15 snark::snark_effects,
16 snark_pool::{candidate::SnarkPoolCandidateAction, snark_pool_effects, SnarkPoolAction},
17 transaction_pool::candidate::TransactionPoolCandidateAction,
18 transition_frontier::{genesis::TransitionFrontierGenesisAction, transition_frontier_effects},
19 Action, ActionWithMeta, ExternalSnarkWorkerAction, Service, Store, TransactionPoolAction,
20};
21
22use crate::p2p::channels::rpc::{P2pChannelsRpcAction, P2pRpcRequest};
23
24pub fn effects<S: Service>(store: &mut Store<S>, action: ActionWithMeta) {
25 store.service.recorder().action(&action);
26
27 let (action, meta) = action.split();
28
29 if let Some(stats) = store.service.stats() {
30 stats.new_action(action.kind(), meta.clone());
31 }
32
33 logger_effects(store, meta.clone().with_action(&action));
34 match action {
35 Action::CheckTimeouts(_) => {
38 store.dispatch(TransitionFrontierGenesisAction::LedgerLoadInit);
40 store.dispatch(ExternalSnarkWorkerAction::Start);
41
42 store.dispatch(TransitionFrontierGenesisAction::ProveInit);
43
44 if store.state().p2p.ready().is_some() {
45 p2p_request_best_tip_if_needed(store);
46 p2p_request_transactions_if_needed(store);
47 p2p_request_snarks_if_needed(store);
48 }
49
50 store.dispatch(TransactionPoolAction::P2pSendAll);
51 store.dispatch(TransactionPoolCandidateAction::FetchAll);
52 store.dispatch(TransactionPoolCandidateAction::VerifyNext);
53
54 store.dispatch(SnarkPoolAction::CheckTimeouts);
55 store.dispatch(SnarkPoolAction::P2pSendAll);
56
57 store.dispatch(SnarkPoolCandidateAction::WorkFetchAll);
58 store.dispatch(SnarkPoolCandidateAction::WorkVerifyNext);
59
60 store.dispatch(ExternalSnarkWorkerAction::StartTimeout { now: meta.time() });
61 store.dispatch(ExternalSnarkWorkerAction::WorkTimeout { now: meta.time() });
62
63 store.dispatch(BlockProducerAction::WonSlotProduceInit);
64 store.dispatch(BlockProducerAction::BlockInject);
65 store.dispatch(LedgerReadAction::FindTodos);
66 }
67 Action::EventSource(action) => {
68 event_source_effects(store, meta.with_action(action));
69 }
70 Action::Snark(action) => {
71 snark_effects(store, meta.with_action(action));
72 }
73 Action::TransactionPoolEffect(action) => {
74 action.effects(store);
75 }
76 Action::TransitionFrontier(action) => {
77 transition_frontier_effects(store, meta.with_action(action));
78 }
79 Action::P2pEffectful(action) => {
80 node_p2p_effects(store, meta.with_action(action));
81 }
82 Action::LedgerEffects(action) => {
83 ledger_effectful_effects(store, meta.with_action(action));
84 }
85 Action::SnarkPoolEffect(action) => {
86 snark_pool_effects(store, meta.with_action(action));
87 }
88 Action::BlockProducerEffectful(action) => {
89 block_producer_effects(store, meta.with_action(action));
90 }
91 Action::ExternalSnarkWorkerEffects(action) => {
92 external_snark_worker_effectful_effects(store, meta.with_action(action));
93 }
94 Action::RpcEffectful(action) => {
95 rpc_effects(store, meta.with_action(action));
96 }
97 Action::BlockProducer(_)
98 | Action::SnarkPool(_)
99 | Action::ExternalSnarkWorker(_)
100 | Action::TransactionPool(_)
101 | Action::Ledger(_)
102 | Action::Rpc(_)
103 | Action::WatchedAccounts(_)
104 | Action::P2pCallbacks(_)
105 | Action::P2p(_) => {
106 }
108 }
109}
110
111fn p2p_request_best_tip_if_needed<S: Service>(store: &mut Store<S>) {
112 let state = store.state();
114 let best_candidate = state.transition_frontier.candidates.best_verified();
115 let best_candidate_hash = best_candidate.map(|s| s.block.hash());
116 let best_tip_hash = state.transition_frontier.best_tip().map(|v| &v.hash);
117 let syncing_best_tip_hash = state.transition_frontier.sync.best_tip().map(|v| &v.hash);
118
119 if best_candidate.is_some()
120 && best_candidate_hash != best_tip_hash
121 && best_candidate_hash != syncing_best_tip_hash
122 && best_candidate.is_some_and(|s| s.chain_proof.is_none())
123 {
124 request_best_tip(store, best_candidate_hash.cloned());
125 }
126}
127use mina_p2p_messages::v2::StateHash;
128
129fn request_best_tip<S: Service>(store: &mut Store<S>, consensus_best_tip_hash: Option<StateHash>) {
130 let p2p = p2p_ready!(store.state().p2p, "request_best_tip", system_time());
131
132 let (ready_peers, ready_peers_matching_best_tip) = p2p.ready_rpc_peers_iter().fold(
133 (Vec::new(), Vec::new()),
134 |(mut all, mut matching), (peer_id, peer)| {
135 let rpc_id = peer.channels.next_local_rpc_id();
136 if peer.best_tip.as_ref().map(|b| b.hash()) == consensus_best_tip_hash.as_ref() {
137 matching.push((*peer_id, rpc_id));
138 } else if matching.is_empty() {
139 all.push((*peer_id, rpc_id));
140 }
141 (all, matching)
142 },
143 );
144
145 let peers = if !ready_peers_matching_best_tip.is_empty() {
146 ready_peers_matching_best_tip
147 } else {
148 ready_peers
149 };
150
151 if let Some((peer_id, id)) = peers.choose(&mut store.state().pseudo_rng()) {
152 store.dispatch(P2pChannelsRpcAction::RequestSend {
153 peer_id: *peer_id,
154 id: *id,
155 request: Box::new(P2pRpcRequest::BestTipWithProof),
156 on_init: None,
157 });
158 }
159}
160
161fn p2p_request_transactions_if_needed<S: Service>(store: &mut Store<S>) {
162 use p2p::channels::transaction::P2pChannelsTransactionAction;
163
164 const MAX_PEER_PENDING_TXS: usize = 32;
165
166 let state = store.state();
167 let p2p = p2p_ready!(
168 state.p2p,
169 "p2p_request_transactions_if_needed",
170 system_time()
171 );
172 let transaction_reqs = p2p
173 .ready_peers_iter()
174 .filter(|(_, p)| p.channels.transaction.can_send_request())
175 .map(|(peer_id, _)| {
176 let pending_txs = state.snark_pool.candidates.peer_work_count(peer_id);
177 (peer_id, MAX_PEER_PENDING_TXS.saturating_sub(pending_txs))
178 })
179 .filter(|(_, limit)| *limit > 0)
180 .map(|(peer_id, limit)| (*peer_id, limit.min(u8::MAX as usize) as u8))
181 .collect::<Vec<_>>();
182
183 for (peer_id, limit) in transaction_reqs {
184 store.dispatch(P2pChannelsTransactionAction::RequestSend { peer_id, limit });
185 }
186}
187
188fn p2p_request_snarks_if_needed<S: Service>(store: &mut Store<S>) {
189 use p2p::channels::snark::P2pChannelsSnarkAction;
190
191 const MAX_PEER_PENDING_SNARKS: usize = 32;
192
193 let state = store.state();
194 let p2p = p2p_ready!(state.p2p, "p2p_request_snarks_if_needed", system_time());
195 let snark_reqs = p2p
196 .ready_peers_iter()
197 .filter(|(_, p)| p.channels.snark.can_send_request())
198 .map(|(peer_id, _)| {
199 let pending_snarks = state.snark_pool.candidates.peer_work_count(peer_id);
200 (
201 peer_id,
202 MAX_PEER_PENDING_SNARKS.saturating_sub(pending_snarks),
203 )
204 })
205 .filter(|(_, limit)| *limit > 0)
206 .map(|(peer_id, limit)| (*peer_id, limit.min(u8::MAX as usize) as u8))
207 .collect::<Vec<_>>();
208
209 for (peer_id, limit) in snark_reqs {
210 store.dispatch(P2pChannelsSnarkAction::RequestSend { peer_id, limit });
211 }
212}