node/transition_frontier/sync/
transition_frontier_sync_reducer.rs

1use std::collections::{BTreeMap, VecDeque};
2
3use mina_p2p_messages::v2::StateHash;
4use openmina_core::block::{AppliedBlock, ArcBlockWithHash};
5
6use super::{
7    ledger::{
8        snarked::TransitionFrontierSyncLedgerSnarkedState, SyncLedgerTarget, SyncLedgerTargetKind,
9        TransitionFrontierSyncLedgerState,
10    },
11    PeerRpcState, TransitionFrontierRootSnarkedLedgerUpdates, TransitionFrontierSyncAction,
12    TransitionFrontierSyncActionWithMetaRef, TransitionFrontierSyncBlockState,
13    TransitionFrontierSyncLedgerPending, TransitionFrontierSyncState,
14};
15
16impl TransitionFrontierSyncState {
17    pub fn reducer(
18        mut state_context: crate::Substate<Self>,
19        action: TransitionFrontierSyncActionWithMetaRef<'_>,
20        best_chain: &[AppliedBlock],
21    ) {
22        let Ok(state) = state_context.get_substate_mut() else {
23            // TODO: log or propagate
24            return;
25        };
26        let (action, meta) = action.split();
27
28        match action {
29            TransitionFrontierSyncAction::Init {
30                best_tip,
31                root_block,
32                blocks_inbetween,
33            } => {
34                *state = Self::Init {
35                    time: meta.time(),
36                    best_tip: best_tip.clone(),
37                    root_block: root_block.clone(),
38                    blocks_inbetween: blocks_inbetween.clone(),
39                };
40            }
41            // TODO(binier): refactor
42            TransitionFrontierSyncAction::BestTipUpdate {
43                previous_root_snarked_ledger_hash: _,
44                best_tip,
45                root_block,
46                blocks_inbetween,
47                ..
48            } => match state {
49                Self::StakingLedgerPending(substate)
50                | Self::NextEpochLedgerPending(substate)
51                | Self::RootLedgerPending(substate) => {
52                    substate.time = meta.time();
53                    substate.blocks_inbetween.clone_from(blocks_inbetween);
54                    let old_root_block =
55                        std::mem::replace(&mut substate.root_block, root_block.clone());
56                    let old_best_tip = std::mem::replace(&mut substate.best_tip, best_tip.clone());
57
58                    let staking_epoch_target = SyncLedgerTarget::staking_epoch(best_tip);
59                    let next_epoch_target = SyncLedgerTarget::next_epoch(best_tip, root_block);
60
61                    let new_target = if let Self::StakingLedgerPending(substate) = state {
62                        substate
63                            .ledger
64                            .update_target(meta.time(), staking_epoch_target);
65                        None
66                    } else if let Self::NextEpochLedgerPending(substate) = state {
67                        if old_best_tip.staking_epoch_ledger_hash()
68                            != best_tip.staking_epoch_ledger_hash()
69                        {
70                            Some((substate, staking_epoch_target))
71                        } else {
72                            if let Some(next_epoch_target) = next_epoch_target {
73                                substate
74                                    .ledger
75                                    .update_target(meta.time(), next_epoch_target);
76                            }
77                            None
78                        }
79                    } else if let Self::RootLedgerPending(substate) = state {
80                        if old_best_tip.staking_epoch_ledger_hash()
81                            != best_tip.staking_epoch_ledger_hash()
82                        {
83                            Some((substate, staking_epoch_target))
84                        } else if let Some(next_epoch_target) = next_epoch_target.filter(|_| {
85                            old_best_tip.next_epoch_ledger_hash()
86                                != best_tip.next_epoch_ledger_hash()
87                        }) {
88                            Some((substate, next_epoch_target))
89                        } else if substate
90                            .ledger
91                            .staged()
92                            .is_some_and(|s| s.is_parts_fetched())
93                            && root_block.pred_hash() == old_root_block.hash()
94                        {
95                            // Optimization: keep the current staged ledger target when
96                            // the new root block directly extends the old one. In that
97                            // case the already reconstructed staged ledger can be reused
98                            // to reconstruct the new one.
99                            substate.root_block_updates.push(old_root_block);
100                            None
101                        } else {
102                            substate.root_block_updates = Default::default();
103                            substate
104                                .ledger
105                                .update_target(meta.time(), SyncLedgerTarget::root(root_block));
106                            None
107                        }
108                    } else {
109                        return;
110                    };
111
112                    let Some((substate, new_target)) = new_target else {
113                        return;
114                    };
115                    let new_target_kind = new_target.kind;
116                    substate.ledger =
117                        TransitionFrontierSyncLedgerSnarkedState::pending(meta.time(), new_target)
118                            .into();
119                    *state = match new_target_kind {
120                        SyncLedgerTargetKind::StakingEpoch => {
121                            Self::StakingLedgerPending(substate.clone())
122                        }
123                        SyncLedgerTargetKind::NextEpoch => {
124                            Self::NextEpochLedgerPending(substate.clone())
125                        }
126                        SyncLedgerTargetKind::Root => Self::RootLedgerPending(substate.clone()),
127                    };
128                }
129                Self::BlocksPending {
130                    chain,
131                    root_snarked_ledger_updates,
132                    needed_protocol_states,
133                    ..
134                } => {
135                    let mut applied_blocks: BTreeMap<_, _> =
136                        best_chain.iter().map(|b| (b.hash(), b)).collect();
137
138                    let old_chain = VecDeque::from(std::mem::take(chain));
139                    let old_root = old_chain.front().and_then(|b| b.block()).unwrap().clone();
140                    let old_best_tip = old_chain.back().and_then(|b| b.block()).unwrap().clone();
141                    let new_root = root_block;
142                    let new_best_tip = best_tip;
143
144                    let old_chain_has_new_root_applied = old_chain
145                        .iter()
146                        .find(|b| b.block_hash() == &new_root.hash)
147                        .is_some_and(|b| b.is_apply_success());
148
149                    if applied_blocks.contains_key(&new_root.hash) || old_chain_has_new_root_applied
150                    {
151                        if old_chain_has_new_root_applied {
152                            root_snarked_ledger_updates.extend_with_needed(
153                                new_root,
154                                old_chain.iter().filter_map(|s| s.block()),
155                            );
156                        }
157
158                        let mut old_block_states: BTreeMap<_, _> = old_chain
159                            .into_iter()
160                            .map(|b| (b.block_hash().clone(), b))
161                            .collect();
162
163                        let mut push_block = |hash, maybe_block: Option<&ArcBlockWithHash>| {
164                            chain.push({
165                                if let Some(old_state) =
166                                    old_block_states.remove(hash).filter(|old_state| {
167                                        old_state.block().is_some() || maybe_block.is_none()
168                                    })
169                                {
170                                    old_state
171                                } else if let Some(block) = applied_blocks.remove(hash) {
172                                    TransitionFrontierSyncBlockState::ApplySuccess {
173                                        time: meta.time(),
174                                        block: block.clone(),
175                                    }
176                                } else if let Some(block) = maybe_block {
177                                    TransitionFrontierSyncBlockState::FetchSuccess {
178                                        time: meta.time(),
179                                        block: block.clone(),
180                                    }
181                                } else {
182                                    TransitionFrontierSyncBlockState::FetchPending {
183                                        time: meta.time(),
184                                        block_hash: hash.clone(),
185                                        attempts: Default::default(),
186                                    }
187                                }
188                            })
189                        };
190
191                        push_block(new_root.hash(), Some(new_root));
192                        for hash in blocks_inbetween {
193                            push_block(hash, None);
194                        }
195                        push_block(new_best_tip.hash(), Some(new_best_tip));
196
197                        needed_protocol_states.extend(old_block_states.into_iter().filter_map(
198                            |(hash, s)| {
199                                Some((hash, s.take_block()?.block.header.protocol_state.clone()))
200                            },
201                        ));
202                    } else {
203                        let cur_best_root = best_chain.first();
204                        let cur_best_tip = best_chain.last();
205                        *state = next_required_ledger_to_sync(
206                            meta.time(),
207                            cur_best_tip.map(AppliedBlock::block_with_hash),
208                            cur_best_root.map(AppliedBlock::block_with_hash),
209                            &old_best_tip,
210                            &old_root,
211                            new_best_tip,
212                            new_root,
213                            blocks_inbetween,
214                        );
215                    }
216                }
217                Self::CommitPending { .. } => {}
218                Self::CommitSuccess { .. } => {}
219                Self::Synced { time, .. } => {
220                    let applied_blocks: BTreeMap<_, _> =
221                        best_chain.iter().map(|b| (b.hash(), b)).collect();
222
223                    let old_best_tip = best_chain.last().unwrap();
224                    let old_root = best_chain.first().unwrap();
225                    let new_best_tip = best_tip;
226                    let new_root = root_block;
227
228                    if applied_blocks.contains_key(&new_root.hash) {
229                        let chain = std::iter::once(root_block.hash())
230                            .chain(blocks_inbetween)
231                            .chain(std::iter::once(new_best_tip.hash()))
232                            .map(|hash| match applied_blocks.get(hash) {
233                                Some(&block) => TransitionFrontierSyncBlockState::ApplySuccess {
234                                    time: *time,
235                                    block: block.clone(),
236                                },
237                                None if hash == new_best_tip.hash() => {
238                                    TransitionFrontierSyncBlockState::FetchSuccess {
239                                        time: meta.time(),
240                                        block: new_best_tip.clone(),
241                                    }
242                                }
243                                None => TransitionFrontierSyncBlockState::FetchPending {
244                                    time: meta.time(),
245                                    block_hash: hash.clone(),
246                                    attempts: Default::default(),
247                                },
248                            })
249                            .collect::<Vec<_>>();
250                        *state = Self::BlocksPending {
251                            time: meta.time(),
252                            chain,
253                            root_snarked_ledger_updates: Default::default(),
254                            needed_protocol_states: Default::default(),
255                        };
256                    } else {
257                        *state = next_required_ledger_to_sync(
258                            meta.time(),
259                            None,
260                            None,
261                            old_best_tip.block_with_hash(),
262                            old_root.block_with_hash(),
263                            new_best_tip,
264                            new_root,
265                            blocks_inbetween,
266                        );
267                    }
268                }
269                _ => (),
270            },
271            TransitionFrontierSyncAction::LedgerStakingPending => {
272                if let Self::Init {
273                    best_tip,
274                    root_block,
275                    blocks_inbetween,
276                    ..
277                } = state
278                {
279                    *state = Self::StakingLedgerPending(TransitionFrontierSyncLedgerPending {
280                        time: meta.time(),
281                        best_tip: best_tip.clone(),
282                        root_block: root_block.clone(),
283                        blocks_inbetween: std::mem::take(blocks_inbetween),
284                        root_block_updates: Default::default(),
285                        ledger: TransitionFrontierSyncLedgerState::Init {
286                            time: meta.time(),
287                            target: SyncLedgerTarget::staking_epoch(best_tip),
288                        },
289                    });
290                }
291            }
292            TransitionFrontierSyncAction::LedgerStakingSuccess => {
293                if let Self::StakingLedgerPending(substate) = state {
294                    let TransitionFrontierSyncLedgerState::Success {
295                        needed_protocol_states,
296                        ..
297                    } = &mut substate.ledger
298                    else {
299                        return;
300                    };
301                    *state = Self::StakingLedgerSuccess {
302                        time: meta.time(),
303                        best_tip: substate.best_tip.clone(),
304                        root_block: substate.root_block.clone(),
305                        blocks_inbetween: std::mem::take(&mut substate.blocks_inbetween),
306                        needed_protocol_states: std::mem::take(needed_protocol_states),
307                    };
308                }
309            }
310            TransitionFrontierSyncAction::LedgerNextEpochPending => {
311                let (best_tip, root_block, blocks_inbetween) = match state {
312                    Self::Init {
313                        best_tip,
314                        root_block,
315                        blocks_inbetween,
316                        ..
317                    }
318                    | Self::StakingLedgerSuccess {
319                        best_tip,
320                        root_block,
321                        blocks_inbetween,
322                        ..
323                    } => (best_tip, root_block, blocks_inbetween),
324                    _ => return,
325                };
326                let Some(target) = SyncLedgerTarget::next_epoch(best_tip, root_block) else {
327                    return;
328                };
329                *state = Self::NextEpochLedgerPending(TransitionFrontierSyncLedgerPending {
330                    time: meta.time(),
331                    best_tip: best_tip.clone(),
332                    root_block: root_block.clone(),
333                    blocks_inbetween: std::mem::take(blocks_inbetween),
334                    root_block_updates: Default::default(),
335                    ledger: TransitionFrontierSyncLedgerState::Init {
336                        time: meta.time(),
337                        target,
338                    },
339                });
340            }
341            TransitionFrontierSyncAction::LedgerNextEpochSuccess => {
342                if let Self::NextEpochLedgerPending(substate) = state {
343                    let TransitionFrontierSyncLedgerState::Success {
344                        needed_protocol_states,
345                        ..
346                    } = &mut substate.ledger
347                    else {
348                        return;
349                    };
350                    *state = Self::NextEpochLedgerSuccess {
351                        time: meta.time(),
352                        best_tip: substate.best_tip.clone(),
353                        root_block: substate.root_block.clone(),
354                        blocks_inbetween: std::mem::take(&mut substate.blocks_inbetween),
355                        needed_protocol_states: std::mem::take(needed_protocol_states),
356                    };
357                }
358            }
359            TransitionFrontierSyncAction::LedgerRootPending => {
360                let (best_tip, root_block, blocks_inbetween) = match state {
361                    Self::Init {
362                        best_tip,
363                        root_block,
364                        blocks_inbetween,
365                        ..
366                    }
367                    | Self::StakingLedgerSuccess {
368                        best_tip,
369                        root_block,
370                        blocks_inbetween,
371                        ..
372                    }
373                    | Self::NextEpochLedgerSuccess {
374                        best_tip,
375                        root_block,
376                        blocks_inbetween,
377                        ..
378                    } => (best_tip, root_block, blocks_inbetween),
379                    _ => return,
380                };
381                *state = Self::RootLedgerPending(TransitionFrontierSyncLedgerPending {
382                    time: meta.time(),
383                    best_tip: best_tip.clone(),
384                    root_block: root_block.clone(),
385                    blocks_inbetween: std::mem::take(blocks_inbetween),
386                    root_block_updates: Default::default(),
387                    ledger: TransitionFrontierSyncLedgerState::Init {
388                        time: meta.time(),
389                        target: SyncLedgerTarget::root(root_block),
390                    },
391                });
392            }
393            TransitionFrontierSyncAction::LedgerRootSuccess => {
394                if let Self::RootLedgerPending(substate) = state {
395                    let TransitionFrontierSyncLedgerState::Success {
396                        needed_protocol_states,
397                        ..
398                    } = &mut substate.ledger
399                    else {
400                        return;
401                    };
402                    *state = Self::RootLedgerSuccess {
403                        time: meta.time(),
404                        best_tip: substate.best_tip.clone(),
405                        root_block: substate.root_block.clone(),
406                        blocks_inbetween: std::mem::take(&mut substate.blocks_inbetween),
407                        root_block_updates: std::mem::take(&mut substate.root_block_updates),
408                        needed_protocol_states: std::mem::take(needed_protocol_states),
409                    };
410                }
411            }
412            TransitionFrontierSyncAction::BlocksPending => {
413                let Self::RootLedgerSuccess {
414                    best_tip,
415                    root_block,
416                    blocks_inbetween,
417                    root_block_updates,
418                    needed_protocol_states,
419                    ..
420                } = state
421                else {
422                    return;
423                };
424                let (best_tip, root_block) = (best_tip.clone(), root_block.clone());
425                let blocks_inbetween = std::mem::take(blocks_inbetween);
426                let root_block_updates = std::mem::take(root_block_updates);
427
428                let mut root_snarked_ledger_updates =
429                    TransitionFrontierRootSnarkedLedgerUpdates::default();
430                let mut root_block_updates_iter = root_block_updates.windows(2);
431                while let Some([old_root, new_root]) = root_block_updates_iter.next() {
432                    root_snarked_ledger_updates
433                        .extend_with_needed(new_root, std::iter::once(old_root));
434                }
435                root_snarked_ledger_updates
436                    .extend_with_needed(&root_block, root_block_updates.iter().rev().take(1));
437
438                let mut applied_blocks: BTreeMap<_, _> =
439                    best_chain.iter().map(|b| (b.hash(), b)).collect();
440
441                let k = best_tip.constants().k.as_u32() as usize;
442                let mut chain = Vec::with_capacity(k.saturating_add(root_block_updates.len()));
443
444                // TODO(binier): maybe time should be when we originally
445                // applied this block? Same for below.
446
447                // Root block is always applied since we have reconstructed it
448                // in previous steps.
449                let mut root_block_updates_iter = root_block_updates.into_iter();
450                // NOTE: flag for root block is always set to `false` because
451                // it is not possible to recover this info from the staging ledger reconstruction,
452                // so the value will not always be correct for the root block.
453                // The `false` value is used to be compatible with:
454                // <https://github.com/MinaProtocol/mina/blob/e975835deab303c7f48b09ec2fd0e41ec649aef6/src/lib/transition_frontier/full_frontier/full_frontier.ml#L157-L160>
455                let root_block_just_emitted_a_proof = false;
456
457                if let Some(reconstructed_root_block) = root_block_updates_iter.next() {
458                    chain.push(TransitionFrontierSyncBlockState::ApplySuccess {
459                        time: meta.time(),
460                        block: AppliedBlock {
461                            block: reconstructed_root_block.clone(),
462                            just_emitted_a_proof: root_block_just_emitted_a_proof,
463                        },
464                    });
465                    chain.extend(
466                        root_block_updates_iter
467                            .chain(std::iter::once(root_block))
468                            .map(|block| TransitionFrontierSyncBlockState::FetchSuccess {
469                                time: meta.time(),
470                                block,
471                            }),
472                    );
473                } else {
474                    chain.push(TransitionFrontierSyncBlockState::ApplySuccess {
475                        time: meta.time(),
476                        block: AppliedBlock {
477                            block: root_block,
478                            just_emitted_a_proof: root_block_just_emitted_a_proof,
479                        },
480                    });
481                }
482
483                chain.extend(blocks_inbetween.into_iter().map(|block_hash| {
484                    if let Some(block) = applied_blocks.remove(&block_hash) {
485                        TransitionFrontierSyncBlockState::ApplySuccess {
486                            time: meta.time(),
487                            block: (*block).clone(),
488                        }
489                    } else {
490                        TransitionFrontierSyncBlockState::FetchPending {
491                            time: meta.time(),
492                            block_hash,
493                            attempts: Default::default(),
494                        }
495                    }
496                }));
497                chain.push(TransitionFrontierSyncBlockState::FetchSuccess {
498                    time: meta.time(),
499                    block: best_tip,
500                });
501
502                *state = Self::BlocksPending {
503                    time: meta.time(),
504                    chain,
505                    root_snarked_ledger_updates,
506                    needed_protocol_states: std::mem::take(needed_protocol_states),
507                };
508            }
509            TransitionFrontierSyncAction::BlocksPeersQuery => {}
510            TransitionFrontierSyncAction::BlocksPeerQueryInit { hash, peer_id } => {
511                let Some(block_state) = state.block_state_mut(hash) else {
512                    return;
513                };
514                let Some(attempts) = block_state.fetch_pending_attempts_mut() else {
515                    return;
516                };
517                attempts.insert(*peer_id, PeerRpcState::Init { time: meta.time() });
518            }
519            TransitionFrontierSyncAction::BlocksPeerQueryRetry { hash, peer_id } => {
520                let Some(block_state) = state.block_state_mut(hash) else {
521                    return;
522                };
523                let Some(attempts) = block_state.fetch_pending_attempts_mut() else {
524                    return;
525                };
526                attempts.insert(*peer_id, PeerRpcState::Init { time: meta.time() });
527            }
528            TransitionFrontierSyncAction::BlocksPeerQueryPending {
529                hash,
530                peer_id,
531                rpc_id,
532            } => {
533                let Some(block_state) = state.block_state_mut(hash) else {
534                    return;
535                };
536                let Some(peer_state) = block_state.fetch_pending_from_peer_mut(peer_id) else {
537                    return;
538                };
539                *peer_state = PeerRpcState::Pending {
540                    time: meta.time(),
541                    rpc_id: *rpc_id,
542                };
543            }
544            TransitionFrontierSyncAction::BlocksPeerQueryError {
545                peer_id,
546                rpc_id,
547                error,
548            } => {
549                let Self::BlocksPending { chain, .. } = state else {
550                    return;
551                };
552                let Some(peer_state) = chain.iter_mut().find_map(|b| {
553                    b.fetch_pending_from_peer_mut(peer_id)
554                        .filter(|peer_rpc_state| {
555                            matches!(peer_rpc_state, PeerRpcState::Pending { .. })
556                        })
557                }) else {
558                    return;
559                };
560                *peer_state = PeerRpcState::Error {
561                    time: meta.time(),
562                    rpc_id: *rpc_id,
563                    error: error.clone(),
564                };
565            }
566            TransitionFrontierSyncAction::BlocksPeerQuerySuccess {
567                peer_id, response, ..
568            } => {
569                let Some(block_state) = state.block_state_mut(&response.hash) else {
570                    return;
571                };
572                let Some(peer_state) = block_state.fetch_pending_from_peer_mut(peer_id) else {
573                    return;
574                };
575                *peer_state = PeerRpcState::Success {
576                    time: meta.time(),
577                    block: response.clone(),
578                };
579            }
580            TransitionFrontierSyncAction::BlocksFetchSuccess { hash } => {
581                let Some(block_state) = state.block_state_mut(hash) else {
582                    return;
583                };
584                let Some(block) = block_state.fetch_pending_fetched_block() else {
585                    return;
586                };
587                *block_state = TransitionFrontierSyncBlockState::FetchSuccess {
588                    time: meta.time(),
589                    block: block.clone(),
590                };
591            }
592            TransitionFrontierSyncAction::BlocksNextApplyInit => {}
593            TransitionFrontierSyncAction::BlocksNextApplyPending { hash } => {
594                let Some(block_state) = state.block_state_mut(hash) else {
595                    return;
596                };
597                let Some(block) = block_state.block() else {
598                    return;
599                };
600
601                *block_state = TransitionFrontierSyncBlockState::ApplyPending {
602                    time: meta.time(),
603                    block: block.clone(),
604                };
605            }
606            TransitionFrontierSyncAction::BlocksNextApplyError { hash, error } => {
607                let Some(block_state) = state.block_state_mut(hash) else {
608                    return;
609                };
610                let Some(block) = block_state.block() else {
611                    return;
612                };
613
614                *block_state = TransitionFrontierSyncBlockState::ApplyError {
615                    time: meta.time(),
616                    block: block.clone(),
617                    error: error.clone(),
618                };
619            }
620            TransitionFrontierSyncAction::BlocksNextApplySuccess {
621                hash,
622                just_emitted_a_proof,
623            } => {
624                let Some(block_state) = state.block_state_mut(hash) else {
625                    return;
626                };
627                let Some(block) = block_state.block() else {
628                    return;
629                };
630
631                *block_state = TransitionFrontierSyncBlockState::ApplySuccess {
632                    time: meta.time(),
633                    block: AppliedBlock {
634                        block: block.clone(),
635                        just_emitted_a_proof: *just_emitted_a_proof,
636                    },
637                };
638            }
639            TransitionFrontierSyncAction::BlocksSendToArchive { .. } => {}
640            TransitionFrontierSyncAction::BlocksSuccess => {
641                let Self::BlocksPending {
642                    chain,
643                    root_snarked_ledger_updates,
644                    needed_protocol_states,
645                    ..
646                } = state
647                else {
648                    return;
649                };
650                let Some(k) = chain
651                    .last()
652                    .and_then(|v| v.block())
653                    .map(|b| b.constants().k.as_u32() as usize)
654                else {
655                    return;
656                };
657                let mut needed_protocol_states = std::mem::take(needed_protocol_states);
658                let start_i = chain.len().saturating_sub(k.saturating_add(1));
659                let mut iter = std::mem::take(chain)
660                    .into_iter()
661                    .filter_map(|v| v.take_applied_block());
662
663                for _ in 0..start_i {
664                    if let Some(b) = iter.next() {
665                        needed_protocol_states
666                            .insert(b.hash().clone(), b.header().protocol_state.clone());
667                    }
668                }
669
670                *state = Self::BlocksSuccess {
671                    time: meta.time(),
672                    chain: iter.collect(),
673                    root_snarked_ledger_updates: std::mem::take(root_snarked_ledger_updates),
674                    needed_protocol_states,
675                };
676            }
677            TransitionFrontierSyncAction::CommitInit => {}
678            TransitionFrontierSyncAction::CommitPending => {
679                if let Self::BlocksSuccess {
680                    chain,
681                    root_snarked_ledger_updates,
682                    needed_protocol_states,
683                    ..
684                } = state
685                {
686                    *state = Self::CommitPending {
687                        time: meta.time(),
688                        chain: std::mem::take(chain),
689                        root_snarked_ledger_updates: std::mem::take(root_snarked_ledger_updates),
690                        needed_protocol_states: std::mem::take(needed_protocol_states),
691                    };
692                }
693            }
694            TransitionFrontierSyncAction::CommitSuccess { .. } => {
695                if let Self::CommitPending {
696                    chain,
697                    root_snarked_ledger_updates,
698                    needed_protocol_states,
699                    ..
700                } = state
701                {
702                    *state = Self::CommitSuccess {
703                        time: meta.time(),
704                        chain: std::mem::take(chain),
705                        root_snarked_ledger_updates: std::mem::take(root_snarked_ledger_updates),
706                        needed_protocol_states: std::mem::take(needed_protocol_states),
707                    };
708                }
709            }
710            TransitionFrontierSyncAction::Ledger(a) => {
711                if state.ledger_mut().is_some() {
712                    TransitionFrontierSyncLedgerState::reducer(
713                        crate::Substate::from_compatible_substate(state_context),
714                        meta.with_action(a),
715                    );
716                }
717            }
718        }
719    }
720}
721
722#[allow(clippy::too_many_arguments)]
723fn next_required_ledger_to_sync(
724    time: redux::Timestamp,
725    cur_best_tip: Option<&ArcBlockWithHash>,
726    cur_best_root: Option<&ArcBlockWithHash>,
727    old_best_tip: &ArcBlockWithHash,
728    old_root: &ArcBlockWithHash,
729    new_best_tip: &ArcBlockWithHash,
730    new_root: &ArcBlockWithHash,
731    new_blocks_inbetween: &[StateHash],
732) -> TransitionFrontierSyncState {
733    let next_epoch_target = SyncLedgerTarget::next_epoch(new_best_tip, new_root);
734
735    let (kind, ledger) = if old_best_tip.staking_epoch_ledger_hash()
736        != new_best_tip.staking_epoch_ledger_hash()
737        && cur_best_tip.is_none_or(|cur| {
738            cur.staking_epoch_ledger_hash() != new_best_tip.staking_epoch_ledger_hash()
739        }) {
740        let ledger = TransitionFrontierSyncLedgerSnarkedState::pending(
741            time,
742            SyncLedgerTarget::staking_epoch(new_best_tip),
743        )
744        .into();
745        (SyncLedgerTargetKind::StakingEpoch, ledger)
746    } else if old_best_tip.next_epoch_ledger_hash() != new_best_tip.next_epoch_ledger_hash()
747        && cur_best_tip.is_none_or(|cur| {
748            cur.staking_epoch_ledger_hash() != new_best_tip.staking_epoch_ledger_hash()
749        })
750        && next_epoch_target.is_some()
751    {
752        let ledger =
753            TransitionFrontierSyncLedgerSnarkedState::pending(time, next_epoch_target.unwrap())
754                .into();
755        (SyncLedgerTargetKind::NextEpoch, ledger)
756    } else if old_root.snarked_ledger_hash() == new_root.snarked_ledger_hash()
757        || cur_best_root
758            .is_some_and(|cur| cur.snarked_ledger_hash() == new_root.snarked_ledger_hash())
759    {
760        let ledger = TransitionFrontierSyncLedgerSnarkedState::Success {
761            time,
762            target: SyncLedgerTarget::root(new_root),
763        }
764        .into();
765        (SyncLedgerTargetKind::Root, ledger)
766    } else {
767        let ledger = TransitionFrontierSyncLedgerSnarkedState::pending(
768            time,
769            SyncLedgerTarget::root(new_root),
770        )
771        .into();
772        (SyncLedgerTargetKind::Root, ledger)
773    };
774
775    let substate = TransitionFrontierSyncLedgerPending {
776        time,
777        best_tip: new_best_tip.clone(),
778        root_block: new_root.clone(),
779        blocks_inbetween: new_blocks_inbetween.to_owned(),
780        root_block_updates: Default::default(),
781        ledger,
782    };
783    match kind {
784        SyncLedgerTargetKind::StakingEpoch => {
785            TransitionFrontierSyncState::StakingLedgerPending(substate)
786        }
787        SyncLedgerTargetKind::NextEpoch => {
788            TransitionFrontierSyncState::NextEpochLedgerPending(substate)
789        }
790        SyncLedgerTargetKind::Root => TransitionFrontierSyncState::RootLedgerPending(substate),
791    }
792}