node/transition_frontier/sync/transition_frontier_sync_effects.rs

1use mina_p2p_messages::v2::LedgerHash;
2use openmina_core::block::{AppliedBlock, ArcBlockWithHash};
3use p2p::{
4    channels::rpc::{P2pChannelsRpcAction, P2pRpcId},
5    P2pNetworkPubsubAction, PeerId,
6};
7use redux::ActionMeta;
8
9use crate::{
10    ledger::write::{LedgerWriteAction, LedgerWriteRequest, LedgersToKeep},
11    p2p::channels::rpc::P2pRpcRequest,
12    p2p_ready,
13    service::TransitionFrontierSyncLedgerSnarkedService,
14    Service, Store, TransitionFrontierAction,
15};
16
17use super::{
18    ledger::{
19        snarked::TransitionFrontierSyncLedgerSnarkedAction,
20        staged::TransitionFrontierSyncLedgerStagedAction, SyncLedgerTarget,
21        TransitionFrontierSyncLedgerAction,
22    },
23    SyncError, TransitionFrontierSyncAction, TransitionFrontierSyncState,
24};
25
26impl TransitionFrontierSyncAction {
27    pub fn effects<S>(&self, meta: &ActionMeta, store: &mut Store<S>)
28    where
29        S: Service,
30    {
31        match self {
32            TransitionFrontierSyncAction::Init { best_tip, .. } => {
33                let protocol_state_body = &best_tip.block.header.protocol_state.body;
34                let genesis_ledger_hash = &protocol_state_body.blockchain_state.genesis_ledger_hash;
35                let staking_epoch_ledger_hash = &protocol_state_body
36                    .consensus_state
37                    .staking_epoch_data
38                    .ledger
39                    .hash;
40                let next_epoch_ledger_hash = &protocol_state_body
41                    .consensus_state
42                    .next_epoch_data
43                    .ledger
44                    .hash;
45
46                // TODO(tizoc): if root ledger matches genesis, should anything special be done?
47                // snarked ledger will not need to be synced but staged ledger parts are still
48                // required
49
50                if genesis_ledger_hash != staking_epoch_ledger_hash {
51                    store.dispatch(TransitionFrontierSyncAction::LedgerStakingPending);
52                } else if genesis_ledger_hash != next_epoch_ledger_hash {
53                    store.dispatch(TransitionFrontierSyncAction::LedgerNextEpochPending);
54                } else {
55                    store.dispatch(TransitionFrontierSyncAction::LedgerRootPending);
56                }
57            }
58            TransitionFrontierSyncAction::BestTipUpdate {
59                previous_root_snarked_ledger_hash,
60                best_tip,
61                on_success,
62                ..
63            } => {
64                // TODO(tizoc): this is currently required because how how complicated the BestTipUpdate reducer is,
65                // once that is simplified this should be handled in separate actions.
66                maybe_copy_ledgers_for_sync(
67                    store,
68                    previous_root_snarked_ledger_hash.clone(),
69                    best_tip,
70                )
71                .unwrap();
72
73                // if root snarked ledger changed.
74                store.dispatch(TransitionFrontierSyncLedgerAction::Init);
75                // if root snarked ledger stayed same but root block changed
76                // while reconstructing staged ledger.
77                store.dispatch(TransitionFrontierSyncLedgerStagedAction::PartsFetchPending);
78                store.dispatch(TransitionFrontierSyncLedgerSnarkedAction::PeersQuery);
79                // if we don't need to sync root staged ledger.
80                store.dispatch(TransitionFrontierSyncAction::BlocksPeersQuery);
81                // if we already have a block ready to be applied.
82                store.dispatch(TransitionFrontierSyncAction::BlocksNextApplyInit);
83
84                // TODO(binier): cleanup ledgers
85                if let Some(callback) = on_success {
86                    store.dispatch_callback(callback.clone(), ());
87                }
88            }
89            // TODO(tizoc): this action is never called with the current implementation,
90            // either remove it or figure out how to recover it as a reaction to
91            // `BestTipUpdate` above. Currently this logic is handled by
92            // `maybe_copy_ledgers_for_sync` at the end of this file.
93            // Same kind of applies to `LedgerNextEpochPending` and `LedgerRootPending`
94            // in some cases, but issue is mostly about `LedgerStakingPending` because
95            // it is the one most likely to be affected by the first `BestTipUpdate`
96            // action processed by the state machine.
97            TransitionFrontierSyncAction::LedgerStakingPending => {
98                prepare_staking_epoch_ledger_for_sync(store, &sync_best_tip(store.state()))
99                    .unwrap();
100
101                store.dispatch(TransitionFrontierSyncLedgerAction::Init);
102            }
103            TransitionFrontierSyncAction::LedgerStakingSuccess => {
104                if store.dispatch(TransitionFrontierSyncAction::LedgerNextEpochPending) {
105                } else if store.dispatch(TransitionFrontierSyncAction::LedgerRootPending) {
106                }
107            }
108            TransitionFrontierSyncAction::LedgerNextEpochPending => {
109                prepare_next_epoch_ledger_for_sync(store, &sync_best_tip(store.state())).unwrap();
110
111                store.dispatch(TransitionFrontierSyncLedgerAction::Init);
112            }
113            TransitionFrontierSyncAction::LedgerNextEpochSuccess => {
114                store.dispatch(TransitionFrontierSyncAction::LedgerRootPending);
115            }
116            TransitionFrontierSyncAction::LedgerRootPending => {
117                prepare_transition_frontier_root_ledger_for_sync(
118                    store,
119                    None,
120                    &sync_best_tip(store.state()),
121                )
122                .unwrap();
123
124                store.dispatch(TransitionFrontierSyncLedgerAction::Init);
125            }
126            TransitionFrontierSyncAction::LedgerRootSuccess => {
127                store.dispatch(TransitionFrontierSyncAction::BlocksPending);
128            }
129            TransitionFrontierSyncAction::BlocksPending => {
130                if !store.dispatch(TransitionFrontierSyncAction::BlocksSuccess) {
131                    store.dispatch(TransitionFrontierSyncAction::BlocksPeersQuery);
132                }
133            }
134            TransitionFrontierSyncAction::BlocksPeersQuery => {
135                let p2p = p2p_ready!(store.state().p2p, meta.time());
136                // TODO(binier): make sure they have the ledger we want to query.
137                let mut peer_ids = p2p
138                    .ready_peers_iter()
139                    .filter(|(_, p)| p.channels.rpc.can_send_request())
140                    .map(|(id, p)| (*id, p.connected_since))
141                    .collect::<Vec<_>>();
142                peer_ids.sort_by(|(_, t1), (_, t2)| t2.cmp(t1));
143
144                let mut retry_hashes = store
145                    .state()
146                    .transition_frontier
147                    .sync
148                    .blocks_fetch_retry_iter()
149                    .collect::<Vec<_>>();
150                retry_hashes.reverse();
151
152                for (peer_id, _) in peer_ids {
153                    if let Some(hash) = retry_hashes.last() {
154                        if store.dispatch(TransitionFrontierSyncAction::BlocksPeerQueryRetry {
155                            peer_id,
156                            hash: hash.clone(),
157                        }) {
158                            retry_hashes.pop();
159                            continue;
160                        }
161                    }
162
163                    match store.state().transition_frontier.sync.blocks_fetch_next() {
164                        Some(hash) => {
165                            store.dispatch(TransitionFrontierSyncAction::BlocksPeerQueryInit {
166                                peer_id,
167                                hash,
168                            });
169                        }
170                        None if retry_hashes.is_empty() => break,
171                        None => {}
172                    }
173                }
174            }
175            TransitionFrontierSyncAction::BlocksPeerQueryInit { hash, peer_id } => {
176                let p2p = p2p_ready!(store.state().p2p, meta.time());
177                let Some(rpc_id) = p2p
178                    .get_ready_peer(peer_id)
179                    .map(|v| v.channels.next_local_rpc_id())
180                else {
181                    return;
182                };
183
184                store.dispatch(P2pChannelsRpcAction::RequestSend {
185                    peer_id: *peer_id,
186                    id: rpc_id,
187                    request: Box::new(P2pRpcRequest::Block(hash.clone())),
188                    on_init: Some(redux::callback!(
189                        on_send_p2p_block_rpc_request(
190                            (peer_id: PeerId, rpc_id: P2pRpcId, request: P2pRpcRequest)
191                        ) -> crate::Action {
192                            let P2pRpcRequest::Block(hash) = request else {
193                                unreachable!()
194                            };
195                            TransitionFrontierSyncAction::BlocksPeerQueryPending {
196                                hash,
197                                peer_id,
198                                rpc_id,
199                            }
200                        }
201                    )),
202                });
203            }
204            TransitionFrontierSyncAction::BlocksPeerQueryRetry { hash, peer_id } => {
205                let p2p = p2p_ready!(store.state().p2p, meta.time());
206                let Some(rpc_id) = p2p
207                    .get_ready_peer(peer_id)
208                    .map(|v| v.channels.next_local_rpc_id())
209                else {
210                    return;
211                };
212
213                store.dispatch(P2pChannelsRpcAction::RequestSend {
214                    peer_id: *peer_id,
215                    id: rpc_id,
216                    request: Box::new(P2pRpcRequest::Block(hash.clone())),
217                    on_init: Some(redux::callback!(
218                        on_send_p2p_block_rpc_request_retry(
219                            (peer_id: PeerId, rpc_id: P2pRpcId, request: P2pRpcRequest)
220                        ) -> crate::Action {
221                            let P2pRpcRequest::Block(hash) = request else {
222                                unreachable!()
223                            };
224                            TransitionFrontierSyncAction::BlocksPeerQueryPending {
225                                hash,
226                                peer_id,
227                                rpc_id,
228                            }
229                        }
230                    )),
231                });
232            }
233            TransitionFrontierSyncAction::BlocksPeerQueryPending { .. } => {}
234            TransitionFrontierSyncAction::BlocksPeerQueryError { .. } => {
235                store.dispatch(TransitionFrontierSyncAction::BlocksPeersQuery);
236            }
237            TransitionFrontierSyncAction::BlocksPeerQuerySuccess { response, .. } => {
238                store.dispatch(TransitionFrontierSyncAction::BlocksPeersQuery);
239                store.dispatch(TransitionFrontierSyncAction::BlocksFetchSuccess {
240                    hash: response.hash.clone(),
241                });
242            }
243            TransitionFrontierSyncAction::BlocksFetchSuccess { .. } => {
244                let _ = store;
245                store.dispatch(TransitionFrontierSyncAction::BlocksNextApplyInit {});
246            }
247            TransitionFrontierSyncAction::BlocksNextApplyInit => {
248                let Some((block, pred_block)) = store
249                    .state()
250                    .transition_frontier
251                    .sync
252                    .blocks_apply_next()
253                    .map(|v| (v.0.clone(), v.1.clone()))
254                else {
255                    return;
256                };
257                let hash = block.hash.clone();
258
259                let is_our_block;
260
261                if let Some(stats) = store.service.stats() {
262                    stats.block_producer().block_apply_start(meta.time(), &hash);
263                    // TODO(tizoc): try a better approach that doesn't need
264                    // to make use of the collected stats.
265                    is_our_block = stats.block_producer().is_our_just_produced_block(&hash);
266                } else {
267                    is_our_block = false;
268                }
269
270                // During catchup, we skip the verificationf of completed work and zkApp txn proofs
271                // until get closer to the best tip, at which point full verification is enabled.
272                // We also skip verification of completed works if we produced this block.
273                let skip_verification = is_our_block
274                    || super::CATCHUP_BLOCK_VERIFY_TAIL_LENGTH
275                        < store.state().transition_frontier.sync.pending_count();
276
277                store.dispatch(LedgerWriteAction::Init {
278                    request: LedgerWriteRequest::BlockApply {
279                        block,
280                        pred_block,
281                        skip_verification,
282                    },
283                    on_init: redux::callback!(
284                        on_block_next_apply_init(request: LedgerWriteRequest) -> crate::Action {
285                            let LedgerWriteRequest::BlockApply {
286                                block,
287                                pred_block: _,
288                                skip_verification: _,
289                            } = request
290                            else {
291                                unreachable!()
292                            };
293                            let hash = block.hash().clone();
294                            TransitionFrontierSyncAction::BlocksNextApplyPending { hash }
295                        }
296                    ),
297                });
298            }
299            TransitionFrontierSyncAction::BlocksNextApplyPending { .. } => {}
300            TransitionFrontierSyncAction::BlocksNextApplyError { hash, error } => {
301                let Some((best_tip, failed_block)) = None.or_else(|| {
302                    Some((
303                        store.state().transition_frontier.sync.best_tip()?.clone(),
304                        store
305                            .state()
306                            .transition_frontier
307                            .sync
308                            .block_state(hash)?
309                            .block()?,
310                    ))
311                }) else {
312                    return;
313                };
314                let error = SyncError::BlockApplyFailed(failed_block.clone(), error.clone());
315                store.dispatch(TransitionFrontierAction::SyncFailed { best_tip, error });
316                // TODO this should be handled by a callback
317                store.dispatch(P2pNetworkPubsubAction::RejectMessage {
318                    message_id: Some(p2p::BroadcastMessageId::BlockHash { hash: hash.clone() }),
319                    peer_id: None,
320                    reason: "Failed to apply block".to_owned(),
321                });
322            }
323            TransitionFrontierSyncAction::BlocksNextApplySuccess {
324                hash,
325                just_emitted_a_proof: _,
326            } => {
327                if let Some(stats) = store.service.stats() {
328                    stats.block_producer().block_apply_end(meta.time(), hash);
329                }
330
331                if !store.dispatch(TransitionFrontierSyncAction::BlocksNextApplyInit) {
332                    store.dispatch(TransitionFrontierSyncAction::BlocksSuccess);
333                }
334            }
335            TransitionFrontierSyncAction::BlocksSendToArchive { data, .. } => {
336                store.service().send_to_archive(data.clone());
337            }
338            TransitionFrontierSyncAction::BlocksSuccess => {}
339            // Bootstrap/Catchup is practically complete at this point.
340            // This effect is where the finalization part needs to be
341            // executed, which is mostly to grab some data that we need
342            // from previous chain, before it's discarded after dispatching
343            // `TransitionFrontierSyncedAction`.
344            TransitionFrontierSyncAction::CommitInit => {
345                let transition_frontier = &store.state.get().transition_frontier;
346                let TransitionFrontierSyncState::BlocksSuccess {
347                    chain,
348                    root_snarked_ledger_updates,
349                    needed_protocol_states,
350                    ..
351                } = &transition_frontier.sync
352                else {
353                    return;
354                };
355                let Some(new_root) = chain.first() else {
356                    return;
357                };
358                let Some(new_best_tip) = chain.last() else {
359                    return;
360                };
361                let ledgers_to_keep = chain
362                    .iter()
363                    .map(|block| &block.block)
364                    .collect::<LedgersToKeep>();
365                let mut root_snarked_ledger_updates = root_snarked_ledger_updates.clone();
366                if transition_frontier
367                    .best_chain
368                    .iter()
369                    .any(|b| b.hash() == new_root.hash())
370                {
371                    let old_chain = transition_frontier
372                        .best_chain
373                        .iter()
374                        .map(AppliedBlock::block_with_hash);
375                    root_snarked_ledger_updates
376                        .extend_with_needed(new_root.block_with_hash(), old_chain);
377                }
378
379                let needed_protocol_states = if root_snarked_ledger_updates.is_empty() {
380                    // We don't need protocol states unless we need to
381                    // recreate some snarked ledgers during `commit`.
382                    Default::default()
383                } else {
384                    needed_protocol_states
385                        .iter()
386                        .chain(&transition_frontier.needed_protocol_states)
387                        .map(|(k, v)| (k.clone(), v.clone()))
388                        .collect()
389                };
390
391                store.dispatch(LedgerWriteAction::Init {
392                    request: LedgerWriteRequest::Commit {
393                        ledgers_to_keep,
394                        root_snarked_ledger_updates,
395                        needed_protocol_states,
396                        new_root: new_root.clone(),
397                        new_best_tip: new_best_tip.clone(),
398                    },
399                    on_init: redux::callback!(
400                        on_frontier_commit_init(_request: LedgerWriteRequest) -> crate::Action {
401                            TransitionFrontierSyncAction::CommitPending
402                        }
403                    ),
404                });
405            }
406            TransitionFrontierSyncAction::CommitPending => {}
407            TransitionFrontierSyncAction::CommitSuccess { .. } => {
408                unreachable!("handled in parent effects to avoid cloning")
409            }
410            TransitionFrontierSyncAction::Ledger(_) => {}
411        }
412    }
413}
414
415// Helper functions
416
417/// Gets from the current state the best tip sync target
418fn sync_best_tip(state: &crate::State) -> ArcBlockWithHash {
419    state.transition_frontier.sync.best_tip().unwrap().clone()
420}
421
422/// For snarked ledger sync targets, copy the previous snarked ledger if required
423fn maybe_copy_ledgers_for_sync<S>(
424    store: &mut Store<S>,
425    previous_root_snarked_ledger_hash: Option<LedgerHash>,
426    best_tip: &ArcBlockWithHash,
427) -> Result<bool, String>
428where
429    S: TransitionFrontierSyncLedgerSnarkedService,
430{
431    let sync = &store.state().transition_frontier.sync;
432
433    match sync {
434        TransitionFrontierSyncState::StakingLedgerPending(_) => {
435            prepare_staking_epoch_ledger_for_sync(store, best_tip)
436        }
437        TransitionFrontierSyncState::NextEpochLedgerPending(_) => {
438            prepare_next_epoch_ledger_for_sync(store, best_tip)
439        }
440
441        TransitionFrontierSyncState::RootLedgerPending(_) => {
442            prepare_transition_frontier_root_ledger_for_sync(
443                store,
444                previous_root_snarked_ledger_hash,
445                best_tip,
446            )
447        }
448        _ => Ok(true),
449    }
450}
451
452/// Copies (if necessary) the genesis ledger into the sync ledger state
453/// for the staking epoch ledger to use as a starting point.
454fn prepare_staking_epoch_ledger_for_sync<S>(
455    store: &mut Store<S>,
456    best_tip: &ArcBlockWithHash,
457) -> Result<bool, String>
458where
459    S: TransitionFrontierSyncLedgerSnarkedService,
460{
461    let target = SyncLedgerTarget::staking_epoch(best_tip).snarked_ledger_hash;
462    let origin = best_tip.genesis_ledger_hash().clone();
463
464    store
465        .service()
466        .copy_snarked_ledger_contents_for_sync(vec![origin], target, false)
467}
468
469/// Copies (if necessary) the staking ledger into the sync ledger state
470/// for the next epoch ledger to use as a starting point.
471fn prepare_next_epoch_ledger_for_sync<S>(
472    store: &mut Store<S>,
473    best_tip: &ArcBlockWithHash,
474) -> Result<bool, String>
475where
476    S: TransitionFrontierSyncLedgerSnarkedService,
477{
478    let sync = &store.state().transition_frontier.sync;
479    let root_block = sync.root_block().unwrap();
480    let Some(next_epoch_sync) = SyncLedgerTarget::next_epoch(best_tip, root_block) else {
481        return Ok(false);
482    };
483    let target = next_epoch_sync.snarked_ledger_hash;
484    let origin = SyncLedgerTarget::staking_epoch(best_tip).snarked_ledger_hash;
485
486    store
487        .service()
488        .copy_snarked_ledger_contents_for_sync(vec![origin], target, false)
489}
490
491/// Copies (if necessary) the next epoch ledger into the sync ledger state
492/// for the transition frontier root ledger to use as a starting point.
493fn prepare_transition_frontier_root_ledger_for_sync<S>(
494    store: &mut Store<S>,
495    previous_root_snarked_ledger_hash: Option<LedgerHash>,
496    best_tip: &ArcBlockWithHash,
497) -> Result<bool, String>
498where
499    S: TransitionFrontierSyncLedgerSnarkedService,
500{
501    let sync = &store.state().transition_frontier.sync;
502    let root_block = sync
503        .root_block()
504        .expect("Sync root block cannot be missing");
505
506    // Attempt in order: previous root, next epoch ledger, staking ledger
507    let mut candidate_origins: Vec<LedgerHash> =
508        previous_root_snarked_ledger_hash.into_iter().collect();
509    if let Some(next_epoch) = SyncLedgerTarget::next_epoch(best_tip, root_block) {
510        candidate_origins.push(next_epoch.snarked_ledger_hash.clone());
511    }
512    candidate_origins.push(
513        SyncLedgerTarget::staking_epoch(best_tip)
514            .snarked_ledger_hash
515            .clone(),
516    );
517
518    let target = root_block.snarked_ledger_hash().clone();
519
520    store
521        .service()
522        .copy_snarked_ledger_contents_for_sync(candidate_origins, target, false)
523}