node/transition_frontier/sync/
transition_frontier_sync_state.rs

1use std::collections::BTreeMap;
2
3use mina_p2p_messages::v2::{self, LedgerHash, MinaStateProtocolStateValueStableV2, StateHash};
4use openmina_core::block::{AppliedBlock, ArcBlockWithHash};
5use redux::Timestamp;
6use serde::{Deserialize, Serialize};
7use strum_macros::Display;
8
9use crate::p2p::{channels::rpc::P2pRpcId, PeerId};
10
11use super::{
12    ledger::{SyncLedgerTarget, SyncLedgerTargetKind, TransitionFrontierSyncLedgerState},
13    PeerBlockFetchError,
14};
15
/// State of the transition frontier synchronization process.
///
/// `sync_phase` groups the variants into three phases: `Idle` through
/// `RootLedgerSuccess` are reported as bootstrap, `BlocksPending` through
/// `CommitSuccess` as catchup, and `Synced` as synced.
///
/// `is_pending` is true for every variant except `Idle` and `Synced`.
#[derive(Serialize, Deserialize, Display, Debug, Clone)]
pub enum TransitionFrontierSyncState {
    /// Synchronization has not been started; carries no data.
    Idle,
    /// Synchronization target is known: the best tip, the root block and
    /// the hashes of the blocks between them.
    Init {
        time: Timestamp,
        best_tip: ArcBlockWithHash,
        root_block: ArcBlockWithHash,
        blocks_inbetween: Vec<StateHash>,
    },
    /// A ledger synchronization is in progress; considered complete by
    /// `is_ledger_sync_complete` once the snarked ledger is synced.
    /// NOTE(review): presumably the staking epoch ledger — confirm.
    StakingLedgerPending(TransitionFrontierSyncLedgerPending),
    StakingLedgerSuccess {
        time: Timestamp,
        best_tip: ArcBlockWithHash,
        root_block: ArcBlockWithHash,
        blocks_inbetween: Vec<StateHash>,
        needed_protocol_states: BTreeMap<StateHash, MinaStateProtocolStateValueStableV2>,
    },
    /// A ledger synchronization is in progress; considered complete by
    /// `is_ledger_sync_complete` once the snarked ledger is synced.
    /// NOTE(review): presumably the next epoch ledger — confirm.
    NextEpochLedgerPending(TransitionFrontierSyncLedgerPending),
    NextEpochLedgerSuccess {
        time: Timestamp,
        best_tip: ArcBlockWithHash,
        root_block: ArcBlockWithHash,
        blocks_inbetween: Vec<StateHash>,
        needed_protocol_states: BTreeMap<StateHash, MinaStateProtocolStateValueStableV2>,
    },
    /// Synchronization of the ledger at the root of the transition
    /// frontier is in progress; considered complete by
    /// `is_ledger_sync_complete` only once the staged ledger step
    /// reports success.
    RootLedgerPending(TransitionFrontierSyncLedgerPending),
    RootLedgerSuccess {
        time: Timestamp,
        best_tip: ArcBlockWithHash,
        root_block: ArcBlockWithHash,
        blocks_inbetween: Vec<StateHash>,
        root_block_updates: Vec<ArcBlockWithHash>,
        needed_protocol_states: BTreeMap<StateHash, MinaStateProtocolStateValueStableV2>,
    },
    /// Blocks are being fetched and applied. `root_block()` reads the
    /// first entry of `chain` and `best_tip()` the last one.
    BlocksPending {
        time: Timestamp,
        /// Per-block fetch/apply state.
        chain: Vec<TransitionFrontierSyncBlockState>,
        /// Snarked ledger updates/transitions that happened while we
        /// were synchronizing blocks. If those updates do happen, we
        /// need to create those snarked ledgers from the closest snarked
        /// ledger that we have in the service already synchronized.
        ///
        /// Contains a map where the `key` is the new snarked ledger and
        /// the `value` is more info required to construct that ledger.
        root_snarked_ledger_updates: TransitionFrontierRootSnarkedLedgerUpdates,
        needed_protocol_states: BTreeMap<StateHash, MinaStateProtocolStateValueStableV2>,
    },
    /// The chain is held as applied blocks; `root_block()` reads the
    /// first entry of `chain` and `best_tip()` the last one.
    BlocksSuccess {
        time: Timestamp,
        chain: Vec<AppliedBlock>,
        root_snarked_ledger_updates: TransitionFrontierRootSnarkedLedgerUpdates,
        needed_protocol_states: BTreeMap<StateHash, MinaStateProtocolStateValueStableV2>,
    },
    /// State for which `is_commit_pending` returns true.
    CommitPending {
        time: Timestamp,
        chain: Vec<AppliedBlock>,
        root_snarked_ledger_updates: TransitionFrontierRootSnarkedLedgerUpdates,
        needed_protocol_states: BTreeMap<StateHash, MinaStateProtocolStateValueStableV2>,
    },
    CommitSuccess {
        time: Timestamp,
        chain: Vec<AppliedBlock>,
        root_snarked_ledger_updates: TransitionFrontierRootSnarkedLedgerUpdates,
        needed_protocol_states: BTreeMap<StateHash, MinaStateProtocolStateValueStableV2>,
    },
    /// Synchronization is complete. `root_block()` and `best_tip()`
    /// return `None` in this state.
    Synced {
        time: Timestamp,
    },
}
85
/// Data carried by the `StakingLedgerPending`, `NextEpochLedgerPending`
/// and `RootLedgerPending` variants of [`TransitionFrontierSyncState`].
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TransitionFrontierSyncLedgerPending {
    pub time: Timestamp,
    /// Returned by `TransitionFrontierSyncState::best_tip` while in one
    /// of the ledger-pending states.
    pub best_tip: ArcBlockWithHash,
    /// Returned by `TransitionFrontierSyncState::root_block` while in
    /// one of the ledger-pending states.
    pub root_block: ArcBlockWithHash,
    pub blocks_inbetween: Vec<StateHash>,
    pub root_block_updates: Vec<ArcBlockWithHash>,
    /// State of the ledger synchronization itself; exposed through
    /// `TransitionFrontierSyncState::ledger` / `ledger_mut`.
    pub ledger: TransitionFrontierSyncLedgerState,
}
95
/// Collection of root snarked ledger updates, keyed by the hash of the
/// new snarked ledger. Each value holds the information needed to
/// construct that ledger. Entries are added by `extend_with_needed`.
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
pub struct TransitionFrontierRootSnarkedLedgerUpdates(
    BTreeMap<LedgerHash, TransitionFrontierRootSnarkedLedgerUpdate>,
);
100
/// Information needed to construct one target snarked ledger; stored as
/// a value in [`TransitionFrontierRootSnarkedLedgerUpdates`].
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TransitionFrontierRootSnarkedLedgerUpdate {
    /// Hash of the snarked ledger on top of which the target snarked
    /// ledger is constructed.
    pub parent: LedgerHash,
    /// Staged ledger hash of the applied block, that had the same snarked
    /// ledger as the target. From that staged ledger we can fetch
    /// transactions that we need to apply on top of `parent` in order
    /// to construct target snarked ledger.
    pub staged_ledger_hash: v2::MinaBaseStagedLedgerHashStableV1,
}
110
/// Fetch/apply state of a single block; the element type of
/// `TransitionFrontierSyncState::BlocksPending::chain`.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum TransitionFrontierSyncBlockState {
    /// The block is known only by its hash and still has to be fetched.
    FetchPending {
        time: Timestamp,
        block_hash: StateHash,
        /// Fetch attempts keyed by peer. An empty map makes the block a
        /// candidate for `blocks_fetch_next`; a non-empty map in which
        /// every attempt is an error makes it a candidate for retry
        /// (see `retry_hash`).
        attempts: BTreeMap<PeerId, PeerRpcState>,
    },
    /// The block body is available but it has not been applied.
    FetchSuccess {
        time: Timestamp,
        block: ArcBlockWithHash,
    },
    /// State for which `is_apply_pending` returns true.
    ApplyPending {
        time: Timestamp,
        block: ArcBlockWithHash,
    },
    /// State for which `is_apply_error` returns true; `error` holds the
    /// failure as a string.
    ApplyError {
        time: Timestamp,
        block: ArcBlockWithHash,
        error: String,
    },
    /// The block is held as an `AppliedBlock`; blocks in this state are
    /// not counted by `TransitionFrontierSyncState::pending_count`.
    ApplySuccess {
        time: Timestamp,
        block: AppliedBlock,
    },
}
136
/// State of a block fetch attempt from a single peer; the value type of
/// `TransitionFrontierSyncBlockState::FetchPending::attempts`.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum PeerRpcState {
    /// Attempt registered, no rpc id assigned yet.
    Init {
        time: Timestamp,
    },
    /// Request identified by `rpc_id` is outstanding; `time` is what
    /// `fetch_pending_since` reports.
    Pending {
        time: Timestamp,
        rpc_id: P2pRpcId,
    },
    /// The attempt identified by `rpc_id` failed with `error`.
    Error {
        time: Timestamp,
        rpc_id: P2pRpcId,
        error: PeerBlockFetchError,
    },
    /// The attempt produced `block`.
    Success {
        time: Timestamp,
        block: ArcBlockWithHash,
    },
}
156
/// Coarse phase of the synchronization process, as computed by
/// `TransitionFrontierSyncState::sync_phase`.
#[derive(Serialize, Deserialize, Display, Debug, Clone)]
pub enum SyncPhase {
    /// Reported for `Idle`, `Init` and all ledger sync states.
    Bootstrap,
    /// Reported for the `Blocks*` and `Commit*` states.
    Catchup,
    /// Reported for the `Synced` state.
    Synced,
}
163
164impl TransitionFrontierSyncState {
165    /// If the synchronization process has started but is not yet complete
166    pub fn is_pending(&self) -> bool {
167        !matches!(self, Self::Idle | Self::Synced { .. })
168    }
169
170    pub fn is_commit_pending(&self) -> bool {
171        matches!(self, Self::CommitPending { .. })
172    }
173
174    /// If the synchronization process is complete
175    pub fn is_synced(&self) -> bool {
176        matches!(self, Self::Synced { .. })
177    }
178
179    pub fn time(&self) -> Option<redux::Timestamp> {
180        match self {
181            Self::Idle => None,
182            Self::Init { time, .. } => Some(*time),
183            Self::StakingLedgerPending(s) => Some(s.time),
184            Self::StakingLedgerSuccess { time, .. } => Some(*time),
185            Self::NextEpochLedgerPending(s) => Some(s.time),
186            Self::NextEpochLedgerSuccess { time, .. } => Some(*time),
187            Self::RootLedgerPending(s) => Some(s.time),
188            Self::RootLedgerSuccess { time, .. } => Some(*time),
189            Self::BlocksPending { time, .. } => Some(*time),
190            Self::BlocksSuccess { time, .. } => Some(*time),
191            Self::CommitPending { time, .. } => Some(*time),
192            Self::CommitSuccess { time, .. } => Some(*time),
193            Self::Synced { time, .. } => Some(*time),
194        }
195    }
196
197    pub fn root_block(&self) -> Option<&ArcBlockWithHash> {
198        match self {
199            Self::Idle => None,
200            Self::Init { root_block, .. } => Some(root_block),
201            Self::StakingLedgerPending(s) => Some(&s.root_block),
202            Self::StakingLedgerSuccess { root_block, .. } => Some(root_block),
203            Self::NextEpochLedgerPending(s) => Some(&s.root_block),
204            Self::NextEpochLedgerSuccess { root_block, .. } => Some(root_block),
205            Self::RootLedgerPending(s) => Some(&s.root_block),
206            Self::RootLedgerSuccess { root_block, .. } => Some(root_block),
207            Self::BlocksPending { chain, .. } => chain.first().and_then(|b| b.block()),
208            Self::BlocksSuccess { chain, .. } => chain.first().map(AppliedBlock::block_with_hash),
209            Self::CommitPending { chain, .. } => chain.first().map(AppliedBlock::block_with_hash),
210            Self::CommitSuccess { chain, .. } => chain.first().map(AppliedBlock::block_with_hash),
211            Self::Synced { .. } => None,
212        }
213    }
214
215    pub fn best_tip(&self) -> Option<&ArcBlockWithHash> {
216        match self {
217            Self::Idle => None,
218            Self::Init { best_tip, .. } => Some(best_tip),
219            Self::StakingLedgerPending(s) => Some(&s.best_tip),
220            Self::StakingLedgerSuccess { best_tip, .. } => Some(best_tip),
221            Self::NextEpochLedgerPending(s) => Some(&s.best_tip),
222            Self::NextEpochLedgerSuccess { best_tip, .. } => Some(best_tip),
223            Self::RootLedgerPending(s) => Some(&s.best_tip),
224            Self::RootLedgerSuccess { best_tip, .. } => Some(best_tip),
225            Self::BlocksPending { chain, .. } => chain.last().and_then(|b| b.block()),
226            Self::BlocksSuccess { chain, .. } => chain.last().map(AppliedBlock::block_with_hash),
227            Self::CommitPending { chain, .. } => chain.last().map(AppliedBlock::block_with_hash),
228            Self::CommitSuccess { chain, .. } => chain.last().map(AppliedBlock::block_with_hash),
229            Self::Synced { .. } => None,
230        }
231    }
232
233    pub fn ledger(&self) -> Option<&TransitionFrontierSyncLedgerState> {
234        match self {
235            Self::StakingLedgerPending(s) => Some(&s.ledger),
236            Self::NextEpochLedgerPending(s) => Some(&s.ledger),
237            Self::RootLedgerPending(s) => Some(&s.ledger),
238            _ => None,
239        }
240    }
241
242    pub fn ledger_mut(&mut self) -> Option<&mut TransitionFrontierSyncLedgerState> {
243        match self {
244            Self::StakingLedgerPending(s) => Some(&mut s.ledger),
245            Self::NextEpochLedgerPending(s) => Some(&mut s.ledger),
246            Self::RootLedgerPending(s) => Some(&mut s.ledger),
247            _ => None,
248        }
249    }
250
251    pub fn ledger_target(&self) -> Option<SyncLedgerTarget> {
252        self.ledger().map(|s| s.target())
253    }
254
255    pub fn ledger_target_kind(&self) -> Option<SyncLedgerTargetKind> {
256        self.ledger().map(|s| s.target_kind())
257    }
258
259    /// True if the synchronization of the target ledger is complete.
260    ///
261    /// Epoch ledgers only require the snarked ledger to be synchronized,
262    /// but the ledger at the root of the transition frontier also requires
263    /// the staging ledger to be synchronized.
264    pub fn is_ledger_sync_complete(&self) -> bool {
265        match self {
266            Self::StakingLedgerPending(s) => s.ledger.is_snarked_ledger_synced(),
267            Self::NextEpochLedgerPending(s) => s.ledger.is_snarked_ledger_synced(),
268            Self::RootLedgerPending(s) => s.ledger.staged().is_some_and(|s| s.is_success()),
269            _ => false,
270        }
271    }
272
273    pub fn blocks_iter(&self) -> impl Iterator<Item = &TransitionFrontierSyncBlockState> {
274        static EMPTY: Vec<TransitionFrontierSyncBlockState> = Vec::new();
275        match self {
276            Self::BlocksPending { chain, .. } => chain.iter(),
277            _ => EMPTY.iter(),
278        }
279    }
280
281    pub fn pending_count(&self) -> usize {
282        self.blocks_iter()
283            .filter(|b| !matches!(b, TransitionFrontierSyncBlockState::ApplySuccess { .. }))
284            .count()
285    }
286
287    pub fn blocks_fetch_retry_iter(&self) -> impl '_ + Iterator<Item = StateHash> {
288        self.blocks_iter().filter_map(|s| s.retry_hash()).cloned()
289    }
290
291    pub fn blocks_fetch_next(&self) -> Option<StateHash> {
292        self.blocks_iter().find_map(|s| match s {
293            TransitionFrontierSyncBlockState::FetchPending {
294                block_hash,
295                attempts,
296                ..
297            } => Some(block_hash).filter(|_| attempts.is_empty()).cloned(),
298            _ => None,
299        })
300    }
301
302    pub fn block_state(&self, hash: &StateHash) -> Option<&TransitionFrontierSyncBlockState> {
303        self.blocks_iter().find(|s| s.block_hash() == hash)
304    }
305
306    pub fn block_state_mut(
307        &mut self,
308        hash: &StateHash,
309    ) -> Option<&mut TransitionFrontierSyncBlockState> {
310        match self {
311            Self::BlocksPending { chain, .. } => chain.iter_mut().find(|s| s.block_hash() == hash),
312            _ => None,
313        }
314    }
315
316    pub fn is_fetch_pending_from_peer(
317        &self,
318        hash: &StateHash,
319        peer_id: &PeerId,
320        rpc_id: P2pRpcId,
321    ) -> bool {
322        self.block_state(hash)
323            .is_some_and(|s| s.is_fetch_pending_from_peer(peer_id, rpc_id))
324    }
325
326    pub fn blocks_fetch_from_peer_pending_rpc_ids<'a>(
327        &'a self,
328        peer_id: &'a PeerId,
329    ) -> impl 'a + Iterator<Item = P2pRpcId> {
330        self.blocks_iter()
331            .filter_map(|b| b.fetch_pending_from_peer_rpc_id(peer_id))
332    }
333
334    pub fn blocks_apply_pending(&self) -> Option<&ArcBlockWithHash> {
335        self.blocks_iter()
336            .find(|s| s.is_apply_pending())
337            .and_then(|s| s.block())
338    }
339
340    pub fn blocks_apply_next(&self) -> Option<(&ArcBlockWithHash, &AppliedBlock)> {
341        let mut last_applied = None;
342        for s in self.blocks_iter() {
343            if s.is_apply_success() {
344                last_applied = s.applied_block();
345            } else if s.is_fetch_success() {
346                return Some((s.block()?, last_applied?));
347            } else {
348                return None;
349            }
350        }
351        None
352    }
353
354    pub fn sync_phase(&self) -> SyncPhase {
355        match self {
356            TransitionFrontierSyncState::Idle
357            | TransitionFrontierSyncState::Init { .. }
358            | TransitionFrontierSyncState::StakingLedgerPending(_)
359            | TransitionFrontierSyncState::StakingLedgerSuccess { .. }
360            | TransitionFrontierSyncState::NextEpochLedgerPending(_)
361            | TransitionFrontierSyncState::NextEpochLedgerSuccess { .. }
362            | TransitionFrontierSyncState::RootLedgerPending(_)
363            | TransitionFrontierSyncState::RootLedgerSuccess { .. } => SyncPhase::Bootstrap,
364            TransitionFrontierSyncState::BlocksPending { .. }
365            | TransitionFrontierSyncState::BlocksSuccess { .. }
366            | TransitionFrontierSyncState::CommitPending { .. }
367            | TransitionFrontierSyncState::CommitSuccess { .. } => SyncPhase::Catchup,
368            TransitionFrontierSyncState::Synced { .. } => SyncPhase::Synced,
369        }
370    }
371}
372
373impl TransitionFrontierSyncBlockState {
374    pub fn is_fetch_success(&self) -> bool {
375        matches!(self, Self::FetchSuccess { .. })
376    }
377
378    pub fn is_apply_pending(&self) -> bool {
379        matches!(self, Self::ApplyPending { .. })
380    }
381
382    pub fn is_apply_error(&self) -> bool {
383        matches!(self, Self::ApplyError { .. })
384    }
385
386    pub fn is_apply_success(&self) -> bool {
387        matches!(self, Self::ApplySuccess { .. })
388    }
389
390    pub fn block_hash(&self) -> &StateHash {
391        match self {
392            Self::FetchPending { block_hash, .. } => block_hash,
393            Self::FetchSuccess { block, .. }
394            | Self::ApplyPending { block, .. }
395            | Self::ApplyError { block, .. } => &block.hash,
396            Self::ApplySuccess { block, .. } => block.hash(),
397        }
398    }
399
400    pub fn block(&self) -> Option<&ArcBlockWithHash> {
401        match self {
402            Self::FetchPending { .. } => None,
403            Self::FetchSuccess { block, .. }
404            | Self::ApplyPending { block, .. }
405            | Self::ApplyError { block, .. } => Some(block),
406            Self::ApplySuccess { block, .. } => Some(block.block_with_hash()),
407        }
408    }
409
410    pub fn applied_block(&self) -> Option<&AppliedBlock> {
411        match self {
412            Self::FetchPending { .. }
413            | Self::FetchSuccess { .. }
414            | Self::ApplyPending { .. }
415            | Self::ApplyError { .. } => None,
416            Self::ApplySuccess { block, .. } => Some(block),
417        }
418    }
419
420    pub fn take_block(self) -> Option<ArcBlockWithHash> {
421        match self {
422            Self::FetchPending { .. } => None,
423            Self::FetchSuccess { block, .. }
424            | Self::ApplyPending { block, .. }
425            | Self::ApplyError { block, .. } => Some(block),
426            Self::ApplySuccess { block, .. } => Some(block.block),
427        }
428    }
429
430    pub fn take_applied_block(self) -> Option<AppliedBlock> {
431        match self {
432            Self::FetchPending { .. }
433            | Self::FetchSuccess { .. }
434            | Self::ApplyPending { .. }
435            | Self::ApplyError { .. } => None,
436            Self::ApplySuccess { block, .. } => Some(block),
437        }
438    }
439
440    pub fn fetch_pending_hash(&self) -> Option<&StateHash> {
441        match self {
442            Self::FetchPending { block_hash, .. } => Some(block_hash),
443            _ => None,
444        }
445    }
446
447    pub fn retry_hash(&self) -> Option<&StateHash> {
448        let Self::FetchPending {
449            block_hash,
450            attempts,
451            ..
452        } = self
453        else {
454            return None;
455        };
456        Some(block_hash)
457            .filter(|_| !attempts.is_empty() && attempts.iter().all(|(_, s)| s.is_error()))
458    }
459
460    pub fn fetch_pending_from_peer_rpc_id(&self, peer_id: &PeerId) -> Option<P2pRpcId> {
461        let Self::FetchPending { attempts, .. } = self else {
462            return None;
463        };
464        attempts.get(peer_id).and_then(|v| v.fetch_pending_rpc_id())
465    }
466
467    pub fn is_fetch_init_from_peer(&self, peer_id: &PeerId) -> bool {
468        let Self::FetchPending { attempts, .. } = self else {
469            return false;
470        };
471        attempts.get(peer_id).is_some_and(|s| s.is_fetch_init())
472    }
473
474    pub fn is_fetch_pending_from_peer(&self, peer_id: &PeerId, rpc_id: P2pRpcId) -> bool {
475        let Self::FetchPending { attempts, .. } = self else {
476            return false;
477        };
478        attempts
479            .get(peer_id)
480            .and_then(|s| s.fetch_pending_rpc_id())
481            .is_some_and(|expected| expected == rpc_id)
482    }
483
484    pub fn fetch_pending_attempts_mut(&mut self) -> Option<&mut BTreeMap<PeerId, PeerRpcState>> {
485        match self {
486            Self::FetchPending { attempts, .. } => Some(attempts),
487            _ => None,
488        }
489    }
490
491    pub fn fetch_pending_from_peer_mut(&mut self, peer_id: &PeerId) -> Option<&mut PeerRpcState> {
492        let Self::FetchPending { attempts, .. } = self else {
493            return None;
494        };
495        attempts.get_mut(peer_id)
496    }
497
498    pub fn fetch_pending_fetched_block(&self) -> Option<&ArcBlockWithHash> {
499        let Self::FetchPending { attempts, .. } = self else {
500            return None;
501        };
502        attempts.iter().find_map(|(_, s)| s.success_block())
503    }
504}
505
506impl PeerRpcState {
507    pub fn is_fetch_init(&self) -> bool {
508        matches!(self, Self::Init { .. })
509    }
510
511    pub fn is_error(&self) -> bool {
512        matches!(self, Self::Error { .. })
513    }
514
515    pub fn is_success(&self) -> bool {
516        matches!(self, Self::Success { .. })
517    }
518
519    pub fn fetch_pending_rpc_id(&self) -> Option<P2pRpcId> {
520        match self {
521            Self::Pending { rpc_id, .. } => Some(*rpc_id),
522            _ => None,
523        }
524    }
525
526    pub fn fetch_pending_since(&self) -> Option<Timestamp> {
527        match self {
528            Self::Pending { time, .. } => Some(*time),
529            _ => None,
530        }
531    }
532
533    pub fn success_block(&self) -> Option<&ArcBlockWithHash> {
534        match self {
535            Self::Success { block, .. } => Some(block),
536            _ => None,
537        }
538    }
539}
540
541impl TransitionFrontierRootSnarkedLedgerUpdates {
542    pub fn get(
543        &self,
544        ledger_hash: &LedgerHash,
545    ) -> Option<&TransitionFrontierRootSnarkedLedgerUpdate> {
546        self.0.get(ledger_hash)
547    }
548
549    pub fn is_empty(&self) -> bool {
550        self.0.is_empty()
551    }
552
553    pub fn len(&self) -> usize {
554        self.0.len()
555    }
556
557    /// Caller must make sure `new_root` is part of `old_chain`.
558    pub fn extend_with_needed<'a>(
559        &mut self,
560        new_root: &ArcBlockWithHash,
561        old_chain: impl 'a + IntoIterator<Item = &'a ArcBlockWithHash>,
562    ) {
563        let mut old_chain = old_chain.into_iter().peekable();
564        let Some(old_root) = old_chain.peek() else {
565            return;
566        };
567
568        let Some(diff_len) = new_root.height().checked_sub(old_root.height()) else {
569            return;
570        };
571
572        if new_root.snarked_ledger_hash() == old_root.snarked_ledger_hash() {
573            return;
574        }
575
576        self.0.extend(
577            old_chain
578                .take(diff_len as usize)
579                .collect::<Vec<_>>()
580                .into_iter()
581                .rev()
582                .scan(new_root, |last_block, b| {
583                    if last_block.snarked_ledger_hash() == b.snarked_ledger_hash() {
584                        *last_block = b;
585                        return Some(None);
586                    }
587                    let last_block = std::mem::replace(last_block, b);
588                    let update = TransitionFrontierRootSnarkedLedgerUpdate {
589                        parent: b.snarked_ledger_hash().clone(),
590                        staged_ledger_hash: last_block.staged_ledger_hashes().clone(),
591                    };
592                    let snarked_ledger_hash = last_block.snarked_ledger_hash().clone();
593
594                    Some(Some((snarked_ledger_hash, update)))
595                })
596                .flatten(),
597        );
598    }
599}