use std::collections::VecDeque;

use mina_p2p_messages::v2::{LedgerHash, StateHash};
use openmina_core::block::ArcBlockWithHash;
use redux::Timestamp;
use serde::{Deserialize, Serialize};

use crate::transition_frontier::sync::{
    ledger::SyncLedgerTargetKind, TransitionFrontierSyncBlockState,
};

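/// Maximum number of sync snapshots retained; once this limit is reached, the
/// oldest snapshot is dropped before a new one is recorded.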
const MAX_SNAPSHOTS_LEN: usize = 256;

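/// Statistics collected while the transition frontier is syncing.
///
/// Keeps a bounded history (at most [`MAX_SNAPSHOTS_LEN`]) of
/// [`SyncStatsSnapshot`]s, one per sync target (best tip).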
#[derive(Default)]
pub struct SyncStats {
    snapshots: VecDeque<SyncStatsSnapshot>,
}

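/// Stats collected for a single sync attempt towards one best tip.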
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct SyncStatsSnapshot {
    pub kind: SyncKind,
    pub best_tip_received: Timestamp,
    pub synced: Option<Timestamp>,
    pub ledgers: SyncLedgers,
    pub blocks: Vec<SyncBlock>,
    pub resyncs: Vec<LedgerResyncEvent>,
}

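/// Kind of the sync attempt: a full bootstrap, or a catchup on top of
/// previous sync progress.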
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum SyncKind {
    Bootstrap,
    Catchup,
}

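/// Reason why a ledger had to be re-synced while syncing was still in progress.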
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum LedgerResyncKind {
    FetchStagedLedgerError(String),
    RootLedgerChange,
    EpochChange,
    BestChainChange,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct LedgerResyncEvent {
    pub kind: LedgerResyncKind,
    pub time: Timestamp,
}

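/// Sync stats for each of the ledgers that may need syncing: the staking
/// epoch, next epoch and root ledgers.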
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
pub struct SyncLedgers {
    pub staking_epoch: Option<SyncLedger>,
    pub next_epoch: Option<SyncLedger>,
    pub root: Option<SyncLedger>,
}

impl SyncLedgers {
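    /// Checks whether the ledgers tracked so far still match the new sync
    /// target and, if not, classifies why: an epoch change, a best chain
    /// change, or a root ledger change. Returns `None` if nothing changed.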
    fn resync_kind(
        &self,
        best_tip: &ArcBlockWithHash,
        root_block: &ArcBlockWithHash,
    ) -> Option<LedgerResyncKind> {
        let consensus_state = &best_tip.block.header.protocol_state.body.consensus_state;
        let new_staking_epoch_ledger_hash = &consensus_state.staking_epoch_data.ledger.hash;
        let new_root_ledger_hash = root_block.snarked_ledger_hash();

        let staking_epoch_ledger_changed = self
            .staking_epoch
            .as_ref()
            .and_then(|sync| sync.snarked.hash.as_ref())
            .map(|prev_staking_epoch_ledger_hash| {
                prev_staking_epoch_ledger_hash != new_staking_epoch_ledger_hash
            })
            .unwrap_or(false);

        if let Some(prev_next_epoch_snarked_hash) = self
            .next_epoch
            .as_ref()
            .and_then(|sync| sync.snarked.hash.as_ref())
        {
            if prev_next_epoch_snarked_hash == new_staking_epoch_ledger_hash {
                return Some(LedgerResyncKind::EpochChange);
            } else if staking_epoch_ledger_changed {
                return Some(LedgerResyncKind::BestChainChange);
            }
        }

        if let Some(prev_root_ledger_hash) = self
            .root
            .as_ref()
            .and_then(|sync| sync.snarked.hash.as_ref())
        {
            if prev_root_ledger_hash != new_root_ledger_hash {
                return Some(LedgerResyncKind::RootLedgerChange);
            }
        }

        None
    }
}

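/// Sync stats for a single ledger: its snarked part and its staged part.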
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
pub struct SyncLedger {
    pub snarked: SyncSnarkedLedger,
    pub staged: SyncStagedLedger,
}

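/// Timings for syncing a snarked ledger: fetching its hashes, then its accounts.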
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
pub struct SyncSnarkedLedger {
    pub hash: Option<LedgerHash>,
    pub fetch_hashes_start: Option<Timestamp>,
    pub fetch_hashes_end: Option<Timestamp>,
    pub fetch_accounts_start: Option<Timestamp>,
    pub fetch_accounts_end: Option<Timestamp>,
}

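/// Timings for syncing a staged ledger: fetching its parts, then
/// reconstructing it from them.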
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
pub struct SyncStagedLedger {
    pub hash: Option<LedgerHash>,
    pub fetch_parts_start: Option<Timestamp>,
    pub fetch_parts_end: Option<Timestamp>,
    pub reconstruct_start: Option<Timestamp>,
    pub reconstruct_end: Option<Timestamp>,
}

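/// Per-block fetch/apply status and timings during sync.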
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct SyncBlock {
    pub global_slot: Option<u32>,
    pub height: u32,
    pub hash: StateHash,
    pub pred_hash: StateHash,
    pub status: SyncBlockStatus,
    pub fetch_start: Option<Timestamp>,
    pub fetch_end: Option<Timestamp>,
    pub apply_start: Option<Timestamp>,
    pub apply_end: Option<Timestamp>,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum SyncBlockStatus {
    Missing,
    Fetching,
    Fetched,
    Applying,
    ApplyFailed,
    Applied,
}

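/// Ledger sync progress update passed to [`SyncStats::ledger`].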
pub enum SyncingLedger {
    Init {
        snarked_ledger_hash: LedgerHash,
        staged_ledger_hash: Option<LedgerHash>,
    },
    FetchHashes {
        start: Timestamp,
        end: Timestamp,
    },
    FetchAccounts {
        start: Timestamp,
        end: Timestamp,
    },
    FetchParts {
        start: Timestamp,
        end: Option<Timestamp>,
    },
    ApplyParts {
        start: Timestamp,
        end: Option<Timestamp>,
    },
}

impl SyncStats {
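    /// Starts a new snapshot for a newly received sync target (best tip).
    ///
    /// The snapshot is classified as [`SyncKind::Bootstrap`] if there is no
    /// previous snapshot, or if the previous snapshot is a bootstrap that
    /// never finished; otherwise it is a [`SyncKind::Catchup`]. Ledger stats
    /// and resync events are carried over from the previous snapshot, and if
    /// that snapshot never finished syncing while its ledgers no longer match
    /// the new target, a new resync event is recorded.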
    pub fn new_target(
        &mut self,
        time: Timestamp,
        best_tip: &ArcBlockWithHash,
        root_block: &ArcBlockWithHash,
    ) -> &mut Self {
        let kind = match self
            .snapshots
            .back()
            .is_none_or(|s| matches!(s.kind, SyncKind::Bootstrap) && s.synced.is_none())
        {
            true => SyncKind::Bootstrap,
            false => SyncKind::Catchup,
        };
        let best_tip_block_state = SyncBlock {
            global_slot: Some(best_tip.global_slot()),
            height: best_tip.height(),
            hash: best_tip.hash().clone(),
            pred_hash: best_tip.pred_hash().clone(),
            status: SyncBlockStatus::Fetched,
            fetch_start: None,
            fetch_end: None,
            apply_start: None,
            apply_end: None,
        };

        if self.snapshots.len() >= MAX_SNAPSHOTS_LEN {
            self.snapshots.pop_front();
        }

        let ledgers = self
            .snapshots
            .back()
            .map_or_else(Default::default, |snapshot| snapshot.ledgers.clone());

        let mut resyncs = self
            .snapshots
            .back()
            .map_or_else(Default::default, |snapshot| snapshot.resyncs.clone());

        if let Some(prev_snapshot) = self.snapshots.back() {
            if prev_snapshot.synced.is_none() {
                if let Some(kind) = prev_snapshot.ledgers.resync_kind(best_tip, root_block) {
                    resyncs.push(LedgerResyncEvent { kind, time });
                }
            }
        }

        self.snapshots.push_back(SyncStatsSnapshot {
            kind,
            best_tip_received: time,
            synced: None,
            ledgers,
            blocks: vec![best_tip_block_state],
            resyncs,
        });

        self
    }

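    /// Records a ledger sync progress update for the given target ledger in
    /// the latest snapshot. On [`SyncingLedger::Init`], timings already
    /// collected in the previous snapshot are reused when the ledger hashes
    /// match.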
    pub fn ledger(&mut self, kind: SyncLedgerTargetKind, update: SyncingLedger) -> &mut Self {
        let Some(mut snapshot) = self.snapshots.pop_back() else {
            return self;
        };
        let ledger = snapshot.ledgers.get_or_insert(kind);

        match update {
            SyncingLedger::Init {
                snarked_ledger_hash,
                staged_ledger_hash,
            } => {
                ledger.snarked.hash = Some(snarked_ledger_hash);
                ledger.staged.hash = staged_ledger_hash;

                if let Some(prev_sync) = self.snapshots.back().and_then(|s| s.ledgers.get(kind)) {
                    if prev_sync.snarked.hash == ledger.snarked.hash {
                        ledger.snarked = prev_sync.snarked.clone();
                    }

                    if prev_sync.staged.hash == ledger.staged.hash {
                        ledger.staged = prev_sync.staged.clone();
                    }
                }
            }
            SyncingLedger::FetchHashes { start, end } => {
                ledger.snarked.fetch_hashes_start.get_or_insert(start);
                let cur_end = ledger.snarked.fetch_hashes_end.get_or_insert(end);
                *cur_end = end.max(*cur_end);
            }
            SyncingLedger::FetchAccounts { start, end } => {
                ledger.snarked.fetch_accounts_start.get_or_insert(start);
                let cur_end = ledger.snarked.fetch_accounts_end.get_or_insert(end);
                *cur_end = end.max(*cur_end);
            }
            SyncingLedger::FetchParts { start, end } => {
                ledger.staged.fetch_parts_start.get_or_insert(start);
                if let Some(end) = end {
                    let cur_end = ledger.staged.fetch_parts_end.get_or_insert(end);
                    *cur_end = end.max(*cur_end);
                }
            }
            SyncingLedger::ApplyParts { start, end } => {
                ledger.staged.reconstruct_start.get_or_insert(start);
                if let Some(end) = end {
                    let cur_end = ledger.staged.reconstruct_end.get_or_insert(end);
                    *cur_end = end.max(*cur_end);
                }
            }
        }

        self.snapshots.push_back(snapshot);

        self
    }

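    /// Initializes per-block stats in the latest snapshot from the sync block
    /// states, ordered from the best tip down towards the root block.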
    pub fn blocks_init(&mut self, states: &[TransitionFrontierSyncBlockState]) -> &mut Self {
        let Some(snapshot) = self.snapshots.back_mut() else {
            return self;
        };
        let Some((_root_height, best_tip_height)) = states
            .last()
            .and_then(|s| s.block())
            .map(|b| (b.root_block_height(), b.height()))
        else {
            return self;
        };

        snapshot.blocks = states
            .iter()
            .rev()
            .enumerate()
            .map(|(i, s)| {
                let height = best_tip_height.checked_sub(i as u32).expect("underflow");
                let hash = s.block_hash().clone();
                let pred_hash = s
                    .block()
                    .map(|b| b.pred_hash())
                    .unwrap_or_else(|| {
                        states
                            .get(
                                states
                                    .len()
                                    .saturating_sub(i)
                                    .checked_sub(2)
                                    .expect("underflow"),
                            )
                            .unwrap()
                            .block_hash()
                    })
                    .clone();
                let mut stats = SyncBlock::new(height, hash, pred_hash);
                stats.update_with_block_state(s);
                stats
            })
            .collect();

        self
    }

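    /// Updates the block stats entry in the latest snapshot that matches the
    /// given sync state's block hash.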
    pub fn block_update(&mut self, state: &TransitionFrontierSyncBlockState) -> &mut Self {
        let Some(snapshot) = self.snapshots.back_mut() else {
            return self;
        };
        let block_hash = state.block_hash();
        let Some(stats) = snapshot.blocks.iter_mut().find(|b| &b.hash == block_hash) else {
            return self;
        };
        stats.update_with_block_state(state);
        self
    }

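    /// Marks the latest snapshot as synced at the given time.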
    pub fn synced(&mut self, time: Timestamp) -> &mut Self {
        let Some(snapshot) = self.snapshots.back_mut() else {
            return self;
        };
        snapshot.synced = Some(time);
        self
    }

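    /// Returns up to `limit` of the most recent snapshots, newest first.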
    pub fn collect_stats(&self, limit: Option<usize>) -> Vec<SyncStatsSnapshot> {
        let limit = limit.unwrap_or(usize::MAX);
        self.snapshots.iter().rev().take(limit).cloned().collect()
    }

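    /// Records a staged ledger fetch failure as a resync event on the latest
    /// snapshot.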
    pub fn staging_ledger_fetch_failure(&mut self, error: String, time: Timestamp) {
        if let Some(snapshot) = self.snapshots.back_mut() {
            snapshot.resyncs.push(LedgerResyncEvent {
                kind: LedgerResyncKind::FetchStagedLedgerError(error),
                time,
            })
        }
    }
}

impl SyncLedgers {
    pub fn get(&self, kind: SyncLedgerTargetKind) -> Option<&SyncLedger> {
        match kind {
            SyncLedgerTargetKind::StakingEpoch => self.staking_epoch.as_ref(),
            SyncLedgerTargetKind::NextEpoch => self.next_epoch.as_ref(),
            SyncLedgerTargetKind::Root => self.root.as_ref(),
        }
    }

    fn get_or_insert(&mut self, kind: SyncLedgerTargetKind) -> &mut SyncLedger {
        match kind {
            SyncLedgerTargetKind::StakingEpoch => {
                self.staking_epoch.get_or_insert_with(Default::default)
            }
            SyncLedgerTargetKind::NextEpoch => self.next_epoch.get_or_insert_with(Default::default),
            SyncLedgerTargetKind::Root => self.root.get_or_insert_with(Default::default),
        }
    }
}

impl SyncBlock {
    pub fn new(height: u32, hash: StateHash, pred_hash: StateHash) -> Self {
        Self {
            global_slot: None,
            height,
            hash,
            pred_hash,
            status: SyncBlockStatus::Missing,
            fetch_start: None,
            fetch_end: None,
            apply_start: None,
            apply_end: None,
        }
    }

    pub fn update_with_block_state(&mut self, state: &TransitionFrontierSyncBlockState) {
        match state {
            TransitionFrontierSyncBlockState::FetchPending { attempts, .. } => {
                if let Some(time) = attempts
                    .iter()
                    .filter_map(|(_, v)| v.fetch_pending_since())
                    .min()
                {
                    self.status = SyncBlockStatus::Fetching;
                    self.fetch_start.get_or_insert(time);
                } else {
                    self.status = SyncBlockStatus::Missing;
                }
            }
            TransitionFrontierSyncBlockState::FetchSuccess { time, block, .. } => {
                self.global_slot.get_or_insert_with(|| block.global_slot());
                self.status = SyncBlockStatus::Fetched;
                self.fetch_end = Some(*time);
            }
            TransitionFrontierSyncBlockState::ApplyPending { time, block, .. } => {
                self.global_slot.get_or_insert_with(|| block.global_slot());
                self.status = SyncBlockStatus::Applying;
                self.apply_start = Some(*time);
            }
            TransitionFrontierSyncBlockState::ApplyError { time, block, .. } => {
                self.global_slot.get_or_insert_with(|| block.global_slot());
                self.status = SyncBlockStatus::ApplyFailed;
                self.apply_end = Some(*time);
            }
            TransitionFrontierSyncBlockState::ApplySuccess { time, block, .. } => {
                self.global_slot.get_or_insert_with(|| block.global_slot());
                self.status = SyncBlockStatus::Applied;
                self.apply_end = Some(*time);
            }
        }
    }
}