node/transition_frontier/
transition_frontier_effects.rs

1use mina_p2p_messages::gossip::GossipNetMessageV2;
2use redux::Timestamp;
3
4use crate::{
5    block_producer::BlockProducerAction,
6    ledger::LEDGER_DEPTH,
7    p2p::{channels::best_tip::P2pChannelsBestTipAction, P2pNetworkPubsubAction},
8    snark_pool::{SnarkPoolAction, SnarkWork},
9    stats::sync::SyncingLedger,
10    Store, TransactionPoolAction,
11};
12
13use super::{
14    candidate::TransitionFrontierCandidateAction,
15    genesis::TransitionFrontierGenesisAction,
16    sync::{
17        ledger::{
18            snarked::{TransitionFrontierSyncLedgerSnarkedAction, ACCOUNT_SUBTREE_HEIGHT},
19            staged::TransitionFrontierSyncLedgerStagedAction,
20            transition_frontier_sync_ledger_init_effects,
21            transition_frontier_sync_ledger_snarked_success_effects,
22            transition_frontier_sync_ledger_staged_success_effects,
23            TransitionFrontierSyncLedgerAction,
24        },
25        TransitionFrontierSyncAction, TransitionFrontierSyncState,
26    },
27    TransitionFrontierAction, TransitionFrontierActionWithMeta, TransitionFrontierState,
28};
29
30// TODO(refactor): all service accesses are for stats, how should that be handled?
31
32pub fn transition_frontier_effects<S: crate::Service>(
33    store: &mut Store<S>,
34    action: TransitionFrontierActionWithMeta,
35) {
36    let (action, meta) = action.split();
37
38    match action {
39        TransitionFrontierAction::Genesis(a) => {
40            // TODO(refactor): this should be handled by a callback and removed from here
41            // whenever any of these is going to happen, genesisinject must happen first
42            match &a {
43                TransitionFrontierGenesisAction::Produce => {
44                    store.dispatch(TransitionFrontierAction::GenesisInject);
45                }
46                TransitionFrontierGenesisAction::ProveSuccess { .. } => {
47                    store.dispatch(TransitionFrontierAction::GenesisProvenInject);
48                }
49                _ => {}
50            }
51        }
52        TransitionFrontierAction::GenesisEffect(a) => {
53            a.effects(&meta, store);
54        }
55        TransitionFrontierAction::GenesisInject => {
56            synced_effects(&meta, store);
57        }
58        TransitionFrontierAction::GenesisProvenInject => {
59            if store.state().transition_frontier.sync.is_synced() {
60                synced_effects(&meta, store);
61            }
62        }
63        TransitionFrontierAction::Candidate(_) => {}
64        TransitionFrontierAction::Sync(a) => {
65            match a {
66                TransitionFrontierSyncAction::Init {
67                    ref best_tip,
68                    ref root_block,
69                    ..
70                } => {
71                    if let Some(stats) = store.service.stats() {
72                        stats.new_sync_target(meta.time(), best_tip, root_block);
73                        if let TransitionFrontierSyncState::BlocksPending { chain, .. } =
74                            &store.state.get().transition_frontier.sync
75                        {
76                            stats.syncing_blocks_init(chain);
77                        }
78                    }
79                }
80                TransitionFrontierSyncAction::BestTipUpdate {
81                    ref best_tip,
82                    ref root_block,
83                    ..
84                } => {
85                    if let Some(stats) = store.service.stats() {
86                        stats.new_sync_target(meta.time(), best_tip, root_block);
87                        if let Some(target) =
88                            store.state.get().transition_frontier.sync.ledger_target()
89                        {
90                            stats.syncing_ledger(
91                                target.kind,
92                                SyncingLedger::Init {
93                                    snarked_ledger_hash: target.snarked_ledger_hash.clone(),
94                                    staged_ledger_hash: target
95                                        .staged
96                                        .as_ref()
97                                        .map(|v| v.hashes.non_snark.ledger_hash.clone()),
98                                },
99                            );
100                        }
101                        if let TransitionFrontierSyncState::BlocksPending { chain, .. } =
102                            &store.state.get().transition_frontier.sync
103                        {
104                            stats.syncing_blocks_init(chain);
105                        }
106                    }
107                }
108                TransitionFrontierSyncAction::LedgerStakingPending => {
109                    if let Some(stats) = store.service.stats() {
110                        if let Some(target) =
111                            store.state.get().transition_frontier.sync.ledger_target()
112                        {
113                            stats.syncing_ledger(
114                                target.kind,
115                                SyncingLedger::Init {
116                                    snarked_ledger_hash: target.snarked_ledger_hash.clone(),
117                                    staged_ledger_hash: target
118                                        .staged
119                                        .as_ref()
120                                        .map(|v| v.hashes.non_snark.ledger_hash.clone()),
121                                },
122                            );
123                        }
124                    }
125                }
126                TransitionFrontierSyncAction::LedgerStakingSuccess => {}
127                TransitionFrontierSyncAction::LedgerNextEpochPending => {
128                    if let Some(stats) = store.service.stats() {
129                        if let Some(target) =
130                            store.state.get().transition_frontier.sync.ledger_target()
131                        {
132                            stats.syncing_ledger(
133                                target.kind,
134                                SyncingLedger::Init {
135                                    snarked_ledger_hash: target.snarked_ledger_hash.clone(),
136                                    staged_ledger_hash: target
137                                        .staged
138                                        .as_ref()
139                                        .map(|v| v.hashes.non_snark.ledger_hash.clone()),
140                                },
141                            );
142                        }
143                    }
144                }
145                TransitionFrontierSyncAction::LedgerNextEpochSuccess => {}
146                TransitionFrontierSyncAction::LedgerRootPending => {
147                    if let Some(stats) = store.service.stats() {
148                        if let Some(target) =
149                            store.state.get().transition_frontier.sync.ledger_target()
150                        {
151                            stats.syncing_ledger(
152                                target.kind,
153                                SyncingLedger::Init {
154                                    snarked_ledger_hash: target.snarked_ledger_hash.clone(),
155                                    staged_ledger_hash: target
156                                        .staged
157                                        .as_ref()
158                                        .map(|v| v.hashes.non_snark.ledger_hash.clone()),
159                                },
160                            );
161                        }
162                    }
163                }
164                TransitionFrontierSyncAction::LedgerRootSuccess => {}
165                TransitionFrontierSyncAction::BlocksPending => {
166                    if let Some(stats) = store.service.stats() {
167                        if let TransitionFrontierSyncState::BlocksPending { chain, .. } =
168                            &store.state.get().transition_frontier.sync
169                        {
170                            stats.syncing_blocks_init(chain);
171                        }
172                    }
173                }
174                TransitionFrontierSyncAction::BlocksPeersQuery => {}
175                TransitionFrontierSyncAction::BlocksPeerQueryInit { .. } => {}
176                TransitionFrontierSyncAction::BlocksPeerQueryRetry { .. } => {}
177                TransitionFrontierSyncAction::BlocksPeerQueryPending { ref hash, .. } => {
178                    if let Some(stats) = store.service.stats() {
179                        if let Some(state) =
180                            store.state.get().transition_frontier.sync.block_state(hash)
181                        {
182                            stats.syncing_block_update(state);
183                        }
184                    }
185                }
186                TransitionFrontierSyncAction::BlocksPeerQueryError { .. } => {}
187                TransitionFrontierSyncAction::BlocksPeerQuerySuccess { .. } => {}
188                TransitionFrontierSyncAction::BlocksFetchSuccess { ref hash } => {
189                    if let Some(stats) = store.service.stats() {
190                        if let Some(state) =
191                            store.state.get().transition_frontier.sync.block_state(hash)
192                        {
193                            stats.syncing_block_update(state);
194                        }
195                    }
196                }
197                TransitionFrontierSyncAction::BlocksNextApplyInit => {}
198                TransitionFrontierSyncAction::BlocksNextApplyPending { ref hash } => {
199                    if let Some(stats) = store.service.stats() {
200                        if let Some(state) =
201                            store.state.get().transition_frontier.sync.block_state(hash)
202                        {
203                            stats.syncing_block_update(state);
204                        }
205                    }
206                }
207                TransitionFrontierSyncAction::BlocksNextApplyError { .. } => {}
208                TransitionFrontierSyncAction::BlocksNextApplySuccess {
209                    ref hash,
210                    just_emitted_a_proof: _,
211                } => {
212                    if let Some(stats) = store.service.stats() {
213                        if let Some(state) =
214                            store.state.get().transition_frontier.sync.block_state(hash)
215                        {
216                            stats.syncing_block_update(state);
217                        }
218                    }
219                }
220                TransitionFrontierSyncAction::BlocksSendToArchive { .. } => {}
221                TransitionFrontierSyncAction::BlocksSuccess => {
222                    store.dispatch(TransitionFrontierSyncAction::CommitInit);
223                }
224                TransitionFrontierSyncAction::CommitInit => {}
225                TransitionFrontierSyncAction::CommitPending => {}
226                TransitionFrontierSyncAction::CommitSuccess { result } => {
227                    // TODO(refactor): needs to be moved to the reducer in the sync module,
228                    // but that will result in extra cloning until the reducers
229                    // take the action by value instead of reference
230                    let own_peer_id = store.state().p2p.my_id();
231                    let transition_frontier = &store.state.get().transition_frontier;
232                    let TransitionFrontierSyncState::CommitSuccess { chain, .. } =
233                        &transition_frontier.sync
234                    else {
235                        return;
236                    };
237                    let Some(best_tip) = chain.last() else {
238                        return;
239                    };
240                    let orphaned_snarks = transition_frontier
241                        .best_chain
242                        .iter()
243                        .rev()
244                        .take_while(|b1| {
245                            let height_diff =
246                                best_tip.height().saturating_sub(b1.height()) as usize;
247                            if height_diff == 0 {
248                                best_tip.hash() != b1.hash()
249                            } else if let Some(index) =
250                                chain.len().checked_sub(height_diff.saturating_add(1))
251                            {
252                                chain.get(index).is_none_or(|b2| b1.hash() != b2.hash())
253                            } else {
254                                true
255                            }
256                        })
257                        .flat_map(|v| v.completed_works_iter())
258                        .map(|v| SnarkWork {
259                            work: v.clone().into(),
260                            received_t: meta.time(),
261                            sender: own_peer_id,
262                        })
263                        .collect();
264
265                    store.dispatch(TransitionFrontierAction::Synced {
266                        needed_protocol_states: result.needed_protocol_states,
267                    });
268                    store.dispatch(SnarkPoolAction::JobsUpdate {
269                        jobs: result.available_jobs,
270                        orphaned_snarks,
271                    });
272                    return;
273                }
274                TransitionFrontierSyncAction::Ledger(ref a) => {
275                    handle_transition_frontier_sync_ledger_action(a.clone(), &meta, store)
276                }
277            }
278            a.effects(&meta, store);
279        }
280        TransitionFrontierAction::Synced { .. } => {
281            synced_effects(&meta, store);
282        }
283        TransitionFrontierAction::SyncFailed { .. } => {
284            // TODO(SEC): disconnect/blacklist peers that caused this.
285        }
286    }
287}
288
289fn synced_effects<S: crate::Service>(
290    meta: &redux::ActionMeta,
291    store: &mut redux::Store<crate::State, S, crate::Action>,
292) {
293    let TransitionFrontierState {
294        best_chain,
295        chain_diff,
296        ..
297    } = &store.state.get().transition_frontier;
298
299    let Some(best_tip) = best_chain.last() else {
300        return;
301    };
302    if let Some(stats) = store.service.stats() {
303        stats.new_best_chain(meta.time(), best_chain);
304    }
305
306    let chain_diff = chain_diff.clone();
307
308    // publish new best tip.
309    let best_tip = best_tip.clone();
310    for peer_id in store.state().p2p.ready_peers() {
311        store.dispatch(P2pChannelsBestTipAction::ResponseSend {
312            peer_id,
313            best_tip: best_tip.block.clone(),
314        });
315    }
316    // TODO this should be handled by a callback
317    // If this get dispatched, we received block from libp2p.
318    if !store.dispatch(P2pNetworkPubsubAction::BroadcastValidatedMessage {
319        message_id: p2p::BroadcastMessageId::BlockHash {
320            hash: best_tip.hash().clone(),
321        },
322    }) {
323        // Otherwise block was received from WebRTC so inject it in libp2p.
324        store.dispatch(P2pNetworkPubsubAction::WebRtcRebroadcast {
325            message: GossipNetMessageV2::NewState(best_tip.block().clone()),
326        });
327    }
328
329    let best_tip_hash = best_tip.merkle_root_hash().clone();
330    store.dispatch(TransitionFrontierCandidateAction::Prune);
331    store.dispatch(BlockProducerAction::BestTipUpdate {
332        best_tip: best_tip.block.clone(),
333    });
334    store.dispatch(TransactionPoolAction::BestTipChanged {
335        best_tip_hash: best_tip_hash.clone(),
336    });
337    if let Some(diff) = chain_diff {
338        store.dispatch(TransactionPoolAction::ApplyTransitionFrontierDiff {
339            best_tip_hash,
340            diff,
341        });
342    }
343}
344
345// Handling of the actions related to the synchronization of a target ledger
346// in either one of the epoch ledgers or the root of the transition frontier
347// happens here. These are part of the bootstrap process and should not happen
348// again unless the node needs to re-bootstrap (either because of a reorg or
349// a long desync).
350fn handle_transition_frontier_sync_ledger_action<S: crate::Service>(
351    action: TransitionFrontierSyncLedgerAction,
352    meta: &redux::ActionMeta,
353    store: &mut redux::Store<crate::State, S, crate::Action>,
354) {
355    match action {
356        TransitionFrontierSyncLedgerAction::Init => {
357            transition_frontier_sync_ledger_init_effects(meta, store);
358        }
359        TransitionFrontierSyncLedgerAction::Snarked(a) => {
360            match a {
361                TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressInit {
362                    ref address,
363                    ..
364                } => {
365                    if let Some(stats) = store.service.stats() {
366                        let (start, end) = (meta.time(), meta.time());
367                        if let Some(kind) = store
368                            .state
369                            .get()
370                            .transition_frontier
371                            .sync
372                            .ledger_target_kind()
373                        {
374                            if address.length() < LEDGER_DEPTH - ACCOUNT_SUBTREE_HEIGHT {
375                                stats.syncing_ledger(
376                                    kind,
377                                    SyncingLedger::FetchHashes { start, end },
378                                );
379                            } else {
380                                stats.syncing_ledger(
381                                    kind,
382                                    SyncingLedger::FetchAccounts { start, end },
383                                );
384                            }
385                        }
386                    }
387                }
388                TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressSuccess {
389                    peer_id,
390                    rpc_id,
391                    ref response,
392                } => {
393                    if let Some(stats) = store.service.stats() {
394                        if let Some((kind, start, end)) = store
395                            .state
396                            .get()
397                            .transition_frontier
398                            .sync
399                            .ledger()
400                            .and_then(|s| s.snarked())
401                            .and_then(|s| {
402                                Some((s.target().kind, s.peer_address_query_get(&peer_id, rpc_id)?))
403                            })
404                            .map(|(kind, (_, s))| (kind, s.time, meta.time()))
405                        {
406                            if response.is_child_hashes() {
407                                stats.syncing_ledger(
408                                    kind,
409                                    SyncingLedger::FetchHashes { start, end },
410                                );
411                            } else if response.is_child_accounts() {
412                                stats.syncing_ledger(
413                                    kind,
414                                    SyncingLedger::FetchAccounts { start, end },
415                                );
416                            }
417                        }
418                    }
419                }
420                TransitionFrontierSyncLedgerSnarkedAction::Success => {
421                    transition_frontier_sync_ledger_snarked_success_effects(meta, store);
422                }
423                _ => {}
424            }
425            a.effects(meta, store);
426        }
427        TransitionFrontierSyncLedgerAction::Staged(a) => {
428            // TODO(refactor): these should be handled with callbacks or something
429            match a {
430                TransitionFrontierSyncLedgerStagedAction::PartsFetchPending => {
431                    if let Some(stats) = store.service.stats() {
432                        if let Some(kind) = store
433                            .state
434                            .get()
435                            .transition_frontier
436                            .sync
437                            .ledger_target_kind()
438                        {
439                            let (start, end) = (meta.time(), None);
440                            stats.syncing_ledger(kind, SyncingLedger::FetchParts { start, end });
441                        }
442                    }
443                }
444                TransitionFrontierSyncLedgerStagedAction::PartsFetchSuccess { .. } => {
445                    if let Some(stats) = store.service.stats() {
446                        let (start, end) = (Timestamp::ZERO, Some(meta.time()));
447                        if let Some(kind) = store
448                            .state
449                            .get()
450                            .transition_frontier
451                            .sync
452                            .ledger_target_kind()
453                        {
454                            stats.syncing_ledger(kind, SyncingLedger::FetchParts { start, end });
455                        }
456                    }
457                }
458                TransitionFrontierSyncLedgerStagedAction::PartsPeerFetchError {
459                    ref error, ..
460                } => {
461                    if let Some(stats) = store.service.stats() {
462                        stats.staging_ledger_fetch_failure(error, meta.time());
463                    }
464                }
465                TransitionFrontierSyncLedgerStagedAction::ReconstructInit { .. } => {
466                    if let Some(stats) = store.service.stats() {
467                        let (start, end) = (meta.time(), None);
468                        if let Some(kind) = store
469                            .state
470                            .get()
471                            .transition_frontier
472                            .sync
473                            .ledger_target_kind()
474                        {
475                            stats.syncing_ledger(kind, SyncingLedger::ApplyParts { start, end });
476                        }
477                    }
478                }
479                TransitionFrontierSyncLedgerStagedAction::ReconstructSuccess { .. } => {
480                    if let Some(stats) = store.service.stats() {
481                        let (start, end) = (Timestamp::ZERO, Some(meta.time()));
482                        if let Some(kind) = store
483                            .state
484                            .get()
485                            .transition_frontier
486                            .sync
487                            .ledger_target_kind()
488                        {
489                            stats.syncing_ledger(kind, SyncingLedger::ApplyParts { start, end });
490                        }
491                    }
492                }
493                TransitionFrontierSyncLedgerStagedAction::Success => {
494                    // TODO(refactor): this one in particular must be a callback, others
495                    // are just stats updates
496                    transition_frontier_sync_ledger_staged_success_effects(meta, store);
497                }
498                _ => {}
499            }
500        }
501        TransitionFrontierSyncLedgerAction::Success => {
502            match &store.state().transition_frontier.sync {
503                TransitionFrontierSyncState::StakingLedgerPending { .. } => {
504                    store.dispatch(TransitionFrontierSyncAction::LedgerStakingSuccess);
505                }
506                TransitionFrontierSyncState::NextEpochLedgerPending { .. } => {
507                    store.dispatch(TransitionFrontierSyncAction::LedgerNextEpochSuccess);
508                }
509                TransitionFrontierSyncState::RootLedgerPending { .. } => {
510                    store.dispatch(TransitionFrontierSyncAction::LedgerRootSuccess);
511                }
512                _ => {}
513            }
514        }
515    }
516}