node/transition_frontier/sync/
transition_frontier_sync_actions.rs

1use mina_p2p_messages::v2::{LedgerHash, StateHash};
2use openmina_core::{block::ArcBlockWithHash, consensus::consensus_take, ActionEvent};
3use redux::Callback;
4use serde::{Deserialize, Serialize};
5
6use crate::{
7    ledger::write::{BlockApplyResult, CommitResult},
8    p2p::{channels::rpc::P2pRpcId, PeerId},
9    transition_frontier::sync::TransitionFrontierSyncLedgerPending,
10    TransitionFrontierAction,
11};
12
13use super::{
14    ledger::{
15        SyncLedgerTarget, TransitionFrontierSyncLedgerAction, TransitionFrontierSyncLedgerState,
16    },
17    PeerBlockFetchError, TransitionFrontierSyncState,
18};
19
20pub type TransitionFrontierSyncActionWithMeta = redux::ActionWithMeta<TransitionFrontierSyncAction>;
21pub type TransitionFrontierSyncActionWithMetaRef<'a> =
22    redux::ActionWithMeta<&'a TransitionFrontierSyncAction>;
23
24#[derive(Serialize, Deserialize, Debug, Clone, ActionEvent)]
25pub enum TransitionFrontierSyncAction {
26    /// Set transition frontier target to new best tip (for still unsynced frontiers)
27    #[action_event(level = info, fields(
28        block_hash = display(&best_tip.hash),
29        root_block_hash = display(&root_block.hash),
30    ))]
31    Init {
32        best_tip: ArcBlockWithHash,
33        root_block: ArcBlockWithHash,
34        blocks_inbetween: Vec<StateHash>,
35    },
36    /// Set sync target to a new best tip
37    #[action_event(level = info, fields(
38        new_best_tip_hash = display(&best_tip.hash),
39        new_best_tip_height = best_tip.height(),
40        new_root_block_hash = display(&root_block.hash),
41        new_root_snarked_ledger_hash = display(root_block.snarked_ledger_hash()),
42        new_root_staged_ledger_hash = display(root_block.merkle_root_hash()),
43    ))]
44    BestTipUpdate {
45        // Required to be able to reuse partially synced root ledgers
46        previous_root_snarked_ledger_hash: Option<LedgerHash>,
47        best_tip: ArcBlockWithHash,
48        root_block: ArcBlockWithHash,
49        blocks_inbetween: Vec<StateHash>,
50        on_success: Option<Callback<()>>,
51    },
52    /// Staking Ledger sync is pending
53    #[action_event(level = info)]
54    LedgerStakingPending,
55    /// Staking Ledger sync was successful
56    #[action_event(level = info)]
57    LedgerStakingSuccess,
58    /// Next Epoch Ledger sync is pending
59    #[action_event(level = info)]
60    LedgerNextEpochPending,
61    /// Next Epoch Ledger sync was successful
62    #[action_event(level = info)]
63    LedgerNextEpochSuccess,
64    /// Transition frontier Root Ledger sync is pending
65    #[action_event(level = info)]
66    LedgerRootPending,
67    /// Transition frontier Root Ledger sync was successful
68    #[action_event(level = info)]
69    LedgerRootSuccess,
70    BlocksPending,
71    BlocksPeersQuery,
72    BlocksPeerQueryInit {
73        hash: StateHash,
74        peer_id: PeerId,
75    },
76    BlocksPeerQueryRetry {
77        hash: StateHash,
78        peer_id: PeerId,
79    },
80    BlocksPeerQueryPending {
81        hash: StateHash,
82        peer_id: PeerId,
83        rpc_id: P2pRpcId,
84    },
85    BlocksPeerQueryError {
86        peer_id: PeerId,
87        rpc_id: P2pRpcId,
88        error: PeerBlockFetchError,
89    },
90    BlocksPeerQuerySuccess {
91        peer_id: PeerId,
92        rpc_id: P2pRpcId,
93        response: ArcBlockWithHash,
94    },
95    BlocksFetchSuccess {
96        hash: StateHash,
97    },
98    BlocksNextApplyInit,
99    BlocksNextApplyPending {
100        hash: StateHash,
101    },
102    BlocksNextApplyError {
103        hash: StateHash,
104        error: String,
105    },
106    BlocksNextApplySuccess {
107        hash: StateHash,
108        just_emitted_a_proof: bool,
109    },
110    /// Sending block to archive
111    #[action_event(level = info, fields(
112        block_hash = display(&hash),
113    ))]
114    BlocksSendToArchive {
115        hash: StateHash,
116        data: BlockApplyResult,
117    },
118    /// Done applying all pending blocks
119    BlocksSuccess,
120    /// Commit all the accumulated changes after the
121    /// synchronization is done to the ledger service.
122    CommitInit,
123    CommitPending,
124    /// Committing changes after sync finished.
125    CommitSuccess {
126        result: CommitResult,
127    },
128    /// Synchronization to a target ledger
129    Ledger(TransitionFrontierSyncLedgerAction),
130}
131
132impl redux::EnablingCondition<crate::State> for TransitionFrontierSyncAction {
133    fn is_enabled(&self, state: &crate::State, time: redux::Timestamp) -> bool {
134        match self {
135            TransitionFrontierSyncAction::Init { best_tip, .. } => {
136                !state.transition_frontier.sync.is_pending()
137                    && !state.transition_frontier.sync.is_synced()
138                    && state
139                        .transition_frontier
140                        .best_tip()
141                        .is_none_or(|tip| best_tip.hash != tip.hash)
142                    && state
143                        .transition_frontier
144                        .candidates
145                        .best_verified_block()
146                        .is_some_and(|block| best_tip.hash() == block.hash())
147            }
148            TransitionFrontierSyncAction::BestTipUpdate {
149                best_tip,
150                blocks_inbetween,
151                root_block,
152                ..
153            } => {
154                let blacklist = &state.transition_frontier.blacklist;
155                (state.transition_frontier.sync.is_pending() || state.transition_frontier.sync.is_synced())
156                    && !matches!(&state.transition_frontier.sync, TransitionFrontierSyncState::CommitPending { .. } | TransitionFrontierSyncState::CommitSuccess { .. })
157                && state
158                    .transition_frontier
159                    .best_tip()
160                    .is_some_and( |tip| best_tip.hash != tip.hash)
161                && state
162                    .transition_frontier
163                    .sync
164                    .best_tip()
165                    .is_none_or(|tip| best_tip.hash != tip.hash)
166                // TODO(binier): TMP. we shouldn't need to check consensus here.
167                && state
168                    .transition_frontier
169                    .sync
170                    .best_tip()
171                    .or(state.transition_frontier.best_tip())
172                    .is_some_and( |tip| {
173                        if tip.is_genesis() && best_tip.height() > tip.height() {
174                            // TODO(binier): once genesis blocks are same, uncomment below.
175                            // tip.hash() == &best_tip.header().protocol_state.body.genesis_state_hash
176                            true
177                        } else {
178                            consensus_take(tip.consensus_state(), best_tip.consensus_state(), tip.hash(), best_tip.hash())
179                        }
180                    })
181                // check the block blacklist
182                && !blacklist.contains_key(best_tip.hash())
183                && !blacklist.contains_key(root_block.hash())
184                && !blocks_inbetween.iter().any(|hash| blacklist.contains_key(hash))
185                // Don't sync to best tip if we are in the middle of producing
186                // a block unless that best tip candidate is better consensus-wise
187                // than the one that we are producing.
188                //
189                // Otherwise other block producers might spam the network
190                // with blocks that are better than current best tip, yet
191                // inferior to the block that we are producing and we can't
192                // let that get in the way of us producing a block.
193                && state.block_producer.producing_won_slot()
194                    .filter(|_| !state.block_producer.is_me(best_tip.producer()))
195                    // TODO(binier): check if candidate best tip is short or
196                    // long range fork and based on that compare slot that
197                    // we are producing.
198                    .is_none_or(|won_slot| won_slot < best_tip)
199            }
200            TransitionFrontierSyncAction::LedgerStakingPending => {
201                matches!(
202                    state.transition_frontier.sync,
203                    TransitionFrontierSyncState::Init { .. }
204                )
205            }
206            TransitionFrontierSyncAction::LedgerStakingSuccess => matches!(
207                state.transition_frontier.sync,
208                TransitionFrontierSyncState::StakingLedgerPending(
209                    TransitionFrontierSyncLedgerPending {
210                        ledger: TransitionFrontierSyncLedgerState::Success { .. },
211                        ..
212                    }
213                )
214            ),
215            TransitionFrontierSyncAction::LedgerNextEpochPending => {
216                match &state.transition_frontier.sync {
217                    TransitionFrontierSyncState::Init {
218                        best_tip,
219                        root_block,
220                        ..
221                    } => SyncLedgerTarget::next_epoch(best_tip, root_block).is_some(),
222                    TransitionFrontierSyncState::StakingLedgerSuccess {
223                        best_tip,
224                        root_block,
225                        ..
226                    } => SyncLedgerTarget::next_epoch(best_tip, root_block).is_some(),
227                    _ => false,
228                }
229            }
230            TransitionFrontierSyncAction::LedgerNextEpochSuccess => matches!(
231                state.transition_frontier.sync,
232                TransitionFrontierSyncState::NextEpochLedgerPending(
233                    TransitionFrontierSyncLedgerPending {
234                        ledger: TransitionFrontierSyncLedgerState::Success { .. },
235                        ..
236                    }
237                )
238            ),
239            TransitionFrontierSyncAction::LedgerRootPending => {
240                match &state.transition_frontier.sync {
241                    TransitionFrontierSyncState::Init {
242                        best_tip,
243                        root_block,
244                        ..
245                    }
246                    | TransitionFrontierSyncState::StakingLedgerSuccess {
247                        best_tip,
248                        root_block,
249                        ..
250                    } => SyncLedgerTarget::next_epoch(best_tip, root_block).is_none(),
251                    TransitionFrontierSyncState::NextEpochLedgerSuccess { .. } => true,
252                    _ => false,
253                }
254            }
255            TransitionFrontierSyncAction::LedgerRootSuccess => matches!(
256                state.transition_frontier.sync,
257                TransitionFrontierSyncState::RootLedgerPending(
258                    TransitionFrontierSyncLedgerPending {
259                        ledger: TransitionFrontierSyncLedgerState::Success { .. },
260                        ..
261                    }
262                )
263            ),
264            TransitionFrontierSyncAction::BlocksPending => matches!(
265                state.transition_frontier.sync,
266                TransitionFrontierSyncState::RootLedgerSuccess { .. }
267            ),
268            TransitionFrontierSyncAction::BlocksPeersQuery => {
269                let peers_available = state
270                    .p2p
271                    .ready_peers_iter()
272                    .any(|(_, p)| p.channels.rpc.can_send_request());
273                let sync = &state.transition_frontier.sync;
274                peers_available
275                    && (sync.blocks_fetch_next().is_some()
276                        || sync.blocks_fetch_retry_iter().next().is_some())
277            }
278            TransitionFrontierSyncAction::BlocksPeerQueryInit { hash, peer_id } => {
279                let check_next_hash = state
280                    .transition_frontier
281                    .sync
282                    .blocks_fetch_next()
283                    .is_some_and(|expected| &expected == hash);
284
285                let check_peer_available = state
286                    .p2p
287                    .get_ready_peer(peer_id)
288                    .and_then(|p| {
289                        let sync_best_tip = state.transition_frontier.sync.best_tip()?;
290                        let peer_best_tip = p.best_tip.as_ref()?;
291                        Some(p).filter(|_| sync_best_tip.hash == peer_best_tip.hash)
292                    })
293                    .is_some_and(|p| p.channels.rpc.can_send_request());
294
295                check_next_hash && check_peer_available
296            }
297            TransitionFrontierSyncAction::BlocksPeerQueryRetry { hash, peer_id } => {
298                let check_next_hash = state
299                    .transition_frontier
300                    .sync
301                    .blocks_fetch_retry_iter()
302                    .next()
303                    .is_some_and(|expected| &expected == hash);
304
305                let check_peer_available = state
306                    .p2p
307                    .get_ready_peer(peer_id)
308                    .and_then(|p| {
309                        let sync_best_tip = state.transition_frontier.sync.best_tip()?;
310                        let peer_best_tip = p.best_tip.as_ref()?;
311                        Some(p).filter(|_| sync_best_tip.hash == peer_best_tip.hash)
312                    })
313                    .is_some_and(|p| p.channels.rpc.can_send_request());
314
315                check_next_hash && check_peer_available
316            }
317            TransitionFrontierSyncAction::BlocksPeerQueryPending { hash, peer_id, .. } => state
318                .transition_frontier
319                .sync
320                .block_state(hash)
321                .is_some_and(|b| b.is_fetch_init_from_peer(peer_id)),
322            TransitionFrontierSyncAction::BlocksPeerQueryError {
323                peer_id, rpc_id, ..
324            } => state
325                .transition_frontier
326                .sync
327                .blocks_iter()
328                .any(|s| s.is_fetch_pending_from_peer(peer_id, *rpc_id)),
329            TransitionFrontierSyncAction::BlocksPeerQuerySuccess {
330                peer_id,
331                rpc_id,
332                response,
333            } => state
334                .transition_frontier
335                .sync
336                .block_state(&response.hash)
337                .filter(|s| s.is_fetch_pending_from_peer(peer_id, *rpc_id))
338                .is_some_and(|s| s.block_hash() == &response.hash),
339            TransitionFrontierSyncAction::BlocksFetchSuccess { hash } => state
340                .transition_frontier
341                .sync
342                .block_state(hash)
343                .is_some_and(|s| s.fetch_pending_fetched_block().is_some()),
344            TransitionFrontierSyncAction::BlocksNextApplyInit => {
345                state.transition_frontier.sync.blocks_apply_next().is_some()
346            }
347            TransitionFrontierSyncAction::BlocksNextApplyPending { hash } => state
348                .transition_frontier
349                .sync
350                .blocks_apply_next()
351                .is_some_and(|(b, _)| &b.hash == hash),
352            TransitionFrontierSyncAction::BlocksNextApplyError { hash, .. } => state
353                .transition_frontier
354                .sync
355                .blocks_apply_pending()
356                .is_some_and(|b| &b.hash == hash),
357            TransitionFrontierSyncAction::BlocksNextApplySuccess {
358                hash,
359                just_emitted_a_proof: _,
360            } => state
361                .transition_frontier
362                .sync
363                .blocks_apply_pending()
364                .is_some_and(|b| &b.hash == hash),
365            TransitionFrontierSyncAction::BlocksSuccess => match &state.transition_frontier.sync {
366                TransitionFrontierSyncState::BlocksPending { chain, .. } => {
367                    chain.iter().all(|v| v.is_apply_success())
368                }
369                _ => false,
370            },
371            TransitionFrontierSyncAction::BlocksSendToArchive { .. } => {
372                state.transition_frontier.archive_enabled
373            }
374            TransitionFrontierSyncAction::CommitInit => matches!(
375                state.transition_frontier.sync,
376                TransitionFrontierSyncState::BlocksSuccess { .. },
377            ),
378            TransitionFrontierSyncAction::CommitPending => matches!(
379                state.transition_frontier.sync,
380                TransitionFrontierSyncState::BlocksSuccess { .. },
381            ),
382            TransitionFrontierSyncAction::CommitSuccess { .. } => matches!(
383                state.transition_frontier.sync,
384                TransitionFrontierSyncState::CommitPending { .. },
385            ),
386            TransitionFrontierSyncAction::Ledger(action) => action.is_enabled(state, time),
387        }
388    }
389}
390
391impl From<TransitionFrontierSyncAction> for crate::Action {
392    fn from(value: TransitionFrontierSyncAction) -> Self {
393        Self::TransitionFrontier(TransitionFrontierAction::Sync(value))
394    }
395}