1use std::collections::BTreeMap;
2
3use mina_p2p_messages::v2::{self, LedgerHash, MinaStateProtocolStateValueStableV2, StateHash};
4use openmina_core::block::{AppliedBlock, ArcBlockWithHash};
5use redux::Timestamp;
6use serde::{Deserialize, Serialize};
7use strum_macros::Display;
8
9use crate::p2p::{channels::rpc::P2pRpcId, PeerId};
10
11use super::{
12 ledger::{SyncLedgerTarget, SyncLedgerTargetKind, TransitionFrontierSyncLedgerState},
13 PeerBlockFetchError,
14};
15
/// State of the transition frontier synchronization.
///
/// `sync_phase()` groups the variants into coarse phases: `Idle` through
/// `RootLedgerSuccess` are reported as `Bootstrap`, `BlocksPending` through
/// `CommitSuccess` as `Catchup`, and `Synced` as `Synced`.
///
/// NOTE(review): the variants look like they are declared in the order the
/// sync visits them — confirm against the reducer that drives transitions.
#[derive(Serialize, Deserialize, Display, Debug, Clone)]
pub enum TransitionFrontierSyncState {
    /// No sync target is set; carries no data (all accessors return `None`).
    Idle,
    /// A sync target (best tip + root) has been recorded; no ledger sync has
    /// started yet.
    Init {
        /// Timestamp attached to this state (what `time()` returns).
        time: Timestamp,
        /// Target best tip block (what `best_tip()` returns).
        best_tip: ArcBlockWithHash,
        /// Target root block (what `root_block()` returns).
        root_block: ArcBlockWithHash,
        /// Hashes of the blocks between `root_block` and `best_tip`.
        /// NOTE(review): presumably ordered root -> tip and exclusive of both
        /// ends — confirm with the code that builds this state.
        blocks_inbetween: Vec<StateHash>,
    },
    /// Staking ledger sync in progress; `ledger()` exposes its sub-state.
    StakingLedgerPending(TransitionFrontierSyncLedgerPending),
    /// Staking ledger sync finished.
    StakingLedgerSuccess {
        time: Timestamp,
        best_tip: ArcBlockWithHash,
        root_block: ArcBlockWithHash,
        blocks_inbetween: Vec<StateHash>,
        /// Protocol states keyed by state hash.
        /// NOTE(review): presumably the states collected during ledger sync
        /// that later steps need — confirm.
        needed_protocol_states: BTreeMap<StateHash, MinaStateProtocolStateValueStableV2>,
    },
    /// Next-epoch ledger sync in progress; `ledger()` exposes its sub-state.
    NextEpochLedgerPending(TransitionFrontierSyncLedgerPending),
    /// Next-epoch ledger sync finished.
    NextEpochLedgerSuccess {
        time: Timestamp,
        best_tip: ArcBlockWithHash,
        root_block: ArcBlockWithHash,
        blocks_inbetween: Vec<StateHash>,
        needed_protocol_states: BTreeMap<StateHash, MinaStateProtocolStateValueStableV2>,
    },
    /// Root ledger sync in progress. Unlike the two epoch-ledger states,
    /// `is_ledger_sync_complete()` requires the staged part of the ledger
    /// sync to be successful here, not just the snarked ledger.
    RootLedgerPending(TransitionFrontierSyncLedgerPending),
    /// Root ledger sync finished.
    RootLedgerSuccess {
        time: Timestamp,
        best_tip: ArcBlockWithHash,
        root_block: ArcBlockWithHash,
        blocks_inbetween: Vec<StateHash>,
        /// NOTE(review): presumably root blocks that replaced one another
        /// while the sync was running — confirm.
        root_block_updates: Vec<ArcBlockWithHash>,
        needed_protocol_states: BTreeMap<StateHash, MinaStateProtocolStateValueStableV2>,
    },
    /// Blocks of the target chain are being fetched and applied.
    BlocksPending {
        time: Timestamp,
        /// Per-block fetch/apply state. `root_block()` reads the first entry
        /// and `best_tip()` the last, so the chain runs root -> tip.
        chain: Vec<TransitionFrontierSyncBlockState>,
        /// Snarked-ledger updates of the root, keyed by ledger hash.
        root_snarked_ledger_updates: TransitionFrontierRootSnarkedLedgerUpdates,
        needed_protocol_states: BTreeMap<StateHash, MinaStateProtocolStateValueStableV2>,
    },
    /// Every block of the chain is available as an `AppliedBlock`.
    BlocksSuccess {
        time: Timestamp,
        /// Applied blocks; first entry is the root, last entry the best tip.
        chain: Vec<AppliedBlock>,
        root_snarked_ledger_updates: TransitionFrontierRootSnarkedLedgerUpdates,
        needed_protocol_states: BTreeMap<StateHash, MinaStateProtocolStateValueStableV2>,
    },
    /// Commit of the synced chain is in progress (`is_commit_pending()`).
    CommitPending {
        time: Timestamp,
        chain: Vec<AppliedBlock>,
        root_snarked_ledger_updates: TransitionFrontierRootSnarkedLedgerUpdates,
        needed_protocol_states: BTreeMap<StateHash, MinaStateProtocolStateValueStableV2>,
    },
    /// Commit of the synced chain finished.
    CommitSuccess {
        time: Timestamp,
        chain: Vec<AppliedBlock>,
        root_snarked_ledger_updates: TransitionFrontierRootSnarkedLedgerUpdates,
        needed_protocol_states: BTreeMap<StateHash, MinaStateProtocolStateValueStableV2>,
    },
    /// Synchronization is complete; only the timestamp is retained.
    Synced {
        time: Timestamp,
    },
}
85
/// Payload shared by the three `*LedgerPending` variants of
/// `TransitionFrontierSyncState` (staking, next-epoch and root ledger).
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TransitionFrontierSyncLedgerPending {
    /// Timestamp attached to the pending state.
    pub time: Timestamp,
    /// Target best tip block.
    pub best_tip: ArcBlockWithHash,
    /// Target root block.
    pub root_block: ArcBlockWithHash,
    /// Hashes of the blocks between `root_block` and `best_tip`.
    pub blocks_inbetween: Vec<StateHash>,
    /// NOTE(review): presumably root blocks superseded while the ledger sync
    /// was in progress — confirm with the reducer.
    pub root_block_updates: Vec<ArcBlockWithHash>,
    /// State of the ledger synchronization itself.
    pub ledger: TransitionFrontierSyncLedgerState,
}
95
/// Set of root snarked-ledger updates, keyed by ledger hash.
///
/// `extend_with_needed` keys each entry by the snarked ledger hash of the
/// newer block of a root transition.
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
pub struct TransitionFrontierRootSnarkedLedgerUpdates(
    BTreeMap<LedgerHash, TransitionFrontierRootSnarkedLedgerUpdate>,
);
100
/// A single snarked-ledger update of the root.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TransitionFrontierRootSnarkedLedgerUpdate {
    /// Snarked ledger hash of the older block, i.e. the ledger this update
    /// starts from (as filled in by `extend_with_needed`).
    pub parent: LedgerHash,
    /// Staged ledger hashes of the newer block, the one whose snarked ledger
    /// hash is used as the map key (as filled in by `extend_with_needed`).
    pub staged_ledger_hash: v2::MinaBaseStagedLedgerHashStableV1,
}
110
/// Fetch/apply state of a single block of the chain being synced.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum TransitionFrontierSyncBlockState {
    /// The block is not available yet; only its hash is known.
    FetchPending {
        time: Timestamp,
        /// Hash of the block to fetch.
        block_hash: StateHash,
        /// Per-peer fetch attempts. An empty map makes the block eligible
        /// for `blocks_fetch_next`; a non-empty map whose entries are all
        /// errors makes it eligible for retry (`retry_hash`).
        attempts: BTreeMap<PeerId, PeerRpcState>,
    },
    /// The block has been fetched and is waiting to be applied.
    FetchSuccess {
        time: Timestamp,
        block: ArcBlockWithHash,
    },
    /// The block is currently being applied.
    ApplyPending {
        time: Timestamp,
        block: ArcBlockWithHash,
    },
    /// Applying the block failed.
    ApplyError {
        time: Timestamp,
        block: ArcBlockWithHash,
        /// Error text describing the failure.
        error: String,
    },
    /// The block has been applied.
    ApplySuccess {
        time: Timestamp,
        block: AppliedBlock,
    },
}
136
/// State of a block-fetch attempt against one particular peer.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum PeerRpcState {
    /// The attempt is registered but no RPC id is associated with it yet.
    Init {
        time: Timestamp,
    },
    /// An RPC is in flight.
    Pending {
        time: Timestamp,
        /// Id of the in-flight RPC.
        rpc_id: P2pRpcId,
    },
    /// The RPC failed.
    Error {
        time: Timestamp,
        /// Id of the RPC that failed.
        rpc_id: P2pRpcId,
        error: PeerBlockFetchError,
    },
    /// The peer delivered the block.
    Success {
        time: Timestamp,
        block: ArcBlockWithHash,
    },
}
156
/// Coarse synchronization phase, as reported by
/// `TransitionFrontierSyncState::sync_phase`.
#[derive(Serialize, Deserialize, Display, Debug, Clone)]
pub enum SyncPhase {
    /// Reported for `Idle`, `Init` and all ledger sync states.
    Bootstrap,
    /// Reported for the block fetch/apply and commit states.
    Catchup,
    /// Reported for the `Synced` state.
    Synced,
}
163
164impl TransitionFrontierSyncState {
165 pub fn is_pending(&self) -> bool {
167 !matches!(self, Self::Idle | Self::Synced { .. })
168 }
169
170 pub fn is_commit_pending(&self) -> bool {
171 matches!(self, Self::CommitPending { .. })
172 }
173
174 pub fn is_synced(&self) -> bool {
176 matches!(self, Self::Synced { .. })
177 }
178
179 pub fn time(&self) -> Option<redux::Timestamp> {
180 match self {
181 Self::Idle => None,
182 Self::Init { time, .. } => Some(*time),
183 Self::StakingLedgerPending(s) => Some(s.time),
184 Self::StakingLedgerSuccess { time, .. } => Some(*time),
185 Self::NextEpochLedgerPending(s) => Some(s.time),
186 Self::NextEpochLedgerSuccess { time, .. } => Some(*time),
187 Self::RootLedgerPending(s) => Some(s.time),
188 Self::RootLedgerSuccess { time, .. } => Some(*time),
189 Self::BlocksPending { time, .. } => Some(*time),
190 Self::BlocksSuccess { time, .. } => Some(*time),
191 Self::CommitPending { time, .. } => Some(*time),
192 Self::CommitSuccess { time, .. } => Some(*time),
193 Self::Synced { time, .. } => Some(*time),
194 }
195 }
196
197 pub fn root_block(&self) -> Option<&ArcBlockWithHash> {
198 match self {
199 Self::Idle => None,
200 Self::Init { root_block, .. } => Some(root_block),
201 Self::StakingLedgerPending(s) => Some(&s.root_block),
202 Self::StakingLedgerSuccess { root_block, .. } => Some(root_block),
203 Self::NextEpochLedgerPending(s) => Some(&s.root_block),
204 Self::NextEpochLedgerSuccess { root_block, .. } => Some(root_block),
205 Self::RootLedgerPending(s) => Some(&s.root_block),
206 Self::RootLedgerSuccess { root_block, .. } => Some(root_block),
207 Self::BlocksPending { chain, .. } => chain.first().and_then(|b| b.block()),
208 Self::BlocksSuccess { chain, .. } => chain.first().map(AppliedBlock::block_with_hash),
209 Self::CommitPending { chain, .. } => chain.first().map(AppliedBlock::block_with_hash),
210 Self::CommitSuccess { chain, .. } => chain.first().map(AppliedBlock::block_with_hash),
211 Self::Synced { .. } => None,
212 }
213 }
214
215 pub fn best_tip(&self) -> Option<&ArcBlockWithHash> {
216 match self {
217 Self::Idle => None,
218 Self::Init { best_tip, .. } => Some(best_tip),
219 Self::StakingLedgerPending(s) => Some(&s.best_tip),
220 Self::StakingLedgerSuccess { best_tip, .. } => Some(best_tip),
221 Self::NextEpochLedgerPending(s) => Some(&s.best_tip),
222 Self::NextEpochLedgerSuccess { best_tip, .. } => Some(best_tip),
223 Self::RootLedgerPending(s) => Some(&s.best_tip),
224 Self::RootLedgerSuccess { best_tip, .. } => Some(best_tip),
225 Self::BlocksPending { chain, .. } => chain.last().and_then(|b| b.block()),
226 Self::BlocksSuccess { chain, .. } => chain.last().map(AppliedBlock::block_with_hash),
227 Self::CommitPending { chain, .. } => chain.last().map(AppliedBlock::block_with_hash),
228 Self::CommitSuccess { chain, .. } => chain.last().map(AppliedBlock::block_with_hash),
229 Self::Synced { .. } => None,
230 }
231 }
232
233 pub fn ledger(&self) -> Option<&TransitionFrontierSyncLedgerState> {
234 match self {
235 Self::StakingLedgerPending(s) => Some(&s.ledger),
236 Self::NextEpochLedgerPending(s) => Some(&s.ledger),
237 Self::RootLedgerPending(s) => Some(&s.ledger),
238 _ => None,
239 }
240 }
241
242 pub fn ledger_mut(&mut self) -> Option<&mut TransitionFrontierSyncLedgerState> {
243 match self {
244 Self::StakingLedgerPending(s) => Some(&mut s.ledger),
245 Self::NextEpochLedgerPending(s) => Some(&mut s.ledger),
246 Self::RootLedgerPending(s) => Some(&mut s.ledger),
247 _ => None,
248 }
249 }
250
251 pub fn ledger_target(&self) -> Option<SyncLedgerTarget> {
252 self.ledger().map(|s| s.target())
253 }
254
255 pub fn ledger_target_kind(&self) -> Option<SyncLedgerTargetKind> {
256 self.ledger().map(|s| s.target_kind())
257 }
258
259 pub fn is_ledger_sync_complete(&self) -> bool {
265 match self {
266 Self::StakingLedgerPending(s) => s.ledger.is_snarked_ledger_synced(),
267 Self::NextEpochLedgerPending(s) => s.ledger.is_snarked_ledger_synced(),
268 Self::RootLedgerPending(s) => s.ledger.staged().is_some_and(|s| s.is_success()),
269 _ => false,
270 }
271 }
272
273 pub fn blocks_iter(&self) -> impl Iterator<Item = &TransitionFrontierSyncBlockState> {
274 static EMPTY: Vec<TransitionFrontierSyncBlockState> = Vec::new();
275 match self {
276 Self::BlocksPending { chain, .. } => chain.iter(),
277 _ => EMPTY.iter(),
278 }
279 }
280
281 pub fn pending_count(&self) -> usize {
282 self.blocks_iter()
283 .filter(|b| !matches!(b, TransitionFrontierSyncBlockState::ApplySuccess { .. }))
284 .count()
285 }
286
287 pub fn blocks_fetch_retry_iter(&self) -> impl '_ + Iterator<Item = StateHash> {
288 self.blocks_iter().filter_map(|s| s.retry_hash()).cloned()
289 }
290
291 pub fn blocks_fetch_next(&self) -> Option<StateHash> {
292 self.blocks_iter().find_map(|s| match s {
293 TransitionFrontierSyncBlockState::FetchPending {
294 block_hash,
295 attempts,
296 ..
297 } => Some(block_hash).filter(|_| attempts.is_empty()).cloned(),
298 _ => None,
299 })
300 }
301
302 pub fn block_state(&self, hash: &StateHash) -> Option<&TransitionFrontierSyncBlockState> {
303 self.blocks_iter().find(|s| s.block_hash() == hash)
304 }
305
306 pub fn block_state_mut(
307 &mut self,
308 hash: &StateHash,
309 ) -> Option<&mut TransitionFrontierSyncBlockState> {
310 match self {
311 Self::BlocksPending { chain, .. } => chain.iter_mut().find(|s| s.block_hash() == hash),
312 _ => None,
313 }
314 }
315
316 pub fn is_fetch_pending_from_peer(
317 &self,
318 hash: &StateHash,
319 peer_id: &PeerId,
320 rpc_id: P2pRpcId,
321 ) -> bool {
322 self.block_state(hash)
323 .is_some_and(|s| s.is_fetch_pending_from_peer(peer_id, rpc_id))
324 }
325
326 pub fn blocks_fetch_from_peer_pending_rpc_ids<'a>(
327 &'a self,
328 peer_id: &'a PeerId,
329 ) -> impl 'a + Iterator<Item = P2pRpcId> {
330 self.blocks_iter()
331 .filter_map(|b| b.fetch_pending_from_peer_rpc_id(peer_id))
332 }
333
334 pub fn blocks_apply_pending(&self) -> Option<&ArcBlockWithHash> {
335 self.blocks_iter()
336 .find(|s| s.is_apply_pending())
337 .and_then(|s| s.block())
338 }
339
340 pub fn blocks_apply_next(&self) -> Option<(&ArcBlockWithHash, &AppliedBlock)> {
341 let mut last_applied = None;
342 for s in self.blocks_iter() {
343 if s.is_apply_success() {
344 last_applied = s.applied_block();
345 } else if s.is_fetch_success() {
346 return Some((s.block()?, last_applied?));
347 } else {
348 return None;
349 }
350 }
351 None
352 }
353
354 pub fn sync_phase(&self) -> SyncPhase {
355 match self {
356 TransitionFrontierSyncState::Idle
357 | TransitionFrontierSyncState::Init { .. }
358 | TransitionFrontierSyncState::StakingLedgerPending(_)
359 | TransitionFrontierSyncState::StakingLedgerSuccess { .. }
360 | TransitionFrontierSyncState::NextEpochLedgerPending(_)
361 | TransitionFrontierSyncState::NextEpochLedgerSuccess { .. }
362 | TransitionFrontierSyncState::RootLedgerPending(_)
363 | TransitionFrontierSyncState::RootLedgerSuccess { .. } => SyncPhase::Bootstrap,
364 TransitionFrontierSyncState::BlocksPending { .. }
365 | TransitionFrontierSyncState::BlocksSuccess { .. }
366 | TransitionFrontierSyncState::CommitPending { .. }
367 | TransitionFrontierSyncState::CommitSuccess { .. } => SyncPhase::Catchup,
368 TransitionFrontierSyncState::Synced { .. } => SyncPhase::Synced,
369 }
370 }
371}
372
373impl TransitionFrontierSyncBlockState {
374 pub fn is_fetch_success(&self) -> bool {
375 matches!(self, Self::FetchSuccess { .. })
376 }
377
378 pub fn is_apply_pending(&self) -> bool {
379 matches!(self, Self::ApplyPending { .. })
380 }
381
382 pub fn is_apply_error(&self) -> bool {
383 matches!(self, Self::ApplyError { .. })
384 }
385
386 pub fn is_apply_success(&self) -> bool {
387 matches!(self, Self::ApplySuccess { .. })
388 }
389
390 pub fn block_hash(&self) -> &StateHash {
391 match self {
392 Self::FetchPending { block_hash, .. } => block_hash,
393 Self::FetchSuccess { block, .. }
394 | Self::ApplyPending { block, .. }
395 | Self::ApplyError { block, .. } => &block.hash,
396 Self::ApplySuccess { block, .. } => block.hash(),
397 }
398 }
399
400 pub fn block(&self) -> Option<&ArcBlockWithHash> {
401 match self {
402 Self::FetchPending { .. } => None,
403 Self::FetchSuccess { block, .. }
404 | Self::ApplyPending { block, .. }
405 | Self::ApplyError { block, .. } => Some(block),
406 Self::ApplySuccess { block, .. } => Some(block.block_with_hash()),
407 }
408 }
409
410 pub fn applied_block(&self) -> Option<&AppliedBlock> {
411 match self {
412 Self::FetchPending { .. }
413 | Self::FetchSuccess { .. }
414 | Self::ApplyPending { .. }
415 | Self::ApplyError { .. } => None,
416 Self::ApplySuccess { block, .. } => Some(block),
417 }
418 }
419
420 pub fn take_block(self) -> Option<ArcBlockWithHash> {
421 match self {
422 Self::FetchPending { .. } => None,
423 Self::FetchSuccess { block, .. }
424 | Self::ApplyPending { block, .. }
425 | Self::ApplyError { block, .. } => Some(block),
426 Self::ApplySuccess { block, .. } => Some(block.block),
427 }
428 }
429
430 pub fn take_applied_block(self) -> Option<AppliedBlock> {
431 match self {
432 Self::FetchPending { .. }
433 | Self::FetchSuccess { .. }
434 | Self::ApplyPending { .. }
435 | Self::ApplyError { .. } => None,
436 Self::ApplySuccess { block, .. } => Some(block),
437 }
438 }
439
440 pub fn fetch_pending_hash(&self) -> Option<&StateHash> {
441 match self {
442 Self::FetchPending { block_hash, .. } => Some(block_hash),
443 _ => None,
444 }
445 }
446
447 pub fn retry_hash(&self) -> Option<&StateHash> {
448 let Self::FetchPending {
449 block_hash,
450 attempts,
451 ..
452 } = self
453 else {
454 return None;
455 };
456 Some(block_hash)
457 .filter(|_| !attempts.is_empty() && attempts.iter().all(|(_, s)| s.is_error()))
458 }
459
460 pub fn fetch_pending_from_peer_rpc_id(&self, peer_id: &PeerId) -> Option<P2pRpcId> {
461 let Self::FetchPending { attempts, .. } = self else {
462 return None;
463 };
464 attempts.get(peer_id).and_then(|v| v.fetch_pending_rpc_id())
465 }
466
467 pub fn is_fetch_init_from_peer(&self, peer_id: &PeerId) -> bool {
468 let Self::FetchPending { attempts, .. } = self else {
469 return false;
470 };
471 attempts.get(peer_id).is_some_and(|s| s.is_fetch_init())
472 }
473
474 pub fn is_fetch_pending_from_peer(&self, peer_id: &PeerId, rpc_id: P2pRpcId) -> bool {
475 let Self::FetchPending { attempts, .. } = self else {
476 return false;
477 };
478 attempts
479 .get(peer_id)
480 .and_then(|s| s.fetch_pending_rpc_id())
481 .is_some_and(|expected| expected == rpc_id)
482 }
483
484 pub fn fetch_pending_attempts_mut(&mut self) -> Option<&mut BTreeMap<PeerId, PeerRpcState>> {
485 match self {
486 Self::FetchPending { attempts, .. } => Some(attempts),
487 _ => None,
488 }
489 }
490
491 pub fn fetch_pending_from_peer_mut(&mut self, peer_id: &PeerId) -> Option<&mut PeerRpcState> {
492 let Self::FetchPending { attempts, .. } = self else {
493 return None;
494 };
495 attempts.get_mut(peer_id)
496 }
497
498 pub fn fetch_pending_fetched_block(&self) -> Option<&ArcBlockWithHash> {
499 let Self::FetchPending { attempts, .. } = self else {
500 return None;
501 };
502 attempts.iter().find_map(|(_, s)| s.success_block())
503 }
504}
505
506impl PeerRpcState {
507 pub fn is_fetch_init(&self) -> bool {
508 matches!(self, Self::Init { .. })
509 }
510
511 pub fn is_error(&self) -> bool {
512 matches!(self, Self::Error { .. })
513 }
514
515 pub fn is_success(&self) -> bool {
516 matches!(self, Self::Success { .. })
517 }
518
519 pub fn fetch_pending_rpc_id(&self) -> Option<P2pRpcId> {
520 match self {
521 Self::Pending { rpc_id, .. } => Some(*rpc_id),
522 _ => None,
523 }
524 }
525
526 pub fn fetch_pending_since(&self) -> Option<Timestamp> {
527 match self {
528 Self::Pending { time, .. } => Some(*time),
529 _ => None,
530 }
531 }
532
533 pub fn success_block(&self) -> Option<&ArcBlockWithHash> {
534 match self {
535 Self::Success { block, .. } => Some(block),
536 _ => None,
537 }
538 }
539}
540
541impl TransitionFrontierRootSnarkedLedgerUpdates {
542 pub fn get(
543 &self,
544 ledger_hash: &LedgerHash,
545 ) -> Option<&TransitionFrontierRootSnarkedLedgerUpdate> {
546 self.0.get(ledger_hash)
547 }
548
549 pub fn is_empty(&self) -> bool {
550 self.0.is_empty()
551 }
552
553 pub fn len(&self) -> usize {
554 self.0.len()
555 }
556
557 pub fn extend_with_needed<'a>(
559 &mut self,
560 new_root: &ArcBlockWithHash,
561 old_chain: impl 'a + IntoIterator<Item = &'a ArcBlockWithHash>,
562 ) {
563 let mut old_chain = old_chain.into_iter().peekable();
564 let Some(old_root) = old_chain.peek() else {
565 return;
566 };
567
568 let Some(diff_len) = new_root.height().checked_sub(old_root.height()) else {
569 return;
570 };
571
572 if new_root.snarked_ledger_hash() == old_root.snarked_ledger_hash() {
573 return;
574 }
575
576 self.0.extend(
577 old_chain
578 .take(diff_len as usize)
579 .collect::<Vec<_>>()
580 .into_iter()
581 .rev()
582 .scan(new_root, |last_block, b| {
583 if last_block.snarked_ledger_hash() == b.snarked_ledger_hash() {
584 *last_block = b;
585 return Some(None);
586 }
587 let last_block = std::mem::replace(last_block, b);
588 let update = TransitionFrontierRootSnarkedLedgerUpdate {
589 parent: b.snarked_ledger_hash().clone(),
590 staged_ledger_hash: last_block.staged_ledger_hashes().clone(),
591 };
592 let snarked_ledger_hash = last_block.snarked_ledger_hash().clone();
593
594 Some(Some((snarked_ledger_hash, update)))
595 })
596 .flatten(),
597 );
598 }
599}