node/transition_frontier/
transition_frontier_effects.rs1use mina_p2p_messages::gossip::GossipNetMessageV2;
2use redux::Timestamp;
3
4use crate::{
5 block_producer::BlockProducerAction,
6 ledger::LEDGER_DEPTH,
7 p2p::{channels::best_tip::P2pChannelsBestTipAction, P2pNetworkPubsubAction},
8 snark_pool::{SnarkPoolAction, SnarkWork},
9 stats::sync::SyncingLedger,
10 Store, TransactionPoolAction,
11};
12
13use super::{
14 candidate::TransitionFrontierCandidateAction,
15 genesis::TransitionFrontierGenesisAction,
16 sync::{
17 ledger::{
18 snarked::{TransitionFrontierSyncLedgerSnarkedAction, ACCOUNT_SUBTREE_HEIGHT},
19 staged::TransitionFrontierSyncLedgerStagedAction,
20 transition_frontier_sync_ledger_init_effects,
21 transition_frontier_sync_ledger_snarked_success_effects,
22 transition_frontier_sync_ledger_staged_success_effects,
23 TransitionFrontierSyncLedgerAction,
24 },
25 TransitionFrontierSyncAction, TransitionFrontierSyncState,
26 },
27 TransitionFrontierAction, TransitionFrontierActionWithMeta, TransitionFrontierState,
28};
29
30pub fn transition_frontier_effects<S: crate::Service>(
33 store: &mut Store<S>,
34 action: TransitionFrontierActionWithMeta,
35) {
36 let (action, meta) = action.split();
37
38 match action {
39 TransitionFrontierAction::Genesis(a) => {
40 match &a {
43 TransitionFrontierGenesisAction::Produce => {
44 store.dispatch(TransitionFrontierAction::GenesisInject);
45 }
46 TransitionFrontierGenesisAction::ProveSuccess { .. } => {
47 store.dispatch(TransitionFrontierAction::GenesisProvenInject);
48 }
49 _ => {}
50 }
51 }
52 TransitionFrontierAction::GenesisEffect(a) => {
53 a.effects(&meta, store);
54 }
55 TransitionFrontierAction::GenesisInject => {
56 synced_effects(&meta, store);
57 }
58 TransitionFrontierAction::GenesisProvenInject => {
59 if store.state().transition_frontier.sync.is_synced() {
60 synced_effects(&meta, store);
61 }
62 }
63 TransitionFrontierAction::Candidate(_) => {}
64 TransitionFrontierAction::Sync(a) => {
65 match a {
66 TransitionFrontierSyncAction::Init {
67 ref best_tip,
68 ref root_block,
69 ..
70 } => {
71 if let Some(stats) = store.service.stats() {
72 stats.new_sync_target(meta.time(), best_tip, root_block);
73 if let TransitionFrontierSyncState::BlocksPending { chain, .. } =
74 &store.state.get().transition_frontier.sync
75 {
76 stats.syncing_blocks_init(chain);
77 }
78 }
79 }
80 TransitionFrontierSyncAction::BestTipUpdate {
81 ref best_tip,
82 ref root_block,
83 ..
84 } => {
85 if let Some(stats) = store.service.stats() {
86 stats.new_sync_target(meta.time(), best_tip, root_block);
87 if let Some(target) =
88 store.state.get().transition_frontier.sync.ledger_target()
89 {
90 stats.syncing_ledger(
91 target.kind,
92 SyncingLedger::Init {
93 snarked_ledger_hash: target.snarked_ledger_hash.clone(),
94 staged_ledger_hash: target
95 .staged
96 .as_ref()
97 .map(|v| v.hashes.non_snark.ledger_hash.clone()),
98 },
99 );
100 }
101 if let TransitionFrontierSyncState::BlocksPending { chain, .. } =
102 &store.state.get().transition_frontier.sync
103 {
104 stats.syncing_blocks_init(chain);
105 }
106 }
107 }
108 TransitionFrontierSyncAction::LedgerStakingPending => {
109 if let Some(stats) = store.service.stats() {
110 if let Some(target) =
111 store.state.get().transition_frontier.sync.ledger_target()
112 {
113 stats.syncing_ledger(
114 target.kind,
115 SyncingLedger::Init {
116 snarked_ledger_hash: target.snarked_ledger_hash.clone(),
117 staged_ledger_hash: target
118 .staged
119 .as_ref()
120 .map(|v| v.hashes.non_snark.ledger_hash.clone()),
121 },
122 );
123 }
124 }
125 }
126 TransitionFrontierSyncAction::LedgerStakingSuccess => {}
127 TransitionFrontierSyncAction::LedgerNextEpochPending => {
128 if let Some(stats) = store.service.stats() {
129 if let Some(target) =
130 store.state.get().transition_frontier.sync.ledger_target()
131 {
132 stats.syncing_ledger(
133 target.kind,
134 SyncingLedger::Init {
135 snarked_ledger_hash: target.snarked_ledger_hash.clone(),
136 staged_ledger_hash: target
137 .staged
138 .as_ref()
139 .map(|v| v.hashes.non_snark.ledger_hash.clone()),
140 },
141 );
142 }
143 }
144 }
145 TransitionFrontierSyncAction::LedgerNextEpochSuccess => {}
146 TransitionFrontierSyncAction::LedgerRootPending => {
147 if let Some(stats) = store.service.stats() {
148 if let Some(target) =
149 store.state.get().transition_frontier.sync.ledger_target()
150 {
151 stats.syncing_ledger(
152 target.kind,
153 SyncingLedger::Init {
154 snarked_ledger_hash: target.snarked_ledger_hash.clone(),
155 staged_ledger_hash: target
156 .staged
157 .as_ref()
158 .map(|v| v.hashes.non_snark.ledger_hash.clone()),
159 },
160 );
161 }
162 }
163 }
164 TransitionFrontierSyncAction::LedgerRootSuccess => {}
165 TransitionFrontierSyncAction::BlocksPending => {
166 if let Some(stats) = store.service.stats() {
167 if let TransitionFrontierSyncState::BlocksPending { chain, .. } =
168 &store.state.get().transition_frontier.sync
169 {
170 stats.syncing_blocks_init(chain);
171 }
172 }
173 }
174 TransitionFrontierSyncAction::BlocksPeersQuery => {}
175 TransitionFrontierSyncAction::BlocksPeerQueryInit { .. } => {}
176 TransitionFrontierSyncAction::BlocksPeerQueryRetry { .. } => {}
177 TransitionFrontierSyncAction::BlocksPeerQueryPending { ref hash, .. } => {
178 if let Some(stats) = store.service.stats() {
179 if let Some(state) =
180 store.state.get().transition_frontier.sync.block_state(hash)
181 {
182 stats.syncing_block_update(state);
183 }
184 }
185 }
186 TransitionFrontierSyncAction::BlocksPeerQueryError { .. } => {}
187 TransitionFrontierSyncAction::BlocksPeerQuerySuccess { .. } => {}
188 TransitionFrontierSyncAction::BlocksFetchSuccess { ref hash } => {
189 if let Some(stats) = store.service.stats() {
190 if let Some(state) =
191 store.state.get().transition_frontier.sync.block_state(hash)
192 {
193 stats.syncing_block_update(state);
194 }
195 }
196 }
197 TransitionFrontierSyncAction::BlocksNextApplyInit => {}
198 TransitionFrontierSyncAction::BlocksNextApplyPending { ref hash } => {
199 if let Some(stats) = store.service.stats() {
200 if let Some(state) =
201 store.state.get().transition_frontier.sync.block_state(hash)
202 {
203 stats.syncing_block_update(state);
204 }
205 }
206 }
207 TransitionFrontierSyncAction::BlocksNextApplyError { .. } => {}
208 TransitionFrontierSyncAction::BlocksNextApplySuccess {
209 ref hash,
210 just_emitted_a_proof: _,
211 } => {
212 if let Some(stats) = store.service.stats() {
213 if let Some(state) =
214 store.state.get().transition_frontier.sync.block_state(hash)
215 {
216 stats.syncing_block_update(state);
217 }
218 }
219 }
220 TransitionFrontierSyncAction::BlocksSendToArchive { .. } => {}
221 TransitionFrontierSyncAction::BlocksSuccess => {
222 store.dispatch(TransitionFrontierSyncAction::CommitInit);
223 }
224 TransitionFrontierSyncAction::CommitInit => {}
225 TransitionFrontierSyncAction::CommitPending => {}
226 TransitionFrontierSyncAction::CommitSuccess { result } => {
227 let own_peer_id = store.state().p2p.my_id();
231 let transition_frontier = &store.state.get().transition_frontier;
232 let TransitionFrontierSyncState::CommitSuccess { chain, .. } =
233 &transition_frontier.sync
234 else {
235 return;
236 };
237 let Some(best_tip) = chain.last() else {
238 return;
239 };
240 let orphaned_snarks = transition_frontier
241 .best_chain
242 .iter()
243 .rev()
244 .take_while(|b1| {
245 let height_diff =
246 best_tip.height().saturating_sub(b1.height()) as usize;
247 if height_diff == 0 {
248 best_tip.hash() != b1.hash()
249 } else if let Some(index) =
250 chain.len().checked_sub(height_diff.saturating_add(1))
251 {
252 chain.get(index).is_none_or(|b2| b1.hash() != b2.hash())
253 } else {
254 true
255 }
256 })
257 .flat_map(|v| v.completed_works_iter())
258 .map(|v| SnarkWork {
259 work: v.clone().into(),
260 received_t: meta.time(),
261 sender: own_peer_id,
262 })
263 .collect();
264
265 store.dispatch(TransitionFrontierAction::Synced {
266 needed_protocol_states: result.needed_protocol_states,
267 });
268 store.dispatch(SnarkPoolAction::JobsUpdate {
269 jobs: result.available_jobs,
270 orphaned_snarks,
271 });
272 return;
273 }
274 TransitionFrontierSyncAction::Ledger(ref a) => {
275 handle_transition_frontier_sync_ledger_action(a.clone(), &meta, store)
276 }
277 }
278 a.effects(&meta, store);
279 }
280 TransitionFrontierAction::Synced { .. } => {
281 synced_effects(&meta, store);
282 }
283 TransitionFrontierAction::SyncFailed { .. } => {
284 }
286 }
287}
288
289fn synced_effects<S: crate::Service>(
290 meta: &redux::ActionMeta,
291 store: &mut redux::Store<crate::State, S, crate::Action>,
292) {
293 let TransitionFrontierState {
294 best_chain,
295 chain_diff,
296 ..
297 } = &store.state.get().transition_frontier;
298
299 let Some(best_tip) = best_chain.last() else {
300 return;
301 };
302 if let Some(stats) = store.service.stats() {
303 stats.new_best_chain(meta.time(), best_chain);
304 }
305
306 let chain_diff = chain_diff.clone();
307
308 let best_tip = best_tip.clone();
310 for peer_id in store.state().p2p.ready_peers() {
311 store.dispatch(P2pChannelsBestTipAction::ResponseSend {
312 peer_id,
313 best_tip: best_tip.block.clone(),
314 });
315 }
316 if !store.dispatch(P2pNetworkPubsubAction::BroadcastValidatedMessage {
319 message_id: p2p::BroadcastMessageId::BlockHash {
320 hash: best_tip.hash().clone(),
321 },
322 }) {
323 store.dispatch(P2pNetworkPubsubAction::WebRtcRebroadcast {
325 message: GossipNetMessageV2::NewState(best_tip.block().clone()),
326 });
327 }
328
329 let best_tip_hash = best_tip.merkle_root_hash().clone();
330 store.dispatch(TransitionFrontierCandidateAction::Prune);
331 store.dispatch(BlockProducerAction::BestTipUpdate {
332 best_tip: best_tip.block.clone(),
333 });
334 store.dispatch(TransactionPoolAction::BestTipChanged {
335 best_tip_hash: best_tip_hash.clone(),
336 });
337 if let Some(diff) = chain_diff {
338 store.dispatch(TransactionPoolAction::ApplyTransitionFrontierDiff {
339 best_tip_hash,
340 diff,
341 });
342 }
343}
344
345fn handle_transition_frontier_sync_ledger_action<S: crate::Service>(
351 action: TransitionFrontierSyncLedgerAction,
352 meta: &redux::ActionMeta,
353 store: &mut redux::Store<crate::State, S, crate::Action>,
354) {
355 match action {
356 TransitionFrontierSyncLedgerAction::Init => {
357 transition_frontier_sync_ledger_init_effects(meta, store);
358 }
359 TransitionFrontierSyncLedgerAction::Snarked(a) => {
360 match a {
361 TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressInit {
362 ref address,
363 ..
364 } => {
365 if let Some(stats) = store.service.stats() {
366 let (start, end) = (meta.time(), meta.time());
367 if let Some(kind) = store
368 .state
369 .get()
370 .transition_frontier
371 .sync
372 .ledger_target_kind()
373 {
374 if address.length() < LEDGER_DEPTH - ACCOUNT_SUBTREE_HEIGHT {
375 stats.syncing_ledger(
376 kind,
377 SyncingLedger::FetchHashes { start, end },
378 );
379 } else {
380 stats.syncing_ledger(
381 kind,
382 SyncingLedger::FetchAccounts { start, end },
383 );
384 }
385 }
386 }
387 }
388 TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressSuccess {
389 peer_id,
390 rpc_id,
391 ref response,
392 } => {
393 if let Some(stats) = store.service.stats() {
394 if let Some((kind, start, end)) = store
395 .state
396 .get()
397 .transition_frontier
398 .sync
399 .ledger()
400 .and_then(|s| s.snarked())
401 .and_then(|s| {
402 Some((s.target().kind, s.peer_address_query_get(&peer_id, rpc_id)?))
403 })
404 .map(|(kind, (_, s))| (kind, s.time, meta.time()))
405 {
406 if response.is_child_hashes() {
407 stats.syncing_ledger(
408 kind,
409 SyncingLedger::FetchHashes { start, end },
410 );
411 } else if response.is_child_accounts() {
412 stats.syncing_ledger(
413 kind,
414 SyncingLedger::FetchAccounts { start, end },
415 );
416 }
417 }
418 }
419 }
420 TransitionFrontierSyncLedgerSnarkedAction::Success => {
421 transition_frontier_sync_ledger_snarked_success_effects(meta, store);
422 }
423 _ => {}
424 }
425 a.effects(meta, store);
426 }
427 TransitionFrontierSyncLedgerAction::Staged(a) => {
428 match a {
430 TransitionFrontierSyncLedgerStagedAction::PartsFetchPending => {
431 if let Some(stats) = store.service.stats() {
432 if let Some(kind) = store
433 .state
434 .get()
435 .transition_frontier
436 .sync
437 .ledger_target_kind()
438 {
439 let (start, end) = (meta.time(), None);
440 stats.syncing_ledger(kind, SyncingLedger::FetchParts { start, end });
441 }
442 }
443 }
444 TransitionFrontierSyncLedgerStagedAction::PartsFetchSuccess { .. } => {
445 if let Some(stats) = store.service.stats() {
446 let (start, end) = (Timestamp::ZERO, Some(meta.time()));
447 if let Some(kind) = store
448 .state
449 .get()
450 .transition_frontier
451 .sync
452 .ledger_target_kind()
453 {
454 stats.syncing_ledger(kind, SyncingLedger::FetchParts { start, end });
455 }
456 }
457 }
458 TransitionFrontierSyncLedgerStagedAction::PartsPeerFetchError {
459 ref error, ..
460 } => {
461 if let Some(stats) = store.service.stats() {
462 stats.staging_ledger_fetch_failure(error, meta.time());
463 }
464 }
465 TransitionFrontierSyncLedgerStagedAction::ReconstructInit { .. } => {
466 if let Some(stats) = store.service.stats() {
467 let (start, end) = (meta.time(), None);
468 if let Some(kind) = store
469 .state
470 .get()
471 .transition_frontier
472 .sync
473 .ledger_target_kind()
474 {
475 stats.syncing_ledger(kind, SyncingLedger::ApplyParts { start, end });
476 }
477 }
478 }
479 TransitionFrontierSyncLedgerStagedAction::ReconstructSuccess { .. } => {
480 if let Some(stats) = store.service.stats() {
481 let (start, end) = (Timestamp::ZERO, Some(meta.time()));
482 if let Some(kind) = store
483 .state
484 .get()
485 .transition_frontier
486 .sync
487 .ledger_target_kind()
488 {
489 stats.syncing_ledger(kind, SyncingLedger::ApplyParts { start, end });
490 }
491 }
492 }
493 TransitionFrontierSyncLedgerStagedAction::Success => {
494 transition_frontier_sync_ledger_staged_success_effects(meta, store);
497 }
498 _ => {}
499 }
500 }
501 TransitionFrontierSyncLedgerAction::Success => {
502 match &store.state().transition_frontier.sync {
503 TransitionFrontierSyncState::StakingLedgerPending { .. } => {
504 store.dispatch(TransitionFrontierSyncAction::LedgerStakingSuccess);
505 }
506 TransitionFrontierSyncState::NextEpochLedgerPending { .. } => {
507 store.dispatch(TransitionFrontierSyncAction::LedgerNextEpochSuccess);
508 }
509 TransitionFrontierSyncState::RootLedgerPending { .. } => {
510 store.dispatch(TransitionFrontierSyncAction::LedgerRootSuccess);
511 }
512 _ => {}
513 }
514 }
515 }
516}