1use mina_p2p_messages::v2::{LedgerHash, StateHash};
2use openmina_core::{block::ArcBlockWithHash, consensus::consensus_take, ActionEvent};
3use redux::Callback;
4use serde::{Deserialize, Serialize};
5
6use crate::{
7 ledger::write::{BlockApplyResult, CommitResult},
8 p2p::{channels::rpc::P2pRpcId, PeerId},
9 transition_frontier::sync::TransitionFrontierSyncLedgerPending,
10 TransitionFrontierAction,
11};
12
13use super::{
14 ledger::{
15 SyncLedgerTarget, TransitionFrontierSyncLedgerAction, TransitionFrontierSyncLedgerState,
16 },
17 PeerBlockFetchError, TransitionFrontierSyncState,
18};
19
20pub type TransitionFrontierSyncActionWithMeta = redux::ActionWithMeta<TransitionFrontierSyncAction>;
21pub type TransitionFrontierSyncActionWithMetaRef<'a> =
22 redux::ActionWithMeta<&'a TransitionFrontierSyncAction>;
23
24#[derive(Serialize, Deserialize, Debug, Clone, ActionEvent)]
25pub enum TransitionFrontierSyncAction {
26 #[action_event(level = info, fields(
28 block_hash = display(&best_tip.hash),
29 root_block_hash = display(&root_block.hash),
30 ))]
31 Init {
32 best_tip: ArcBlockWithHash,
33 root_block: ArcBlockWithHash,
34 blocks_inbetween: Vec<StateHash>,
35 },
36 #[action_event(level = info, fields(
38 new_best_tip_hash = display(&best_tip.hash),
39 new_best_tip_height = best_tip.height(),
40 new_root_block_hash = display(&root_block.hash),
41 new_root_snarked_ledger_hash = display(root_block.snarked_ledger_hash()),
42 new_root_staged_ledger_hash = display(root_block.merkle_root_hash()),
43 ))]
44 BestTipUpdate {
45 previous_root_snarked_ledger_hash: Option<LedgerHash>,
47 best_tip: ArcBlockWithHash,
48 root_block: ArcBlockWithHash,
49 blocks_inbetween: Vec<StateHash>,
50 on_success: Option<Callback<()>>,
51 },
52 #[action_event(level = info)]
54 LedgerStakingPending,
55 #[action_event(level = info)]
57 LedgerStakingSuccess,
58 #[action_event(level = info)]
60 LedgerNextEpochPending,
61 #[action_event(level = info)]
63 LedgerNextEpochSuccess,
64 #[action_event(level = info)]
66 LedgerRootPending,
67 #[action_event(level = info)]
69 LedgerRootSuccess,
70 BlocksPending,
71 BlocksPeersQuery,
72 BlocksPeerQueryInit {
73 hash: StateHash,
74 peer_id: PeerId,
75 },
76 BlocksPeerQueryRetry {
77 hash: StateHash,
78 peer_id: PeerId,
79 },
80 BlocksPeerQueryPending {
81 hash: StateHash,
82 peer_id: PeerId,
83 rpc_id: P2pRpcId,
84 },
85 BlocksPeerQueryError {
86 peer_id: PeerId,
87 rpc_id: P2pRpcId,
88 error: PeerBlockFetchError,
89 },
90 BlocksPeerQuerySuccess {
91 peer_id: PeerId,
92 rpc_id: P2pRpcId,
93 response: ArcBlockWithHash,
94 },
95 BlocksFetchSuccess {
96 hash: StateHash,
97 },
98 BlocksNextApplyInit,
99 BlocksNextApplyPending {
100 hash: StateHash,
101 },
102 BlocksNextApplyError {
103 hash: StateHash,
104 error: String,
105 },
106 BlocksNextApplySuccess {
107 hash: StateHash,
108 just_emitted_a_proof: bool,
109 },
110 #[action_event(level = info, fields(
112 block_hash = display(&hash),
113 ))]
114 BlocksSendToArchive {
115 hash: StateHash,
116 data: BlockApplyResult,
117 },
118 BlocksSuccess,
120 CommitInit,
123 CommitPending,
124 CommitSuccess {
126 result: CommitResult,
127 },
128 Ledger(TransitionFrontierSyncLedgerAction),
130}
131
132impl redux::EnablingCondition<crate::State> for TransitionFrontierSyncAction {
133 fn is_enabled(&self, state: &crate::State, time: redux::Timestamp) -> bool {
134 match self {
135 TransitionFrontierSyncAction::Init { best_tip, .. } => {
136 !state.transition_frontier.sync.is_pending()
137 && !state.transition_frontier.sync.is_synced()
138 && state
139 .transition_frontier
140 .best_tip()
141 .is_none_or(|tip| best_tip.hash != tip.hash)
142 && state
143 .transition_frontier
144 .candidates
145 .best_verified_block()
146 .is_some_and(|block| best_tip.hash() == block.hash())
147 }
148 TransitionFrontierSyncAction::BestTipUpdate {
149 best_tip,
150 blocks_inbetween,
151 root_block,
152 ..
153 } => {
154 let blacklist = &state.transition_frontier.blacklist;
155 (state.transition_frontier.sync.is_pending() || state.transition_frontier.sync.is_synced())
156 && !matches!(&state.transition_frontier.sync, TransitionFrontierSyncState::CommitPending { .. } | TransitionFrontierSyncState::CommitSuccess { .. })
157 && state
158 .transition_frontier
159 .best_tip()
160 .is_some_and( |tip| best_tip.hash != tip.hash)
161 && state
162 .transition_frontier
163 .sync
164 .best_tip()
165 .is_none_or(|tip| best_tip.hash != tip.hash)
166 && state
168 .transition_frontier
169 .sync
170 .best_tip()
171 .or(state.transition_frontier.best_tip())
172 .is_some_and( |tip| {
173 if tip.is_genesis() && best_tip.height() > tip.height() {
174 true
177 } else {
178 consensus_take(tip.consensus_state(), best_tip.consensus_state(), tip.hash(), best_tip.hash())
179 }
180 })
181 && !blacklist.contains_key(best_tip.hash())
183 && !blacklist.contains_key(root_block.hash())
184 && !blocks_inbetween.iter().any(|hash| blacklist.contains_key(hash))
185 && state.block_producer.producing_won_slot()
194 .filter(|_| !state.block_producer.is_me(best_tip.producer()))
195 .is_none_or(|won_slot| won_slot < best_tip)
199 }
200 TransitionFrontierSyncAction::LedgerStakingPending => {
201 matches!(
202 state.transition_frontier.sync,
203 TransitionFrontierSyncState::Init { .. }
204 )
205 }
206 TransitionFrontierSyncAction::LedgerStakingSuccess => matches!(
207 state.transition_frontier.sync,
208 TransitionFrontierSyncState::StakingLedgerPending(
209 TransitionFrontierSyncLedgerPending {
210 ledger: TransitionFrontierSyncLedgerState::Success { .. },
211 ..
212 }
213 )
214 ),
215 TransitionFrontierSyncAction::LedgerNextEpochPending => {
216 match &state.transition_frontier.sync {
217 TransitionFrontierSyncState::Init {
218 best_tip,
219 root_block,
220 ..
221 } => SyncLedgerTarget::next_epoch(best_tip, root_block).is_some(),
222 TransitionFrontierSyncState::StakingLedgerSuccess {
223 best_tip,
224 root_block,
225 ..
226 } => SyncLedgerTarget::next_epoch(best_tip, root_block).is_some(),
227 _ => false,
228 }
229 }
230 TransitionFrontierSyncAction::LedgerNextEpochSuccess => matches!(
231 state.transition_frontier.sync,
232 TransitionFrontierSyncState::NextEpochLedgerPending(
233 TransitionFrontierSyncLedgerPending {
234 ledger: TransitionFrontierSyncLedgerState::Success { .. },
235 ..
236 }
237 )
238 ),
239 TransitionFrontierSyncAction::LedgerRootPending => {
240 match &state.transition_frontier.sync {
241 TransitionFrontierSyncState::Init {
242 best_tip,
243 root_block,
244 ..
245 }
246 | TransitionFrontierSyncState::StakingLedgerSuccess {
247 best_tip,
248 root_block,
249 ..
250 } => SyncLedgerTarget::next_epoch(best_tip, root_block).is_none(),
251 TransitionFrontierSyncState::NextEpochLedgerSuccess { .. } => true,
252 _ => false,
253 }
254 }
255 TransitionFrontierSyncAction::LedgerRootSuccess => matches!(
256 state.transition_frontier.sync,
257 TransitionFrontierSyncState::RootLedgerPending(
258 TransitionFrontierSyncLedgerPending {
259 ledger: TransitionFrontierSyncLedgerState::Success { .. },
260 ..
261 }
262 )
263 ),
264 TransitionFrontierSyncAction::BlocksPending => matches!(
265 state.transition_frontier.sync,
266 TransitionFrontierSyncState::RootLedgerSuccess { .. }
267 ),
268 TransitionFrontierSyncAction::BlocksPeersQuery => {
269 let peers_available = state
270 .p2p
271 .ready_peers_iter()
272 .any(|(_, p)| p.channels.rpc.can_send_request());
273 let sync = &state.transition_frontier.sync;
274 peers_available
275 && (sync.blocks_fetch_next().is_some()
276 || sync.blocks_fetch_retry_iter().next().is_some())
277 }
278 TransitionFrontierSyncAction::BlocksPeerQueryInit { hash, peer_id } => {
279 let check_next_hash = state
280 .transition_frontier
281 .sync
282 .blocks_fetch_next()
283 .is_some_and(|expected| &expected == hash);
284
285 let check_peer_available = state
286 .p2p
287 .get_ready_peer(peer_id)
288 .and_then(|p| {
289 let sync_best_tip = state.transition_frontier.sync.best_tip()?;
290 let peer_best_tip = p.best_tip.as_ref()?;
291 Some(p).filter(|_| sync_best_tip.hash == peer_best_tip.hash)
292 })
293 .is_some_and(|p| p.channels.rpc.can_send_request());
294
295 check_next_hash && check_peer_available
296 }
297 TransitionFrontierSyncAction::BlocksPeerQueryRetry { hash, peer_id } => {
298 let check_next_hash = state
299 .transition_frontier
300 .sync
301 .blocks_fetch_retry_iter()
302 .next()
303 .is_some_and(|expected| &expected == hash);
304
305 let check_peer_available = state
306 .p2p
307 .get_ready_peer(peer_id)
308 .and_then(|p| {
309 let sync_best_tip = state.transition_frontier.sync.best_tip()?;
310 let peer_best_tip = p.best_tip.as_ref()?;
311 Some(p).filter(|_| sync_best_tip.hash == peer_best_tip.hash)
312 })
313 .is_some_and(|p| p.channels.rpc.can_send_request());
314
315 check_next_hash && check_peer_available
316 }
317 TransitionFrontierSyncAction::BlocksPeerQueryPending { hash, peer_id, .. } => state
318 .transition_frontier
319 .sync
320 .block_state(hash)
321 .is_some_and(|b| b.is_fetch_init_from_peer(peer_id)),
322 TransitionFrontierSyncAction::BlocksPeerQueryError {
323 peer_id, rpc_id, ..
324 } => state
325 .transition_frontier
326 .sync
327 .blocks_iter()
328 .any(|s| s.is_fetch_pending_from_peer(peer_id, *rpc_id)),
329 TransitionFrontierSyncAction::BlocksPeerQuerySuccess {
330 peer_id,
331 rpc_id,
332 response,
333 } => state
334 .transition_frontier
335 .sync
336 .block_state(&response.hash)
337 .filter(|s| s.is_fetch_pending_from_peer(peer_id, *rpc_id))
338 .is_some_and(|s| s.block_hash() == &response.hash),
339 TransitionFrontierSyncAction::BlocksFetchSuccess { hash } => state
340 .transition_frontier
341 .sync
342 .block_state(hash)
343 .is_some_and(|s| s.fetch_pending_fetched_block().is_some()),
344 TransitionFrontierSyncAction::BlocksNextApplyInit => {
345 state.transition_frontier.sync.blocks_apply_next().is_some()
346 }
347 TransitionFrontierSyncAction::BlocksNextApplyPending { hash } => state
348 .transition_frontier
349 .sync
350 .blocks_apply_next()
351 .is_some_and(|(b, _)| &b.hash == hash),
352 TransitionFrontierSyncAction::BlocksNextApplyError { hash, .. } => state
353 .transition_frontier
354 .sync
355 .blocks_apply_pending()
356 .is_some_and(|b| &b.hash == hash),
357 TransitionFrontierSyncAction::BlocksNextApplySuccess {
358 hash,
359 just_emitted_a_proof: _,
360 } => state
361 .transition_frontier
362 .sync
363 .blocks_apply_pending()
364 .is_some_and(|b| &b.hash == hash),
365 TransitionFrontierSyncAction::BlocksSuccess => match &state.transition_frontier.sync {
366 TransitionFrontierSyncState::BlocksPending { chain, .. } => {
367 chain.iter().all(|v| v.is_apply_success())
368 }
369 _ => false,
370 },
371 TransitionFrontierSyncAction::BlocksSendToArchive { .. } => {
372 state.transition_frontier.archive_enabled
373 }
374 TransitionFrontierSyncAction::CommitInit => matches!(
375 state.transition_frontier.sync,
376 TransitionFrontierSyncState::BlocksSuccess { .. },
377 ),
378 TransitionFrontierSyncAction::CommitPending => matches!(
379 state.transition_frontier.sync,
380 TransitionFrontierSyncState::BlocksSuccess { .. },
381 ),
382 TransitionFrontierSyncAction::CommitSuccess { .. } => matches!(
383 state.transition_frontier.sync,
384 TransitionFrontierSyncState::CommitPending { .. },
385 ),
386 TransitionFrontierSyncAction::Ledger(action) => action.is_enabled(state, time),
387 }
388 }
389}
390
391impl From<TransitionFrontierSyncAction> for crate::Action {
392 fn from(value: TransitionFrontierSyncAction) -> Self {
393 Self::TransitionFrontier(TransitionFrontierAction::Sync(value))
394 }
395}