use std::collections::VecDeque;

use mina_core::block::ArcBlockWithHash;
use mina_p2p_messages::v2::{LedgerHash, StateHash};
use redux::Timestamp;
use serde::{Deserialize, Serialize};

use crate::transition_frontier::sync::{
    ledger::SyncLedgerTargetKind, TransitionFrontierSyncBlockState,
};

const MAX_SNAPSHOTS_LEN: usize = 256;

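/// Bounded history of transition frontier sync attempts. At most
/// `MAX_SNAPSHOTS_LEN` snapshots are kept; the oldest one is dropped when a
/// new sync target is recorded.
///
/// A rough usage sketch (illustrative only; `time`, `best_tip`, `root_block`
/// and `block_states` are assumed to be supplied by the caller):
///
/// ```ignore
/// let mut stats = SyncStats::default();
/// stats.new_target(time, &best_tip, &root_block);
/// stats.blocks_init(&block_states);
/// stats.synced(time);
/// let recent = stats.collect_stats(Some(10));
/// ```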
#[derive(Default)]
pub struct SyncStats {
    snapshots: VecDeque<SyncStatsSnapshot>,
}

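/// Statistics gathered for a single sync target (one best tip the node
/// attempted to sync to): ledger and block progress, plus any ledger resync
/// events observed along the way.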
#[derive(Serialize, Deserialize, Debug, Clone)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct SyncStatsSnapshot {
    pub kind: SyncKind,
    pub best_tip_received: Timestamp,
    pub synced: Option<Timestamp>,
    pub ledgers: SyncLedgers,
    pub blocks: Vec<SyncBlock>,
    pub resyncs: Vec<LedgerResyncEvent>,
}

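/// Kind of sync recorded for a snapshot: `Bootstrap` while the initial sync
/// has not yet completed, `Catchup` for targets received after that.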
#[derive(Serialize, Deserialize, Debug, Clone)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub enum SyncKind {
    Bootstrap,
    Catchup,
}

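/// Why a ledger sync had to be restarted: a staged ledger fetch error, or a
/// change of the root/epoch ledger targets caused by a new best tip.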
#[derive(Serialize, Deserialize, Debug, Clone)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub enum LedgerResyncKind {
    FetchStagedLedgerError(String),
    RootLedgerChange,
    EpochChange,
    BestChainChange,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct LedgerResyncEvent {
    pub kind: LedgerResyncKind,
    pub time: Timestamp,
}

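/// Per-target ledger sync progress: the staking epoch ledger, the next epoch
/// ledger and the transition frontier root ledger.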
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct SyncLedgers {
    pub staking_epoch: Option<SyncLedger>,
    pub next_epoch: Option<SyncLedger>,
    pub root: Option<SyncLedger>,
}

impl SyncLedgers {
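    /// Compares the ledger hashes recorded so far against the new best tip
    /// and root block to classify why the sync target changed, if it did.
    /// Returns `None` when no resync is implied.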
    fn resync_kind(
        &self,
        best_tip: &ArcBlockWithHash,
        root_block: &ArcBlockWithHash,
    ) -> Option<LedgerResyncKind> {
        let consensus_state = &best_tip.block.header.protocol_state.body.consensus_state;
        let new_staking_epoch_ledger_hash = &consensus_state.staking_epoch_data.ledger.hash;
        let new_root_ledger_hash = root_block.snarked_ledger_hash();

        let staking_epoch_ledger_changed = self
            .staking_epoch
            .as_ref()
            .and_then(|sync| sync.snarked.hash.as_ref())
            .map(|prev_staking_epoch_ledger_hash| {
                prev_staking_epoch_ledger_hash != new_staking_epoch_ledger_hash
            })
            .unwrap_or(false);

        if let Some(prev_next_epoch_snarked_hash) = self
            .next_epoch
            .as_ref()
            .and_then(|sync| sync.snarked.hash.as_ref())
        {
            if prev_next_epoch_snarked_hash == new_staking_epoch_ledger_hash {
                return Some(LedgerResyncKind::EpochChange);
            } else if staking_epoch_ledger_changed {
                return Some(LedgerResyncKind::BestChainChange);
            }
        }

        if let Some(prev_root_ledger_hash) = self
            .root
            .as_ref()
            .and_then(|sync| sync.snarked.hash.as_ref())
        {
            if prev_root_ledger_hash != new_root_ledger_hash {
                return Some(LedgerResyncKind::RootLedgerChange);
            }
        }

        None
    }
}

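/// Sync progress for a single ledger target, split into its snarked and
/// staged parts.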
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct SyncLedger {
    pub snarked: SyncSnarkedLedger,
    pub staged: SyncStagedLedger,
}

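/// Snarked ledger sync timings for the hash-fetching and account-fetching
/// phases.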
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct SyncSnarkedLedger {
    #[cfg_attr(feature = "openapi", schema(value_type = Option<String>))]
    pub hash: Option<LedgerHash>,
    pub fetch_hashes_start: Option<Timestamp>,
    pub fetch_hashes_end: Option<Timestamp>,
    pub fetch_accounts_start: Option<Timestamp>,
    pub fetch_accounts_end: Option<Timestamp>,
}

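/// Staged ledger sync timings for fetching the staged ledger parts and
/// reconstructing the ledger from them.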
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct SyncStagedLedger {
    #[cfg_attr(feature = "openapi", schema(value_type = Option<String>))]
    pub hash: Option<LedgerHash>,
    pub fetch_parts_start: Option<Timestamp>,
    pub fetch_parts_end: Option<Timestamp>,
    pub reconstruct_start: Option<Timestamp>,
    pub reconstruct_end: Option<Timestamp>,
}

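/// Fetch/apply progress for a single block in the sync chain.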
#[derive(Serialize, Deserialize, Debug, Clone)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct SyncBlock {
    pub global_slot: Option<u32>,
    pub height: u32,
    pub hash: StateHash,
    pub pred_hash: StateHash,
    pub status: SyncBlockStatus,
    pub fetch_start: Option<Timestamp>,
    pub fetch_end: Option<Timestamp>,
    pub apply_start: Option<Timestamp>,
    pub apply_end: Option<Timestamp>,
}

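/// Lifecycle of a block during sync, from missing through fetching to
/// applied (or failed to apply).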
#[derive(Serialize, Deserialize, Debug, Clone)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub enum SyncBlockStatus {
    Missing,
    Fetching,
    Fetched,
    Applying,
    ApplyFailed,
    Applied,
}

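/// Ledger sync progress update passed to [`SyncStats::ledger`].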
pub enum SyncingLedger {
    Init {
        snarked_ledger_hash: LedgerHash,
        staged_ledger_hash: Option<LedgerHash>,
    },
    FetchHashes {
        start: Timestamp,
        end: Timestamp,
    },
    FetchAccounts {
        start: Timestamp,
        end: Timestamp,
    },
    FetchParts {
        start: Timestamp,
        end: Option<Timestamp>,
    },
    ApplyParts {
        start: Timestamp,
        end: Option<Timestamp>,
    },
}

impl SyncStats {
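    /// Starts tracking a new sync target: decides whether this is still part
    /// of the initial bootstrap or a catchup, carries over ledger state and
    /// resync events from the previous snapshot, records a resync event if
    /// the unfinished previous target's ledgers no longer match, and pushes a
    /// fresh snapshot (evicting the oldest once `MAX_SNAPSHOTS_LEN` is
    /// reached).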
    pub fn new_target(
        &mut self,
        time: Timestamp,
        best_tip: &ArcBlockWithHash,
        root_block: &ArcBlockWithHash,
    ) -> &mut Self {
        let kind = match self
            .snapshots
            .back()
            .is_none_or(|s| matches!(s.kind, SyncKind::Bootstrap) && s.synced.is_none())
        {
            true => SyncKind::Bootstrap,
            false => SyncKind::Catchup,
        };
        let best_tip_block_state = SyncBlock {
            global_slot: Some(best_tip.global_slot()),
            height: best_tip.height(),
            hash: best_tip.hash().clone(),
            pred_hash: best_tip.pred_hash().clone(),
            status: SyncBlockStatus::Fetched,
            fetch_start: None,
            fetch_end: None,
            apply_start: None,
            apply_end: None,
        };

        if self.snapshots.len() >= MAX_SNAPSHOTS_LEN {
            self.snapshots.pop_front();
        }

        let ledgers = self
            .snapshots
            .back()
            .map_or_else(Default::default, |snapshot| snapshot.ledgers.clone());

        let mut resyncs = self
            .snapshots
            .back()
            .map_or_else(Default::default, |snapshot| snapshot.resyncs.clone());

        if let Some(prev_snapshot) = self.snapshots.back() {
            if prev_snapshot.synced.is_none() {
                if let Some(kind) = prev_snapshot.ledgers.resync_kind(best_tip, root_block) {
                    resyncs.push(LedgerResyncEvent { kind, time });
                }
            }
        }

        self.snapshots.push_back(SyncStatsSnapshot {
            kind,
            best_tip_received: time,
            synced: None,
            ledgers,
            blocks: vec![best_tip_block_state],
            resyncs,
        });

        self
    }

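    /// Records a ledger sync progress update for the given target, seeding
    /// start timestamps on first sight and extending end timestamps as later
    /// updates arrive. On `Init`, timings are carried over from the previous
    /// snapshot when the ledger hashes match, so already-synced ledgers keep
    /// their history.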
    pub fn ledger(&mut self, kind: SyncLedgerTargetKind, update: SyncingLedger) -> &mut Self {
        let Some(mut snapshot) = self.snapshots.pop_back() else {
            return self;
        };
        let ledger = snapshot.ledgers.get_or_insert(kind);

        match update {
            SyncingLedger::Init {
                snarked_ledger_hash,
                staged_ledger_hash,
            } => {
                ledger.snarked.hash = Some(snarked_ledger_hash);
                ledger.staged.hash = staged_ledger_hash;

                if let Some(prev_sync) = &self.snapshots.back().and_then(|s| s.ledgers.get(kind)) {
                    if prev_sync.snarked.hash == ledger.snarked.hash {
                        ledger.snarked = prev_sync.snarked.clone();
                    }

                    if prev_sync.staged.hash == ledger.staged.hash {
                        ledger.staged = prev_sync.staged.clone();
                    }
                }
            }
            SyncingLedger::FetchHashes { start, end } => {
                ledger.snarked.fetch_hashes_start.get_or_insert(start);
                let cur_end = ledger.snarked.fetch_hashes_end.get_or_insert(end);
                *cur_end = end.max(*cur_end);
            }
            SyncingLedger::FetchAccounts { start, end } => {
                ledger.snarked.fetch_accounts_start.get_or_insert(start);
                let cur_end = ledger.snarked.fetch_accounts_end.get_or_insert(end);
                *cur_end = end.max(*cur_end);
            }
            SyncingLedger::FetchParts { start, end } => {
                ledger.staged.fetch_parts_start.get_or_insert(start);
                if let Some(end) = end {
                    let cur_end = ledger.staged.fetch_parts_end.get_or_insert(end);
                    *cur_end = end.max(*cur_end);
                }
            }
            SyncingLedger::ApplyParts { start, end } => {
                ledger.staged.reconstruct_start.get_or_insert(start);
                if let Some(end) = end {
                    let cur_end = ledger.staged.reconstruct_end.get_or_insert(end);
                    *cur_end = end.max(*cur_end);
                }
            }
        }

        self.snapshots.push_back(snapshot);

        self
    }

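    /// Initializes per-block stats for the current snapshot from the sync
    /// block states, ordered from the best tip down towards the root.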
    pub fn blocks_init(&mut self, states: &[TransitionFrontierSyncBlockState]) -> &mut Self {
        let Some(snapshot) = self.snapshots.back_mut() else {
            return self;
        };
        let Some((_root_height, best_tip_height)) = states
            .last()
            .and_then(|s| s.block())
            .map(|b| (b.root_block_height(), b.height()))
        else {
            return self;
        };

        snapshot.blocks = states
            .iter()
            .rev()
            .enumerate()
            .map(|(i, s)| {
                let height = best_tip_height.checked_sub(i as u32).expect("underflow");
                let hash = s.block_hash().clone();
                let pred_hash = s
                    .block()
                    .map(|b| b.pred_hash())
                    .unwrap_or_else(|| {
                        states
                            .get(
                                states
                                    .len()
                                    .saturating_sub(i)
                                    .checked_sub(2)
                                    .expect("underflow"),
                            )
                            .unwrap()
                            .block_hash()
                    })
                    .clone();
                let mut stats = SyncBlock::new(height, hash, pred_hash);
                stats.update_with_block_state(s);
                stats
            })
            .collect();

        self
    }

    pub fn block_update(&mut self, state: &TransitionFrontierSyncBlockState) -> &mut Self {
        let Some(snapshot) = self.snapshots.back_mut() else {
            return self;
        };
        let block_hash = state.block_hash();
        let Some(stats) = snapshot.blocks.iter_mut().find(|b| &b.hash == block_hash) else {
            return self;
        };
        stats.update_with_block_state(state);
        self
    }

    pub fn synced(&mut self, time: Timestamp) -> &mut Self {
        let Some(snapshot) = self.snapshots.back_mut() else {
            return self;
        };
        snapshot.synced = Some(time);
        self
    }

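    /// Returns up to `limit` snapshots (all of them when `limit` is `None`),
    /// newest first.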
    pub fn collect_stats(&self, limit: Option<usize>) -> Vec<SyncStatsSnapshot> {
        let limit = limit.unwrap_or(usize::MAX);
        self.snapshots.iter().rev().take(limit).cloned().collect()
    }

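    /// Records a staged ledger fetch failure as a resync event on the current
    /// snapshot.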
    pub fn staging_ledger_fetch_failure(&mut self, error: String, time: Timestamp) {
        if let Some(snapshot) = self.snapshots.back_mut() {
            snapshot.resyncs.push(LedgerResyncEvent {
                kind: LedgerResyncKind::FetchStagedLedgerError(error),
                time,
            })
        }
    }
}

impl SyncLedgers {
    pub fn get(&self, kind: SyncLedgerTargetKind) -> Option<&SyncLedger> {
        match kind {
            SyncLedgerTargetKind::StakingEpoch => self.staking_epoch.as_ref(),
            SyncLedgerTargetKind::NextEpoch => self.next_epoch.as_ref(),
            SyncLedgerTargetKind::Root => self.root.as_ref(),
        }
    }

    fn get_or_insert(&mut self, kind: SyncLedgerTargetKind) -> &mut SyncLedger {
        match kind {
            SyncLedgerTargetKind::StakingEpoch => {
                self.staking_epoch.get_or_insert_with(Default::default)
            }
            SyncLedgerTargetKind::NextEpoch => self.next_epoch.get_or_insert_with(Default::default),
            SyncLedgerTargetKind::Root => self.root.get_or_insert_with(Default::default),
        }
    }
}

impl SyncBlock {
    pub fn new(height: u32, hash: StateHash, pred_hash: StateHash) -> Self {
        Self {
            global_slot: None,
            height,
            hash,
            pred_hash,
            status: SyncBlockStatus::Missing,
            fetch_start: None,
            fetch_end: None,
            apply_start: None,
            apply_end: None,
        }
    }

    pub fn update_with_block_state(&mut self, state: &TransitionFrontierSyncBlockState) {
        match state {
            TransitionFrontierSyncBlockState::FetchPending { attempts, .. } => {
                if let Some(time) = attempts
                    .iter()
                    .filter_map(|(_, v)| v.fetch_pending_since())
                    .min()
                {
                    self.status = SyncBlockStatus::Fetching;
                    self.fetch_start.get_or_insert(time);
                } else {
                    self.status = SyncBlockStatus::Missing;
                }
            }
            TransitionFrontierSyncBlockState::FetchSuccess { time, block, .. } => {
                self.global_slot.get_or_insert_with(|| block.global_slot());
                self.status = SyncBlockStatus::Fetched;
                self.fetch_end = Some(*time);
            }
            TransitionFrontierSyncBlockState::ApplyPending { time, block, .. } => {
                self.global_slot.get_or_insert_with(|| block.global_slot());
                self.status = SyncBlockStatus::Applying;
                self.apply_start = Some(*time);
            }
            TransitionFrontierSyncBlockState::ApplyError { time, block, .. } => {
                self.global_slot.get_or_insert_with(|| block.global_slot());
                self.status = SyncBlockStatus::ApplyFailed;
                self.apply_end = Some(*time);
            }
            TransitionFrontierSyncBlockState::ApplySuccess { time, block, .. } => {
                self.global_slot.get_or_insert_with(|| block.global_slot());
                self.status = SyncBlockStatus::Applied;
                self.apply_end = Some(*time);
            }
        }
    }
}