mina_node/transition_frontier/sync/
transition_frontier_sync_effects.rs

1use crate::p2p::{
2    channels::rpc::{P2pChannelsRpcAction, P2pRpcId},
3    P2pNetworkPubsubAction, PeerId,
4};
5use mina_core::block::{AppliedBlock, ArcBlockWithHash};
6use mina_p2p_messages::v2::LedgerHash;
7use redux::ActionMeta;
8
9use crate::{
10    ledger::write::{LedgerWriteAction, LedgerWriteRequest, LedgersToKeep},
11    p2p::channels::rpc::P2pRpcRequest,
12    p2p_ready,
13    service::TransitionFrontierSyncLedgerSnarkedService,
14    Service, Store, TransitionFrontierAction,
15};
16
17use super::{
18    ledger::{
19        snarked::TransitionFrontierSyncLedgerSnarkedAction,
20        staged::TransitionFrontierSyncLedgerStagedAction, SyncLedgerTarget,
21        TransitionFrontierSyncLedgerAction,
22    },
23    SyncError, TransitionFrontierSyncAction, TransitionFrontierSyncState,
24};
25
26impl TransitionFrontierSyncAction {
27    pub fn effects<S>(&self, meta: &ActionMeta, store: &mut Store<S>)
28    where
29        S: Service,
30    {
31        match self {
32            TransitionFrontierSyncAction::Init { best_tip, .. } => {
33                let protocol_state_body = &best_tip.block.header.protocol_state.body;
34                let genesis_ledger_hash = &protocol_state_body.blockchain_state.genesis_ledger_hash;
35                let staking_epoch_ledger_hash = &protocol_state_body
36                    .consensus_state
37                    .staking_epoch_data
38                    .ledger
39                    .hash;
40                let next_epoch_ledger_hash = &protocol_state_body
41                    .consensus_state
42                    .next_epoch_data
43                    .ledger
44                    .hash;
45
46                // TODO(tizoc): if root ledger matches genesis, should anything special be done?
47                // snarked ledger will not need to be synced but staged ledger parts are still
48                // required
49
50                if genesis_ledger_hash != staking_epoch_ledger_hash {
51                    store.dispatch(TransitionFrontierSyncAction::LedgerStakingPending);
52                } else if genesis_ledger_hash != next_epoch_ledger_hash {
53                    store.dispatch(TransitionFrontierSyncAction::LedgerNextEpochPending);
54                } else {
55                    store.dispatch(TransitionFrontierSyncAction::LedgerRootPending);
56                }
57            }
58            TransitionFrontierSyncAction::BestTipUpdate {
59                previous_root_snarked_ledger_hash,
60                best_tip,
61                on_success,
62                ..
63            } => {
64                // TODO(tizoc): this is currently required because how how complicated the BestTipUpdate reducer is,
65                // once that is simplified this should be handled in separate actions.
66                maybe_copy_ledgers_for_sync(
67                    store,
68                    previous_root_snarked_ledger_hash.clone(),
69                    best_tip,
70                )
71                .unwrap();
72
73                // if root snarked ledger changed.
74                store.dispatch(TransitionFrontierSyncLedgerAction::Init);
75                // if root snarked ledger stayed same but root block changed
76                // while reconstructing staged ledger.
77                store.dispatch(TransitionFrontierSyncLedgerStagedAction::PartsFetchPending);
78                store.dispatch(TransitionFrontierSyncLedgerSnarkedAction::PeersQuery);
79                // if we don't need to sync root staged ledger.
80                store.dispatch(TransitionFrontierSyncAction::BlocksPeersQuery);
81                // if we already have a block ready to be applied.
82                store.dispatch(TransitionFrontierSyncAction::BlocksNextApplyInit);
83
84                // TODO(binier): cleanup ledgers
85                if let Some(callback) = on_success {
86                    store.dispatch_callback(callback.clone(), ());
87                }
88            }
89            // TODO(tizoc): this action is never called with the current implementation,
90            // either remove it or figure out how to recover it as a reaction to
91            // `BestTipUpdate` above. Currently this logic is handled by
92            // `maybe_copy_ledgers_for_sync` at the end of this file.
93            // Same kind of applies to `LedgerNextEpochPending` and `LedgerRootPending`
94            // in some cases, but issue is mostly about `LedgerStakingPending` because
95            // it is the one most likely to be affected by the first `BestTipUpdate`
96            // action processed by the state machine.
97            TransitionFrontierSyncAction::LedgerStakingPending => {
98                prepare_staking_epoch_ledger_for_sync(store, &sync_best_tip(store.state()))
99                    .unwrap();
100
101                store.dispatch(TransitionFrontierSyncLedgerAction::Init);
102            }
103            TransitionFrontierSyncAction::LedgerStakingSuccess => {
104                if store.dispatch(TransitionFrontierSyncAction::LedgerNextEpochPending) {
105                } else if store.dispatch(TransitionFrontierSyncAction::LedgerRootPending) {
106                }
107            }
108            TransitionFrontierSyncAction::LedgerNextEpochPending => {
109                prepare_next_epoch_ledger_for_sync(store, &sync_best_tip(store.state())).unwrap();
110
111                store.dispatch(TransitionFrontierSyncLedgerAction::Init);
112            }
113            TransitionFrontierSyncAction::LedgerNextEpochSuccess => {
114                store.dispatch(TransitionFrontierSyncAction::LedgerRootPending);
115            }
116            TransitionFrontierSyncAction::LedgerRootPending => {
117                prepare_transition_frontier_root_ledger_for_sync(
118                    store,
119                    None,
120                    &sync_best_tip(store.state()),
121                )
122                .unwrap();
123
124                store.dispatch(TransitionFrontierSyncLedgerAction::Init);
125            }
126            TransitionFrontierSyncAction::LedgerRootSuccess => {
127                store.dispatch(TransitionFrontierSyncAction::BlocksPending);
128            }
129            TransitionFrontierSyncAction::BlocksPending => {
130                if !store.dispatch(TransitionFrontierSyncAction::BlocksSuccess) {
131                    store.dispatch(TransitionFrontierSyncAction::BlocksPeersQuery);
132                }
133            }
134            TransitionFrontierSyncAction::BlocksPeersQuery => {
135                let p2p = p2p_ready!(store.state().p2p, meta.time());
136                // TODO(binier): make sure they have the ledger we want to query.
137                let mut peer_ids = p2p
138                    .ready_peers_iter()
139                    .filter(|(_, p)| p.channels.rpc.can_send_request())
140                    .map(|(id, p)| (*id, p.connected_since))
141                    .collect::<Vec<_>>();
142                peer_ids.sort_by(|(_, t1), (_, t2)| t2.cmp(t1));
143
144                let mut retry_hashes = store
145                    .state()
146                    .transition_frontier
147                    .sync
148                    .blocks_fetch_retry_iter()
149                    .collect::<Vec<_>>();
150                retry_hashes.reverse();
151
152                for (peer_id, _) in peer_ids {
153                    if let Some(hash) = retry_hashes.last() {
154                        if store.dispatch(TransitionFrontierSyncAction::BlocksPeerQueryRetry {
155                            peer_id,
156                            hash: hash.clone(),
157                        }) {
158                            retry_hashes.pop();
159                            continue;
160                        }
161                    }
162
163                    match store.state().transition_frontier.sync.blocks_fetch_next() {
164                        Some(hash) => {
165                            store.dispatch(TransitionFrontierSyncAction::BlocksPeerQueryInit {
166                                peer_id,
167                                hash,
168                            });
169                        }
170                        None if retry_hashes.is_empty() => break,
171                        None => {}
172                    }
173                }
174            }
175            TransitionFrontierSyncAction::BlocksPeerQueryInit { hash, peer_id } => {
176                let p2p = p2p_ready!(store.state().p2p, meta.time());
177                let Some(rpc_id) = p2p
178                    .get_ready_peer(peer_id)
179                    .map(|v| v.channels.next_local_rpc_id())
180                else {
181                    return;
182                };
183
184                store.dispatch(P2pChannelsRpcAction::RequestSend {
185                    peer_id: *peer_id,
186                    id: rpc_id,
187                    request: Box::new(P2pRpcRequest::Block(hash.clone())),
188                    on_init: Some(redux::callback!(
189                        on_send_p2p_block_rpc_request(
190                            (peer_id: PeerId, rpc_id: P2pRpcId, request: P2pRpcRequest)
191                        ) -> crate::Action {
192                            let P2pRpcRequest::Block(hash) = request else {
193                                unreachable!()
194                            };
195                            TransitionFrontierSyncAction::BlocksPeerQueryPending {
196                                hash,
197                                peer_id,
198                                rpc_id,
199                            }
200                        }
201                    )),
202                });
203            }
204            TransitionFrontierSyncAction::BlocksPeerQueryRetry { hash, peer_id } => {
205                let p2p = p2p_ready!(store.state().p2p, meta.time());
206                let Some(rpc_id) = p2p
207                    .get_ready_peer(peer_id)
208                    .map(|v| v.channels.next_local_rpc_id())
209                else {
210                    return;
211                };
212
213                store.dispatch(P2pChannelsRpcAction::RequestSend {
214                    peer_id: *peer_id,
215                    id: rpc_id,
216                    request: Box::new(P2pRpcRequest::Block(hash.clone())),
217                    on_init: Some(redux::callback!(
218                        on_send_p2p_block_rpc_request_retry(
219                            (peer_id: PeerId, rpc_id: P2pRpcId, request: P2pRpcRequest)
220                        ) -> crate::Action {
221                            let P2pRpcRequest::Block(hash) = request else {
222                                unreachable!()
223                            };
224                            TransitionFrontierSyncAction::BlocksPeerQueryPending {
225                                hash,
226                                peer_id,
227                                rpc_id,
228                            }
229                        }
230                    )),
231                });
232            }
233            TransitionFrontierSyncAction::BlocksPeerQueryPending { .. } => {}
234            TransitionFrontierSyncAction::BlocksPeerQueryError { .. } => {
235                store.dispatch(TransitionFrontierSyncAction::BlocksPeersQuery);
236            }
237            TransitionFrontierSyncAction::BlocksPeerQuerySuccess { response, .. } => {
238                store.dispatch(TransitionFrontierSyncAction::BlocksPeersQuery);
239                store.dispatch(TransitionFrontierSyncAction::BlocksFetchSuccess {
240                    hash: response.hash.clone(),
241                });
242            }
243            TransitionFrontierSyncAction::BlocksFetchSuccess { .. } => {
244                let _ = store;
245                store.dispatch(TransitionFrontierSyncAction::BlocksNextApplyInit {});
246            }
247            TransitionFrontierSyncAction::BlocksNextApplyInit => {
248                let Some((block, pred_block)) = store
249                    .state()
250                    .transition_frontier
251                    .sync
252                    .blocks_apply_next()
253                    .map(|v| (v.0.clone(), v.1.clone()))
254                else {
255                    return;
256                };
257                let hash = block.hash.clone();
258
259                let is_our_block;
260
261                if let Some(stats) = store.service.stats() {
262                    stats.block_producer().block_apply_start(meta.time(), &hash);
263                    // TODO(tizoc): try a better approach that doesn't need
264                    // to make use of the collected stats.
265                    is_our_block = stats.block_producer().is_our_just_produced_block(&hash);
266                } else {
267                    is_our_block = false;
268                }
269
270                // During catchup, we skip the verificationf of completed work and zkApp txn proofs
271                // until get closer to the best tip, at which point full verification is enabled.
272                // We also skip verification of completed works if we produced this block.
273                let skip_verification = is_our_block
274                    || super::CATCHUP_BLOCK_VERIFY_TAIL_LENGTH
275                        < store.state().transition_frontier.sync.pending_count();
276
277                store.dispatch(LedgerWriteAction::Init {
278                    request: LedgerWriteRequest::BlockApply {
279                        block,
280                        pred_block,
281                        skip_verification,
282                    },
283                    on_init: redux::callback!(
284                        on_block_next_apply_init(request: LedgerWriteRequest) -> crate::Action {
285                            let LedgerWriteRequest::BlockApply {
286                                block,
287                                pred_block: _,
288                                skip_verification: _,
289                            } = request
290                            else {
291                                unreachable!()
292                            };
293                            let hash = block.hash().clone();
294                            TransitionFrontierSyncAction::BlocksNextApplyPending { hash }
295                        }
296                    ),
297                });
298            }
299            TransitionFrontierSyncAction::BlocksNextApplyPending { .. } => {}
300            TransitionFrontierSyncAction::BlocksNextApplyError { hash, error } => {
301                let Some((best_tip, failed_block)) = None.or_else(|| {
302                    Some((
303                        store.state().transition_frontier.sync.best_tip()?.clone(),
304                        store
305                            .state()
306                            .transition_frontier
307                            .sync
308                            .block_state(hash)?
309                            .block()?,
310                    ))
311                }) else {
312                    return;
313                };
314                let error = SyncError::BlockApplyFailed(failed_block.clone(), error.clone());
315                store.dispatch(TransitionFrontierAction::SyncFailed { best_tip, error });
316                // TODO this should be handled by a callback
317                store.dispatch(P2pNetworkPubsubAction::RejectMessage {
318                    message_id: Some(crate::p2p::BroadcastMessageId::BlockHash {
319                        hash: hash.clone(),
320                    }),
321                    peer_id: None,
322                    reason: "Failed to apply block".to_owned(),
323                });
324            }
325            TransitionFrontierSyncAction::BlocksNextApplySuccess {
326                hash,
327                just_emitted_a_proof: _,
328            } => {
329                if let Some(stats) = store.service.stats() {
330                    stats.block_producer().block_apply_end(meta.time(), hash);
331                }
332
333                if !store.dispatch(TransitionFrontierSyncAction::BlocksNextApplyInit) {
334                    store.dispatch(TransitionFrontierSyncAction::BlocksSuccess);
335                }
336            }
337            TransitionFrontierSyncAction::BlocksSendToArchive { data, .. } => {
338                store.service().send_to_archive(data.clone());
339            }
340            TransitionFrontierSyncAction::BlocksSuccess => {}
341            // Bootstrap/Catchup is practically complete at this point.
342            // This effect is where the finalization part needs to be
343            // executed, which is mostly to grab some data that we need
344            // from previous chain, before it's discarded after dispatching
345            // `TransitionFrontierSyncedAction`.
346            TransitionFrontierSyncAction::CommitInit => {
347                let transition_frontier = &store.state.get().transition_frontier;
348                let TransitionFrontierSyncState::BlocksSuccess {
349                    chain,
350                    root_snarked_ledger_updates,
351                    needed_protocol_states,
352                    ..
353                } = &transition_frontier.sync
354                else {
355                    return;
356                };
357                let Some(new_root) = chain.first() else {
358                    return;
359                };
360                let Some(new_best_tip) = chain.last() else {
361                    return;
362                };
363                let ledgers_to_keep = chain
364                    .iter()
365                    .map(|block| &block.block)
366                    .collect::<LedgersToKeep>();
367                let mut root_snarked_ledger_updates = root_snarked_ledger_updates.clone();
368                if transition_frontier
369                    .best_chain
370                    .iter()
371                    .any(|b| b.hash() == new_root.hash())
372                {
373                    let old_chain = transition_frontier
374                        .best_chain
375                        .iter()
376                        .map(AppliedBlock::block_with_hash);
377                    root_snarked_ledger_updates
378                        .extend_with_needed(new_root.block_with_hash(), old_chain);
379                }
380
381                let needed_protocol_states = if root_snarked_ledger_updates.is_empty() {
382                    // We don't need protocol states unless we need to
383                    // recreate some snarked ledgers during `commit`.
384                    Default::default()
385                } else {
386                    needed_protocol_states
387                        .iter()
388                        .chain(&transition_frontier.needed_protocol_states)
389                        .map(|(k, v)| (k.clone(), v.clone()))
390                        .collect()
391                };
392
393                store.dispatch(LedgerWriteAction::Init {
394                    request: LedgerWriteRequest::Commit {
395                        ledgers_to_keep,
396                        root_snarked_ledger_updates,
397                        needed_protocol_states,
398                        new_root: new_root.clone(),
399                        new_best_tip: new_best_tip.clone(),
400                    },
401                    on_init: redux::callback!(
402                        on_frontier_commit_init(_request: LedgerWriteRequest) -> crate::Action {
403                            TransitionFrontierSyncAction::CommitPending
404                        }
405                    ),
406                });
407            }
408            TransitionFrontierSyncAction::CommitPending => {}
409            TransitionFrontierSyncAction::CommitSuccess { .. } => {
410                unreachable!("handled in parent effects to avoid cloning")
411            }
412            TransitionFrontierSyncAction::Ledger(_) => {}
413        }
414    }
415}
416
417// Helper functions
418
419/// Gets from the current state the best tip sync target
420fn sync_best_tip(state: &crate::State) -> ArcBlockWithHash {
421    state.transition_frontier.sync.best_tip().unwrap().clone()
422}
423
424/// For snarked ledger sync targets, copy the previous snarked ledger if required
425fn maybe_copy_ledgers_for_sync<S>(
426    store: &mut Store<S>,
427    previous_root_snarked_ledger_hash: Option<LedgerHash>,
428    best_tip: &ArcBlockWithHash,
429) -> Result<bool, String>
430where
431    S: TransitionFrontierSyncLedgerSnarkedService,
432{
433    let sync = &store.state().transition_frontier.sync;
434
435    match sync {
436        TransitionFrontierSyncState::StakingLedgerPending(_) => {
437            prepare_staking_epoch_ledger_for_sync(store, best_tip)
438        }
439        TransitionFrontierSyncState::NextEpochLedgerPending(_) => {
440            prepare_next_epoch_ledger_for_sync(store, best_tip)
441        }
442
443        TransitionFrontierSyncState::RootLedgerPending(_) => {
444            prepare_transition_frontier_root_ledger_for_sync(
445                store,
446                previous_root_snarked_ledger_hash,
447                best_tip,
448            )
449        }
450        _ => Ok(true),
451    }
452}
453
454/// Copies (if necessary) the genesis ledger into the sync ledger state
455/// for the staking epoch ledger to use as a starting point.
456fn prepare_staking_epoch_ledger_for_sync<S>(
457    store: &mut Store<S>,
458    best_tip: &ArcBlockWithHash,
459) -> Result<bool, String>
460where
461    S: TransitionFrontierSyncLedgerSnarkedService,
462{
463    let target = SyncLedgerTarget::staking_epoch(best_tip).snarked_ledger_hash;
464    let origin = best_tip.genesis_ledger_hash().clone();
465
466    store
467        .service()
468        .copy_snarked_ledger_contents_for_sync(vec![origin], target, false)
469}
470
471/// Copies (if necessary) the staking ledger into the sync ledger state
472/// for the next epoch ledger to use as a starting point.
473fn prepare_next_epoch_ledger_for_sync<S>(
474    store: &mut Store<S>,
475    best_tip: &ArcBlockWithHash,
476) -> Result<bool, String>
477where
478    S: TransitionFrontierSyncLedgerSnarkedService,
479{
480    let sync = &store.state().transition_frontier.sync;
481    let root_block = sync.root_block().unwrap();
482    let Some(next_epoch_sync) = SyncLedgerTarget::next_epoch(best_tip, root_block) else {
483        return Ok(false);
484    };
485    let target = next_epoch_sync.snarked_ledger_hash;
486    let origin = SyncLedgerTarget::staking_epoch(best_tip).snarked_ledger_hash;
487
488    store
489        .service()
490        .copy_snarked_ledger_contents_for_sync(vec![origin], target, false)
491}
492
493/// Copies (if necessary) the next epoch ledger into the sync ledger state
494/// for the transition frontier root ledger to use as a starting point.
495fn prepare_transition_frontier_root_ledger_for_sync<S>(
496    store: &mut Store<S>,
497    previous_root_snarked_ledger_hash: Option<LedgerHash>,
498    best_tip: &ArcBlockWithHash,
499) -> Result<bool, String>
500where
501    S: TransitionFrontierSyncLedgerSnarkedService,
502{
503    let sync = &store.state().transition_frontier.sync;
504    let root_block = sync
505        .root_block()
506        .expect("Sync root block cannot be missing");
507
508    // Attempt in order: previous root, next epoch ledger, staking ledger
509    let mut candidate_origins: Vec<LedgerHash> =
510        previous_root_snarked_ledger_hash.into_iter().collect();
511    if let Some(next_epoch) = SyncLedgerTarget::next_epoch(best_tip, root_block) {
512        candidate_origins.push(next_epoch.snarked_ledger_hash.clone());
513    }
514    candidate_origins.push(
515        SyncLedgerTarget::staking_epoch(best_tip)
516            .snarked_ledger_hash
517            .clone(),
518    );
519
520    let target = root_block.snarked_ledger_hash().clone();
521
522    store
523        .service()
524        .copy_snarked_ledger_contents_for_sync(candidate_origins, target, false)
525}