// mina_node/stats/stats_sync.rs
1use std::collections::VecDeque;
2
3use mina_core::block::ArcBlockWithHash;
4use mina_p2p_messages::v2::{LedgerHash, StateHash};
5use redux::Timestamp;
6use serde::{Deserialize, Serialize};
7
8use crate::transition_frontier::sync::{
9    ledger::SyncLedgerTargetKind, TransitionFrontierSyncBlockState,
10};
11
/// Upper bound on retained sync snapshots; once reached, the oldest snapshot
/// is evicted before a new one is pushed (see `SyncStats::new_target`).
const MAX_SNAPSHOTS_LEN: usize = 256;
13
/// Rolling collection of per-sync-attempt statistics.
#[derive(Default)]
pub struct SyncStats {
    // Bounded FIFO (at most `MAX_SNAPSHOTS_LEN` entries); the snapshot for
    // the current/latest sync attempt is at the back.
    snapshots: VecDeque<SyncStatsSnapshot>,
}
18
/// Statistics gathered for a single sync attempt (one snapshot per target).
#[derive(Serialize, Deserialize, Debug, Clone)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct SyncStatsSnapshot {
    /// Whether this sync attempt was a bootstrap or a catchup.
    pub kind: SyncKind,
    /// Time the best tip that triggered this sync attempt was received.
    pub best_tip_received: Timestamp,
    /// Time the sync completed; `None` while still in progress.
    pub synced: Option<Timestamp>,
    /// Progress of the staking-epoch, next-epoch and root ledger targets.
    pub ledgers: SyncLedgers,
    /// Per-block fetch/apply progress, ordered from the best tip downwards
    /// (populated by `SyncStats::blocks_init`).
    pub blocks: Vec<SyncBlock>,
    /// Ledger resync events, accumulated across snapshots.
    pub resyncs: Vec<LedgerResyncEvent>,
}
29
/// Kind of synchronization being performed.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub enum SyncKind {
    /// Initial sync, or continuation of a bootstrap that never finished.
    Bootstrap,
    /// Incremental sync to a newer best tip after a completed sync.
    Catchup,
}
36
/// Reason a ledger resync was required.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub enum LedgerResyncKind {
    /// Fetching the staged ledger failed; payload is the error message.
    FetchStagedLedgerError(String),
    /// The root snarked ledger hash changed between sync targets.
    RootLedgerChange,
    /// The chain advanced an epoch (the previous next-epoch ledger became
    /// the staking ledger).
    EpochChange,
    /// The staking epoch ledger changed without an epoch transition,
    /// indicating the best chain itself changed.
    BestChainChange,
}
45
/// A single recorded ledger resync, with its reason and when it happened.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct LedgerResyncEvent {
    /// Why the resync was triggered.
    pub kind: LedgerResyncKind,
    /// When the resync was recorded.
    pub time: Timestamp,
}
52
/// Sync progress of the three ledger targets tracked during sync.
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct SyncLedgers {
    /// Staking epoch ledger progress (`None` until its sync is initiated).
    pub staking_epoch: Option<SyncLedger>,
    /// Next epoch ledger progress (`None` until its sync is initiated).
    pub next_epoch: Option<SyncLedger>,
    /// Root (transition frontier root) ledger progress.
    pub root: Option<SyncLedger>,
}
60
61impl SyncLedgers {
62    /// Figure out if a resync is required, and if so, for what reason.
63    fn resync_kind(
64        &self,
65        best_tip: &ArcBlockWithHash,
66        root_block: &ArcBlockWithHash,
67    ) -> Option<LedgerResyncKind> {
68        let consensus_state = &best_tip.block.header.protocol_state.body.consensus_state;
69        let new_staking_epoch_ledger_hash = &consensus_state.staking_epoch_data.ledger.hash;
70        let new_root_ledger_hash = root_block.snarked_ledger_hash();
71
72        let staking_epoch_ledger_changed = self
73            .staking_epoch
74            .as_ref()
75            .and_then(|sync| sync.snarked.hash.as_ref())
76            .map(|prev_staking_epoch_ledger_hash| {
77                prev_staking_epoch_ledger_hash != new_staking_epoch_ledger_hash
78            })
79            .unwrap_or(false);
80
81        if let Some(prev_next_epoch_snarked_hash) = self
82            .next_epoch
83            .as_ref()
84            .and_then(|sync| sync.snarked.hash.as_ref())
85        {
86            if prev_next_epoch_snarked_hash == new_staking_epoch_ledger_hash {
87                // Previous next epoch moved to staking ledger, which means we advanced one epoch
88                return Some(LedgerResyncKind::EpochChange);
89            } else if staking_epoch_ledger_changed {
90                // If we didn't advance an epoch and neither epoch ledger hash matches, then the best chain changed
91                return Some(LedgerResyncKind::BestChainChange);
92            }
93        }
94
95        if let Some(prev_root_ledger_hash) = self
96            .root
97            .as_ref()
98            .and_then(|sync| sync.snarked.hash.as_ref())
99        {
100            if prev_root_ledger_hash != new_root_ledger_hash {
101                return Some(LedgerResyncKind::RootLedgerChange);
102            }
103        }
104
105        None
106    }
107}
108
/// Sync progress for one ledger target: its snarked and staged parts.
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct SyncLedger {
    /// Snarked ledger sync progress (hash/account fetching).
    pub snarked: SyncSnarkedLedger,
    /// Staged ledger sync progress (parts fetching and reconstruction).
    pub staged: SyncStagedLedger,
}
115
/// Timings for syncing a snarked ledger: first hashes, then accounts.
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct SyncSnarkedLedger {
    // TODO(openapi): generate ToSchema in Base58CheckOfBinProt macro
    #[cfg_attr(feature = "openapi", schema(value_type = Option<String>))]
    /// Target snarked ledger hash (set by `SyncingLedger::Init`).
    pub hash: Option<LedgerHash>,
    /// Earliest time hash fetching started.
    pub fetch_hashes_start: Option<Timestamp>,
    /// Latest time hash fetching finished.
    pub fetch_hashes_end: Option<Timestamp>,
    /// Earliest time account fetching started.
    pub fetch_accounts_start: Option<Timestamp>,
    /// Latest time account fetching finished.
    pub fetch_accounts_end: Option<Timestamp>,
}
127
/// Timings for syncing a staged ledger: fetch its parts, then reconstruct.
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct SyncStagedLedger {
    // TODO(openapi): generate ToSchema in Base58CheckOfBinProt macro
    #[cfg_attr(feature = "openapi", schema(value_type = Option<String>))]
    /// Target staged ledger hash (set by `SyncingLedger::Init`, if known).
    pub hash: Option<LedgerHash>,
    /// Earliest time parts fetching started.
    pub fetch_parts_start: Option<Timestamp>,
    /// Latest time parts fetching finished.
    pub fetch_parts_end: Option<Timestamp>,
    /// Earliest time reconstruction started.
    pub reconstruct_start: Option<Timestamp>,
    /// Latest time reconstruction finished.
    pub reconstruct_end: Option<Timestamp>,
}
139
/// Fetch/apply progress of a single block during sync.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct SyncBlock {
    /// Global slot of the block; `None` until the block itself is known.
    pub global_slot: Option<u32>,
    /// Block height.
    pub height: u32,
    /// Block state hash.
    pub hash: StateHash,
    /// Predecessor block's state hash.
    pub pred_hash: StateHash,
    /// Current fetch/apply status.
    pub status: SyncBlockStatus,
    /// Earliest time fetching of this block started.
    pub fetch_start: Option<Timestamp>,
    /// Time the block was fetched.
    pub fetch_end: Option<Timestamp>,
    /// Time block application started.
    pub apply_start: Option<Timestamp>,
    /// Time block application finished (successfully or not).
    pub apply_end: Option<Timestamp>,
}
153
/// Lifecycle of a block during sync: fetch it, then apply it.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub enum SyncBlockStatus {
    /// Not yet being fetched from any peer.
    Missing,
    /// Fetch attempts are in progress.
    Fetching,
    /// Block body fetched, not yet applied.
    Fetched,
    /// Application in progress.
    Applying,
    /// Application failed.
    ApplyFailed,
    /// Application succeeded.
    Applied,
}
164
/// Progress update for a ledger sync, consumed by `SyncStats::ledger`.
pub enum SyncingLedger {
    /// Sync of a new ledger target was initiated.
    Init {
        /// Target snarked ledger hash.
        snarked_ledger_hash: LedgerHash,
        /// Target staged ledger hash, if a staged ledger is being synced.
        staged_ledger_hash: Option<LedgerHash>,
    },
    /// Snarked ledger hash fetching progressed during `[start, end]`.
    FetchHashes {
        start: Timestamp,
        end: Timestamp,
    },
    /// Snarked ledger account fetching progressed during `[start, end]`.
    FetchAccounts {
        start: Timestamp,
        end: Timestamp,
    },
    /// Staged ledger parts fetching started/finished (`end` when done).
    FetchParts {
        start: Timestamp,
        end: Option<Timestamp>,
    },
    /// Staged ledger reconstruction started/finished (`end` when done).
    ApplyParts {
        start: Timestamp,
        end: Option<Timestamp>,
    },
}
187
impl SyncStats {
    /// Start tracking a new sync target (`best_tip`).
    ///
    /// Pushes a fresh snapshot (evicting the oldest once `MAX_SNAPSHOTS_LEN`
    /// is reached), carrying ledger progress and resync history over from the
    /// previous snapshot, and records a resync event when the previous,
    /// still-unfinished sync's ledgers no longer match the new target.
    pub fn new_target(
        &mut self,
        time: Timestamp,
        best_tip: &ArcBlockWithHash,
        root_block: &ArcBlockWithHash,
    ) -> &mut Self {
        // Bootstrap when there is no previous snapshot, or the previous one
        // was a bootstrap that never reached the synced state; otherwise this
        // is a catchup to a newer best tip.
        let kind = match self
            .snapshots
            .back()
            .is_none_or(|s| matches!(s.kind, SyncKind::Bootstrap) && s.synced.is_none())
        {
            true => SyncKind::Bootstrap,
            false => SyncKind::Catchup,
        };
        // The best tip block is already in hand, so it starts out `Fetched`
        // with no fetch/apply timings recorded.
        let best_tip_block_state = SyncBlock {
            global_slot: Some(best_tip.global_slot()),
            height: best_tip.height(),
            hash: best_tip.hash().clone(),
            pred_hash: best_tip.pred_hash().clone(),
            status: SyncBlockStatus::Fetched,
            fetch_start: None,
            fetch_end: None,
            apply_start: None,
            apply_end: None,
        };

        // Keep the snapshot history bounded.
        if self.snapshots.len() >= MAX_SNAPSHOTS_LEN {
            self.snapshots.pop_front();
        }

        // Retain the target ledger information from previous epochs in `ledgers`.
        // This ensures that the frontend continues to have access to historical ledger data even
        // after the node completes synchronization (at which point the sync stats no longer receive
        // updates about the older epoch or root ledgers).
        let ledgers = self
            .snapshots
            .back()
            .map_or_else(Default::default, |snapshot| snapshot.ledgers.clone());

        // Resync history accumulates across snapshots.
        let mut resyncs = self
            .snapshots
            .back()
            .map_or_else(Default::default, |snapshot| snapshot.resyncs.clone());

        // If the previous sync never completed and its ledgers are stale
        // relative to the new target, record why a resync is happening.
        if let Some(prev_snapshot) = self.snapshots.back() {
            if prev_snapshot.synced.is_none() {
                if let Some(kind) = prev_snapshot.ledgers.resync_kind(best_tip, root_block) {
                    resyncs.push(LedgerResyncEvent { kind, time });
                }
            }
        }

        self.snapshots.push_back(SyncStatsSnapshot {
            kind,
            best_tip_received: time,
            synced: None,
            ledgers,
            blocks: vec![best_tip_block_state],
            resyncs,
        });

        self
    }

    /// Apply a `SyncingLedger` progress update to the ledger `kind` of the
    /// current (latest) snapshot. No-op when there is no snapshot yet.
    pub fn ledger(&mut self, kind: SyncLedgerTargetKind, update: SyncingLedger) -> &mut Self {
        // Pop the current snapshot so the previous snapshot can still be
        // borrowed immutably (below) while this one is mutated.
        let Some(mut snapshot) = self.snapshots.pop_back() else {
            return self;
        };
        let ledger = snapshot.ledgers.get_or_insert(kind);

        match update {
            SyncingLedger::Init {
                snarked_ledger_hash,
                staged_ledger_hash,
            } => {
                ledger.snarked.hash = Some(snarked_ledger_hash);
                ledger.staged.hash = staged_ledger_hash;

                // If the previous snapshot was already syncing the very same
                // ledger(s), carry over its timing data instead of starting
                // from scratch.
                if let Some(prev_sync) = &self.snapshots.back().and_then(|s| s.ledgers.get(kind)) {
                    if prev_sync.snarked.hash == ledger.snarked.hash {
                        ledger.snarked = prev_sync.snarked.clone();
                    }

                    if prev_sync.staged.hash == ledger.staged.hash {
                        ledger.staged = prev_sync.staged.clone();
                    }
                }
            }
            // For each phase below: keep the earliest start, extend the end.
            SyncingLedger::FetchHashes { start, end } => {
                ledger.snarked.fetch_hashes_start.get_or_insert(start);
                let cur_end = ledger.snarked.fetch_hashes_end.get_or_insert(end);
                *cur_end = end.max(*cur_end);
            }
            SyncingLedger::FetchAccounts { start, end } => {
                ledger.snarked.fetch_accounts_start.get_or_insert(start);
                let cur_end = ledger.snarked.fetch_accounts_end.get_or_insert(end);
                *cur_end = end.max(*cur_end);
            }
            SyncingLedger::FetchParts { start, end } => {
                ledger.staged.fetch_parts_start.get_or_insert(start);
                if let Some(end) = end {
                    let cur_end = ledger.staged.fetch_parts_end.get_or_insert(end);
                    *cur_end = end.max(*cur_end);
                }
            }
            SyncingLedger::ApplyParts { start, end } => {
                ledger.staged.reconstruct_start.get_or_insert(start);
                if let Some(end) = end {
                    let cur_end = ledger.staged.reconstruct_end.get_or_insert(end);
                    *cur_end = end.max(*cur_end);
                }
            }
        }

        self.snapshots.push_back(snapshot);

        self
    }

    /// (Re)build per-block stats for the current snapshot from the frontier's
    /// sync block states (`states` is ordered from root towards best tip).
    /// No-op when there is no snapshot or no best tip block yet.
    pub fn blocks_init(&mut self, states: &[TransitionFrontierSyncBlockState]) -> &mut Self {
        let Some(snapshot) = self.snapshots.back_mut() else {
            return self;
        };
        let Some((_root_height, best_tip_height)) = states
            .last()
            .and_then(|s| s.block())
            .map(|b| (b.root_block_height(), b.height()))
        else {
            return self;
        };

        snapshot.blocks = states
            .iter()
            .rev()
            // .take_while(|s| {
            //     !s.is_apply_success() || s.block().is_some_and( |b| b.height() == root_height)
            // })
            .enumerate()
            .map(|(i, s)| {
                // Heights are contiguous, so each block's height is derived
                // from its offset below the best tip.
                let height = best_tip_height.checked_sub(i as u32).expect("underflow");
                let hash = s.block_hash().clone();
                // When the block body isn't available yet, take the
                // predecessor hash from the preceding state in `states`
                // (index `len - i - 2` in the unreversed slice).
                let pred_hash = s
                    .block()
                    .map(|b| b.pred_hash())
                    .unwrap_or_else(|| {
                        states
                            .get(
                                states
                                    .len()
                                    .saturating_sub(i)
                                    .checked_sub(2)
                                    .expect("underflow"),
                            )
                            .unwrap() // can't fail because index is less than states.len()
                            .block_hash()
                    })
                    .clone();
                let mut stats = SyncBlock::new(height, hash, pred_hash);
                stats.update_with_block_state(s);
                stats
            })
            .collect();

        self
    }

    /// Update stats of a single block (matched by state hash) in the current
    /// snapshot. No-op when the snapshot or the block isn't tracked.
    pub fn block_update(&mut self, state: &TransitionFrontierSyncBlockState) -> &mut Self {
        let Some(snapshot) = self.snapshots.back_mut() else {
            return self;
        };
        let block_hash = state.block_hash();
        let Some(stats) = snapshot.blocks.iter_mut().find(|b| &b.hash == block_hash) else {
            return self;
        };
        stats.update_with_block_state(state);
        self
    }

    /// Mark the current snapshot's sync as completed at `time`.
    pub fn synced(&mut self, time: Timestamp) -> &mut Self {
        let Some(snapshot) = self.snapshots.back_mut() else {
            return self;
        };
        snapshot.synced = Some(time);
        self
    }

    /// Return up to `limit` snapshots (all of them if `None`), newest first.
    pub fn collect_stats(&self, limit: Option<usize>) -> Vec<SyncStatsSnapshot> {
        let limit = limit.unwrap_or(usize::MAX);
        self.snapshots.iter().rev().take(limit).cloned().collect()
    }

    /// Record a staged-ledger fetch failure as a resync event on the current
    /// snapshot. No-op when there is no snapshot.
    pub fn staging_ledger_fetch_failure(&mut self, error: String, time: Timestamp) {
        if let Some(snapshot) = self.snapshots.back_mut() {
            snapshot.resyncs.push(LedgerResyncEvent {
                kind: LedgerResyncKind::FetchStagedLedgerError(error),
                time,
            })
        }
    }
}
389
390impl SyncLedgers {
391    pub fn get(&self, kind: SyncLedgerTargetKind) -> Option<&SyncLedger> {
392        match kind {
393            SyncLedgerTargetKind::StakingEpoch => self.staking_epoch.as_ref(),
394            SyncLedgerTargetKind::NextEpoch => self.next_epoch.as_ref(),
395            SyncLedgerTargetKind::Root => self.root.as_ref(),
396        }
397    }
398
399    fn get_or_insert(&mut self, kind: SyncLedgerTargetKind) -> &mut SyncLedger {
400        match kind {
401            SyncLedgerTargetKind::StakingEpoch => {
402                self.staking_epoch.get_or_insert_with(Default::default)
403            }
404            SyncLedgerTargetKind::NextEpoch => self.next_epoch.get_or_insert_with(Default::default),
405            SyncLedgerTargetKind::Root => self.root.get_or_insert_with(Default::default),
406        }
407    }
408}
409
410impl SyncBlock {
411    pub fn new(height: u32, hash: StateHash, pred_hash: StateHash) -> Self {
412        Self {
413            global_slot: None,
414            height,
415            hash,
416            pred_hash,
417            status: SyncBlockStatus::Missing,
418            fetch_start: None,
419            fetch_end: None,
420            apply_start: None,
421            apply_end: None,
422        }
423    }
424
425    pub fn update_with_block_state(&mut self, state: &TransitionFrontierSyncBlockState) {
426        match state {
427            TransitionFrontierSyncBlockState::FetchPending { attempts, .. } => {
428                if let Some(time) = attempts
429                    .iter()
430                    .filter_map(|(_, v)| v.fetch_pending_since())
431                    .min()
432                {
433                    self.status = SyncBlockStatus::Fetching;
434                    self.fetch_start.get_or_insert(time);
435                } else {
436                    self.status = SyncBlockStatus::Missing;
437                }
438            }
439            TransitionFrontierSyncBlockState::FetchSuccess { time, block, .. } => {
440                self.global_slot.get_or_insert_with(|| block.global_slot());
441                self.status = SyncBlockStatus::Fetched;
442                self.fetch_end = Some(*time);
443            }
444            TransitionFrontierSyncBlockState::ApplyPending { time, block, .. } => {
445                self.global_slot.get_or_insert_with(|| block.global_slot());
446                self.status = SyncBlockStatus::Applying;
447                self.apply_start = Some(*time);
448            }
449            TransitionFrontierSyncBlockState::ApplyError { time, block, .. } => {
450                self.global_slot.get_or_insert_with(|| block.global_slot());
451                self.status = SyncBlockStatus::ApplyFailed;
452                self.apply_end = Some(*time);
453            }
454            TransitionFrontierSyncBlockState::ApplySuccess { time, block, .. } => {
455                self.global_slot.get_or_insert_with(|| block.global_slot());
456                self.status = SyncBlockStatus::Applied;
457                self.apply_end = Some(*time);
458            }
459        }
460    }
461}