node/stats/stats_sync.rs

use std::collections::VecDeque;

use mina_p2p_messages::v2::{LedgerHash, StateHash};
use openmina_core::block::ArcBlockWithHash;
use redux::Timestamp;
use serde::{Deserialize, Serialize};

use crate::transition_frontier::sync::{
    ledger::SyncLedgerTargetKind, TransitionFrontierSyncBlockState,
};

const MAX_SNAPSHOTS_LEN: usize = 256;

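/// Synchronization statistics, kept as a bounded history of snapshots
/// (newest at the back), one per sync attempt.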
#[derive(Default)]
pub struct SyncStats {
    snapshots: VecDeque<SyncStatsSnapshot>,
}

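/// Stats for a single sync attempt, started when a new best tip is received
/// and closed once the node is synced.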
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct SyncStatsSnapshot {
    pub kind: SyncKind,
    pub best_tip_received: Timestamp,
    pub synced: Option<Timestamp>,
    pub ledgers: SyncLedgers,
    pub blocks: Vec<SyncBlock>,
    pub resyncs: Vec<LedgerResyncEvent>,
}

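/// Kind of synchronization: `Bootstrap` for the initial sync (or while a
/// previous bootstrap hasn't finished), `Catchup` for subsequent syncs.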
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum SyncKind {
    Bootstrap,
    Catchup,
}

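/// Reason a ledger resync was triggered.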
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum LedgerResyncKind {
    FetchStagedLedgerError(String),
    RootLedgerChange,
    EpochChange,
    BestChainChange,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct LedgerResyncEvent {
    pub kind: LedgerResyncKind,
    pub time: Timestamp,
}

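/// Sync progress for each ledger target: the staking epoch, next epoch and
/// transition frontier root ledgers.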
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
pub struct SyncLedgers {
    pub staking_epoch: Option<SyncLedger>,
    pub next_epoch: Option<SyncLedger>,
    pub root: Option<SyncLedger>,
}

impl SyncLedgers {
    /// Figure out if a resync is required, and if so, for what reason.
    fn resync_kind(
        &self,
        best_tip: &ArcBlockWithHash,
        root_block: &ArcBlockWithHash,
    ) -> Option<LedgerResyncKind> {
        let consensus_state = &best_tip.block.header.protocol_state.body.consensus_state;
        let new_staking_epoch_ledger_hash = &consensus_state.staking_epoch_data.ledger.hash;
        let new_root_ledger_hash = root_block.snarked_ledger_hash();

        let staking_epoch_ledger_changed = self
            .staking_epoch
            .as_ref()
            .and_then(|sync| sync.snarked.hash.as_ref())
            .map(|prev_staking_epoch_ledger_hash| {
                prev_staking_epoch_ledger_hash != new_staking_epoch_ledger_hash
            })
            .unwrap_or(false);

        if let Some(prev_next_epoch_snarked_hash) = self
            .next_epoch
            .as_ref()
            .and_then(|sync| sync.snarked.hash.as_ref())
        {
            if prev_next_epoch_snarked_hash == new_staking_epoch_ledger_hash {
                // Previous next epoch moved to staking ledger, which means we advanced one epoch
                return Some(LedgerResyncKind::EpochChange);
            } else if staking_epoch_ledger_changed {
                // If we didn't advance an epoch and neither epoch ledger hash matches, then the best chain changed
                return Some(LedgerResyncKind::BestChainChange);
            }
        }

        if let Some(prev_root_ledger_hash) = self
            .root
            .as_ref()
            .and_then(|sync| sync.snarked.hash.as_ref())
        {
            if prev_root_ledger_hash != new_root_ledger_hash {
                return Some(LedgerResyncKind::RootLedgerChange);
            }
        }

        None
    }
}

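/// Sync stats for a single ledger target, split into the snarked and staged
/// ledger phases.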
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
pub struct SyncLedger {
    pub snarked: SyncSnarkedLedger,
    pub staged: SyncStagedLedger,
}

#[derive(Serialize, Deserialize, Debug, Default, Clone)]
pub struct SyncSnarkedLedger {
    pub hash: Option<LedgerHash>,
    pub fetch_hashes_start: Option<Timestamp>,
    pub fetch_hashes_end: Option<Timestamp>,
    pub fetch_accounts_start: Option<Timestamp>,
    pub fetch_accounts_end: Option<Timestamp>,
}

#[derive(Serialize, Deserialize, Debug, Default, Clone)]
pub struct SyncStagedLedger {
    pub hash: Option<LedgerHash>,
    pub fetch_parts_start: Option<Timestamp>,
    pub fetch_parts_end: Option<Timestamp>,
    pub reconstruct_start: Option<Timestamp>,
    pub reconstruct_end: Option<Timestamp>,
}

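/// Fetch and apply stats for a single block in the sync target chain.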
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct SyncBlock {
    pub global_slot: Option<u32>,
    pub height: u32,
    pub hash: StateHash,
    pub pred_hash: StateHash,
    pub status: SyncBlockStatus,
    pub fetch_start: Option<Timestamp>,
    pub fetch_end: Option<Timestamp>,
    pub apply_start: Option<Timestamp>,
    pub apply_end: Option<Timestamp>,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum SyncBlockStatus {
    Missing,
    Fetching,
    Fetched,
    Applying,
    ApplyFailed,
    Applied,
}

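/// Progress update for a ledger that is being synced, passed to
/// `SyncStats::ledger`.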
pub enum SyncingLedger {
    Init {
        snarked_ledger_hash: LedgerHash,
        staged_ledger_hash: Option<LedgerHash>,
    },
    FetchHashes {
        start: Timestamp,
        end: Timestamp,
    },
    FetchAccounts {
        start: Timestamp,
        end: Timestamp,
    },
    FetchParts {
        start: Timestamp,
        end: Option<Timestamp>,
    },
    ApplyParts {
        start: Timestamp,
        end: Option<Timestamp>,
    },
}

impl SyncStats {
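    /// Starts a new snapshot for a freshly received best tip.
    ///
    /// Ledger info and resync events are carried over from the previous
    /// snapshot, and if that snapshot hadn't finished syncing, the reason for
    /// the ledger resync (if any) is recorded.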
    pub fn new_target(
        &mut self,
        time: Timestamp,
        best_tip: &ArcBlockWithHash,
        root_block: &ArcBlockWithHash,
    ) -> &mut Self {
        let kind = match self
            .snapshots
            .back()
            .is_none_or(|s| matches!(s.kind, SyncKind::Bootstrap) && s.synced.is_none())
        {
            true => SyncKind::Bootstrap,
            false => SyncKind::Catchup,
        };
        let best_tip_block_state = SyncBlock {
            global_slot: Some(best_tip.global_slot()),
            height: best_tip.height(),
            hash: best_tip.hash().clone(),
            pred_hash: best_tip.pred_hash().clone(),
            status: SyncBlockStatus::Fetched,
            fetch_start: None,
            fetch_end: None,
            apply_start: None,
            apply_end: None,
        };

        if self.snapshots.len() >= MAX_SNAPSHOTS_LEN {
            self.snapshots.pop_front();
        }

        // Retain the target ledger information from previous epochs in `ledgers`.
        // This ensures that the frontend continues to have access to historical ledger data even
        // after the node completes synchronization (at which point the sync stats no longer receive
        // updates about the older epoch or root ledgers).
        let ledgers = self
            .snapshots
            .back()
            .map_or_else(Default::default, |snapshot| snapshot.ledgers.clone());

        let mut resyncs = self
            .snapshots
            .back()
            .map_or_else(Default::default, |snapshot| snapshot.resyncs.clone());

        if let Some(prev_snapshot) = self.snapshots.back() {
            if prev_snapshot.synced.is_none() {
                if let Some(kind) = prev_snapshot.ledgers.resync_kind(best_tip, root_block) {
                    resyncs.push(LedgerResyncEvent { kind, time });
                }
            }
        }

        self.snapshots.push_back(SyncStatsSnapshot {
            kind,
            best_tip_received: time,
            synced: None,
            ledgers,
            blocks: vec![best_tip_block_state],
            resyncs,
        });

        self
    }

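    /// Records a ledger sync progress update for the given ledger target in
    /// the latest snapshot.
    ///
    /// On `Init`, if the ledger hashes match the previous snapshot's, its
    /// timings are reused so already-completed work isn't re-counted.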
    pub fn ledger(&mut self, kind: SyncLedgerTargetKind, update: SyncingLedger) -> &mut Self {
        let Some(mut snapshot) = self.snapshots.pop_back() else {
            return self;
        };
        let ledger = snapshot.ledgers.get_or_insert(kind);

        match update {
            SyncingLedger::Init {
                snarked_ledger_hash,
                staged_ledger_hash,
            } => {
                ledger.snarked.hash = Some(snarked_ledger_hash);
                ledger.staged.hash = staged_ledger_hash;

                if let Some(prev_sync) = &self.snapshots.back().and_then(|s| s.ledgers.get(kind)) {
                    if prev_sync.snarked.hash == ledger.snarked.hash {
                        ledger.snarked = prev_sync.snarked.clone();
                    }

                    if prev_sync.staged.hash == ledger.staged.hash {
                        ledger.staged = prev_sync.staged.clone();
                    }
                }
            }
            SyncingLedger::FetchHashes { start, end } => {
                ledger.snarked.fetch_hashes_start.get_or_insert(start);
                let cur_end = ledger.snarked.fetch_hashes_end.get_or_insert(end);
                *cur_end = end.max(*cur_end);
            }
            SyncingLedger::FetchAccounts { start, end } => {
                ledger.snarked.fetch_accounts_start.get_or_insert(start);
                let cur_end = ledger.snarked.fetch_accounts_end.get_or_insert(end);
                *cur_end = end.max(*cur_end);
            }
            SyncingLedger::FetchParts { start, end } => {
                ledger.staged.fetch_parts_start.get_or_insert(start);
                if let Some(end) = end {
                    let cur_end = ledger.staged.fetch_parts_end.get_or_insert(end);
                    *cur_end = end.max(*cur_end);
                }
            }
            SyncingLedger::ApplyParts { start, end } => {
                ledger.staged.reconstruct_start.get_or_insert(start);
                if let Some(end) = end {
                    let cur_end = ledger.staged.reconstruct_end.get_or_insert(end);
                    *cur_end = end.max(*cur_end);
                }
            }
        }

        self.snapshots.push_back(snapshot);

        self
    }

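    /// Rebuilds the latest snapshot's per-block stats from the transition
    /// frontier sync block states, ordered best tip first.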
    pub fn blocks_init(&mut self, states: &[TransitionFrontierSyncBlockState]) -> &mut Self {
        let Some(snapshot) = self.snapshots.back_mut() else {
            return self;
        };
        let Some((_root_height, best_tip_height)) = states
            .last()
            .and_then(|s| s.block())
            .map(|b| (b.root_block_height(), b.height()))
        else {
            return self;
        };

        snapshot.blocks = states
            .iter()
            .rev()
            // .take_while(|s| {
            //     !s.is_apply_success() || s.block().is_some_and( |b| b.height() == root_height)
            // })
            .enumerate()
            .map(|(i, s)| {
                let height = best_tip_height.checked_sub(i as u32).expect("underflow");
                let hash = s.block_hash().clone();
                let pred_hash = s
                    .block()
                    .map(|b| b.pred_hash())
                    .unwrap_or_else(|| {
                        states
                            .get(
                                states
                                    .len()
                                    .saturating_sub(i)
                                    .checked_sub(2)
                                    .expect("underflow"),
                            )
                            .unwrap() // can't fail because index is less than states.len()
                            .block_hash()
                    })
                    .clone();
                let mut stats = SyncBlock::new(height, hash, pred_hash);
                stats.update_with_block_state(s);
                stats
            })
            .collect();

        self
    }

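    /// Updates the stats of a single block in the latest snapshot, matched by
    /// its state hash.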
    pub fn block_update(&mut self, state: &TransitionFrontierSyncBlockState) -> &mut Self {
        let Some(snapshot) = self.snapshots.back_mut() else {
            return self;
        };
        let block_hash = state.block_hash();
        let Some(stats) = snapshot.blocks.iter_mut().find(|b| &b.hash == block_hash) else {
            return self;
        };
        stats.update_with_block_state(state);
        self
    }

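    /// Marks the latest snapshot as synced at the given time.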
    pub fn synced(&mut self, time: Timestamp) -> &mut Self {
        let Some(snapshot) = self.snapshots.back_mut() else {
            return self;
        };
        snapshot.synced = Some(time);
        self
    }

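    /// Returns up to `limit` snapshots (all of them if `limit` is `None`),
    /// newest first.
    ///
    /// A minimal usage sketch; `stats` is a hypothetical `SyncStats` value,
    /// shown only to illustrate the call shape:
    /// ```ignore
    /// // Fetch only the latest sync attempt's snapshot.
    /// let latest: Vec<SyncStatsSnapshot> = stats.collect_stats(Some(1));
    /// ```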
    pub fn collect_stats(&self, limit: Option<usize>) -> Vec<SyncStatsSnapshot> {
        let limit = limit.unwrap_or(usize::MAX);
        self.snapshots.iter().rev().take(limit).cloned().collect()
    }

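    /// Records a staged ledger fetch failure as a resync event in the latest
    /// snapshot.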
    pub fn staging_ledger_fetch_failure(&mut self, error: String, time: Timestamp) {
        if let Some(snapshot) = self.snapshots.back_mut() {
            snapshot.resyncs.push(LedgerResyncEvent {
                kind: LedgerResyncKind::FetchStagedLedgerError(error),
                time,
            })
        }
    }
}

impl SyncLedgers {
    pub fn get(&self, kind: SyncLedgerTargetKind) -> Option<&SyncLedger> {
        match kind {
            SyncLedgerTargetKind::StakingEpoch => self.staking_epoch.as_ref(),
            SyncLedgerTargetKind::NextEpoch => self.next_epoch.as_ref(),
            SyncLedgerTargetKind::Root => self.root.as_ref(),
        }
    }

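    /// Returns the stats entry for the given ledger target, inserting a
    /// default one if it doesn't exist yet.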
    fn get_or_insert(&mut self, kind: SyncLedgerTargetKind) -> &mut SyncLedger {
        match kind {
            SyncLedgerTargetKind::StakingEpoch => {
                self.staking_epoch.get_or_insert_with(Default::default)
            }
            SyncLedgerTargetKind::NextEpoch => self.next_epoch.get_or_insert_with(Default::default),
            SyncLedgerTargetKind::Root => self.root.get_or_insert_with(Default::default),
        }
    }
}

impl SyncBlock {
    pub fn new(height: u32, hash: StateHash, pred_hash: StateHash) -> Self {
        Self {
            global_slot: None,
            height,
            hash,
            pred_hash,
            status: SyncBlockStatus::Missing,
            fetch_start: None,
            fetch_end: None,
            apply_start: None,
            apply_end: None,
        }
    }

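    /// Updates this block's status and fetch/apply timestamps from its
    /// transition frontier sync state.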
    pub fn update_with_block_state(&mut self, state: &TransitionFrontierSyncBlockState) {
        match state {
            TransitionFrontierSyncBlockState::FetchPending { attempts, .. } => {
                if let Some(time) = attempts
                    .iter()
                    .filter_map(|(_, v)| v.fetch_pending_since())
                    .min()
                {
                    self.status = SyncBlockStatus::Fetching;
                    self.fetch_start.get_or_insert(time);
                } else {
                    self.status = SyncBlockStatus::Missing;
                }
            }
            TransitionFrontierSyncBlockState::FetchSuccess { time, block, .. } => {
                self.global_slot.get_or_insert_with(|| block.global_slot());
                self.status = SyncBlockStatus::Fetched;
                self.fetch_end = Some(*time);
            }
            TransitionFrontierSyncBlockState::ApplyPending { time, block, .. } => {
                self.global_slot.get_or_insert_with(|| block.global_slot());
                self.status = SyncBlockStatus::Applying;
                self.apply_start = Some(*time);
            }
            TransitionFrontierSyncBlockState::ApplyError { time, block, .. } => {
                self.global_slot.get_or_insert_with(|| block.global_slot());
                self.status = SyncBlockStatus::ApplyFailed;
                self.apply_end = Some(*time);
            }
            TransitionFrontierSyncBlockState::ApplySuccess { time, block, .. } => {
                self.global_slot.get_or_insert_with(|| block.global_slot());
                self.status = SyncBlockStatus::Applied;
                self.apply_end = Some(*time);
            }
        }
    }
447}