node/transition_frontier/sync/
transition_frontier_sync_effects.rs1use mina_p2p_messages::v2::LedgerHash;
2use openmina_core::block::{AppliedBlock, ArcBlockWithHash};
3use p2p::{
4 channels::rpc::{P2pChannelsRpcAction, P2pRpcId},
5 P2pNetworkPubsubAction, PeerId,
6};
7use redux::ActionMeta;
8
9use crate::{
10 ledger::write::{LedgerWriteAction, LedgerWriteRequest, LedgersToKeep},
11 p2p::channels::rpc::P2pRpcRequest,
12 p2p_ready,
13 service::TransitionFrontierSyncLedgerSnarkedService,
14 Service, Store, TransitionFrontierAction,
15};
16
17use super::{
18 ledger::{
19 snarked::TransitionFrontierSyncLedgerSnarkedAction,
20 staged::TransitionFrontierSyncLedgerStagedAction, SyncLedgerTarget,
21 TransitionFrontierSyncLedgerAction,
22 },
23 SyncError, TransitionFrontierSyncAction, TransitionFrontierSyncState,
24};
25
26impl TransitionFrontierSyncAction {
27 pub fn effects<S>(&self, meta: &ActionMeta, store: &mut Store<S>)
28 where
29 S: Service,
30 {
31 match self {
32 TransitionFrontierSyncAction::Init { best_tip, .. } => {
33 let protocol_state_body = &best_tip.block.header.protocol_state.body;
34 let genesis_ledger_hash = &protocol_state_body.blockchain_state.genesis_ledger_hash;
35 let staking_epoch_ledger_hash = &protocol_state_body
36 .consensus_state
37 .staking_epoch_data
38 .ledger
39 .hash;
40 let next_epoch_ledger_hash = &protocol_state_body
41 .consensus_state
42 .next_epoch_data
43 .ledger
44 .hash;
45
46 if genesis_ledger_hash != staking_epoch_ledger_hash {
51 store.dispatch(TransitionFrontierSyncAction::LedgerStakingPending);
52 } else if genesis_ledger_hash != next_epoch_ledger_hash {
53 store.dispatch(TransitionFrontierSyncAction::LedgerNextEpochPending);
54 } else {
55 store.dispatch(TransitionFrontierSyncAction::LedgerRootPending);
56 }
57 }
58 TransitionFrontierSyncAction::BestTipUpdate {
59 previous_root_snarked_ledger_hash,
60 best_tip,
61 on_success,
62 ..
63 } => {
64 maybe_copy_ledgers_for_sync(
67 store,
68 previous_root_snarked_ledger_hash.clone(),
69 best_tip,
70 )
71 .unwrap();
72
73 store.dispatch(TransitionFrontierSyncLedgerAction::Init);
75 store.dispatch(TransitionFrontierSyncLedgerStagedAction::PartsFetchPending);
78 store.dispatch(TransitionFrontierSyncLedgerSnarkedAction::PeersQuery);
79 store.dispatch(TransitionFrontierSyncAction::BlocksPeersQuery);
81 store.dispatch(TransitionFrontierSyncAction::BlocksNextApplyInit);
83
84 if let Some(callback) = on_success {
86 store.dispatch_callback(callback.clone(), ());
87 }
88 }
89 TransitionFrontierSyncAction::LedgerStakingPending => {
98 prepare_staking_epoch_ledger_for_sync(store, &sync_best_tip(store.state()))
99 .unwrap();
100
101 store.dispatch(TransitionFrontierSyncLedgerAction::Init);
102 }
103 TransitionFrontierSyncAction::LedgerStakingSuccess => {
104 if store.dispatch(TransitionFrontierSyncAction::LedgerNextEpochPending) {
105 } else if store.dispatch(TransitionFrontierSyncAction::LedgerRootPending) {
106 }
107 }
108 TransitionFrontierSyncAction::LedgerNextEpochPending => {
109 prepare_next_epoch_ledger_for_sync(store, &sync_best_tip(store.state())).unwrap();
110
111 store.dispatch(TransitionFrontierSyncLedgerAction::Init);
112 }
113 TransitionFrontierSyncAction::LedgerNextEpochSuccess => {
114 store.dispatch(TransitionFrontierSyncAction::LedgerRootPending);
115 }
116 TransitionFrontierSyncAction::LedgerRootPending => {
117 prepare_transition_frontier_root_ledger_for_sync(
118 store,
119 None,
120 &sync_best_tip(store.state()),
121 )
122 .unwrap();
123
124 store.dispatch(TransitionFrontierSyncLedgerAction::Init);
125 }
126 TransitionFrontierSyncAction::LedgerRootSuccess => {
127 store.dispatch(TransitionFrontierSyncAction::BlocksPending);
128 }
129 TransitionFrontierSyncAction::BlocksPending => {
130 if !store.dispatch(TransitionFrontierSyncAction::BlocksSuccess) {
131 store.dispatch(TransitionFrontierSyncAction::BlocksPeersQuery);
132 }
133 }
134 TransitionFrontierSyncAction::BlocksPeersQuery => {
135 let p2p = p2p_ready!(store.state().p2p, meta.time());
136 let mut peer_ids = p2p
138 .ready_peers_iter()
139 .filter(|(_, p)| p.channels.rpc.can_send_request())
140 .map(|(id, p)| (*id, p.connected_since))
141 .collect::<Vec<_>>();
142 peer_ids.sort_by(|(_, t1), (_, t2)| t2.cmp(t1));
143
144 let mut retry_hashes = store
145 .state()
146 .transition_frontier
147 .sync
148 .blocks_fetch_retry_iter()
149 .collect::<Vec<_>>();
150 retry_hashes.reverse();
151
152 for (peer_id, _) in peer_ids {
153 if let Some(hash) = retry_hashes.last() {
154 if store.dispatch(TransitionFrontierSyncAction::BlocksPeerQueryRetry {
155 peer_id,
156 hash: hash.clone(),
157 }) {
158 retry_hashes.pop();
159 continue;
160 }
161 }
162
163 match store.state().transition_frontier.sync.blocks_fetch_next() {
164 Some(hash) => {
165 store.dispatch(TransitionFrontierSyncAction::BlocksPeerQueryInit {
166 peer_id,
167 hash,
168 });
169 }
170 None if retry_hashes.is_empty() => break,
171 None => {}
172 }
173 }
174 }
175 TransitionFrontierSyncAction::BlocksPeerQueryInit { hash, peer_id } => {
176 let p2p = p2p_ready!(store.state().p2p, meta.time());
177 let Some(rpc_id) = p2p
178 .get_ready_peer(peer_id)
179 .map(|v| v.channels.next_local_rpc_id())
180 else {
181 return;
182 };
183
184 store.dispatch(P2pChannelsRpcAction::RequestSend {
185 peer_id: *peer_id,
186 id: rpc_id,
187 request: Box::new(P2pRpcRequest::Block(hash.clone())),
188 on_init: Some(redux::callback!(
189 on_send_p2p_block_rpc_request(
190 (peer_id: PeerId, rpc_id: P2pRpcId, request: P2pRpcRequest)
191 ) -> crate::Action {
192 let P2pRpcRequest::Block(hash) = request else {
193 unreachable!()
194 };
195 TransitionFrontierSyncAction::BlocksPeerQueryPending {
196 hash,
197 peer_id,
198 rpc_id,
199 }
200 }
201 )),
202 });
203 }
204 TransitionFrontierSyncAction::BlocksPeerQueryRetry { hash, peer_id } => {
205 let p2p = p2p_ready!(store.state().p2p, meta.time());
206 let Some(rpc_id) = p2p
207 .get_ready_peer(peer_id)
208 .map(|v| v.channels.next_local_rpc_id())
209 else {
210 return;
211 };
212
213 store.dispatch(P2pChannelsRpcAction::RequestSend {
214 peer_id: *peer_id,
215 id: rpc_id,
216 request: Box::new(P2pRpcRequest::Block(hash.clone())),
217 on_init: Some(redux::callback!(
218 on_send_p2p_block_rpc_request_retry(
219 (peer_id: PeerId, rpc_id: P2pRpcId, request: P2pRpcRequest)
220 ) -> crate::Action {
221 let P2pRpcRequest::Block(hash) = request else {
222 unreachable!()
223 };
224 TransitionFrontierSyncAction::BlocksPeerQueryPending {
225 hash,
226 peer_id,
227 rpc_id,
228 }
229 }
230 )),
231 });
232 }
233 TransitionFrontierSyncAction::BlocksPeerQueryPending { .. } => {}
234 TransitionFrontierSyncAction::BlocksPeerQueryError { .. } => {
235 store.dispatch(TransitionFrontierSyncAction::BlocksPeersQuery);
236 }
237 TransitionFrontierSyncAction::BlocksPeerQuerySuccess { response, .. } => {
238 store.dispatch(TransitionFrontierSyncAction::BlocksPeersQuery);
239 store.dispatch(TransitionFrontierSyncAction::BlocksFetchSuccess {
240 hash: response.hash.clone(),
241 });
242 }
243 TransitionFrontierSyncAction::BlocksFetchSuccess { .. } => {
244 let _ = store;
245 store.dispatch(TransitionFrontierSyncAction::BlocksNextApplyInit {});
246 }
247 TransitionFrontierSyncAction::BlocksNextApplyInit => {
248 let Some((block, pred_block)) = store
249 .state()
250 .transition_frontier
251 .sync
252 .blocks_apply_next()
253 .map(|v| (v.0.clone(), v.1.clone()))
254 else {
255 return;
256 };
257 let hash = block.hash.clone();
258
259 let is_our_block;
260
261 if let Some(stats) = store.service.stats() {
262 stats.block_producer().block_apply_start(meta.time(), &hash);
263 is_our_block = stats.block_producer().is_our_just_produced_block(&hash);
266 } else {
267 is_our_block = false;
268 }
269
270 let skip_verification = is_our_block
274 || super::CATCHUP_BLOCK_VERIFY_TAIL_LENGTH
275 < store.state().transition_frontier.sync.pending_count();
276
277 store.dispatch(LedgerWriteAction::Init {
278 request: LedgerWriteRequest::BlockApply {
279 block,
280 pred_block,
281 skip_verification,
282 },
283 on_init: redux::callback!(
284 on_block_next_apply_init(request: LedgerWriteRequest) -> crate::Action {
285 let LedgerWriteRequest::BlockApply {
286 block,
287 pred_block: _,
288 skip_verification: _,
289 } = request
290 else {
291 unreachable!()
292 };
293 let hash = block.hash().clone();
294 TransitionFrontierSyncAction::BlocksNextApplyPending { hash }
295 }
296 ),
297 });
298 }
299 TransitionFrontierSyncAction::BlocksNextApplyPending { .. } => {}
300 TransitionFrontierSyncAction::BlocksNextApplyError { hash, error } => {
301 let Some((best_tip, failed_block)) = None.or_else(|| {
302 Some((
303 store.state().transition_frontier.sync.best_tip()?.clone(),
304 store
305 .state()
306 .transition_frontier
307 .sync
308 .block_state(hash)?
309 .block()?,
310 ))
311 }) else {
312 return;
313 };
314 let error = SyncError::BlockApplyFailed(failed_block.clone(), error.clone());
315 store.dispatch(TransitionFrontierAction::SyncFailed { best_tip, error });
316 store.dispatch(P2pNetworkPubsubAction::RejectMessage {
318 message_id: Some(p2p::BroadcastMessageId::BlockHash { hash: hash.clone() }),
319 peer_id: None,
320 reason: "Failed to apply block".to_owned(),
321 });
322 }
323 TransitionFrontierSyncAction::BlocksNextApplySuccess {
324 hash,
325 just_emitted_a_proof: _,
326 } => {
327 if let Some(stats) = store.service.stats() {
328 stats.block_producer().block_apply_end(meta.time(), hash);
329 }
330
331 if !store.dispatch(TransitionFrontierSyncAction::BlocksNextApplyInit) {
332 store.dispatch(TransitionFrontierSyncAction::BlocksSuccess);
333 }
334 }
335 TransitionFrontierSyncAction::BlocksSendToArchive { data, .. } => {
336 store.service().send_to_archive(data.clone());
337 }
338 TransitionFrontierSyncAction::BlocksSuccess => {}
339 TransitionFrontierSyncAction::CommitInit => {
345 let transition_frontier = &store.state.get().transition_frontier;
346 let TransitionFrontierSyncState::BlocksSuccess {
347 chain,
348 root_snarked_ledger_updates,
349 needed_protocol_states,
350 ..
351 } = &transition_frontier.sync
352 else {
353 return;
354 };
355 let Some(new_root) = chain.first() else {
356 return;
357 };
358 let Some(new_best_tip) = chain.last() else {
359 return;
360 };
361 let ledgers_to_keep = chain
362 .iter()
363 .map(|block| &block.block)
364 .collect::<LedgersToKeep>();
365 let mut root_snarked_ledger_updates = root_snarked_ledger_updates.clone();
366 if transition_frontier
367 .best_chain
368 .iter()
369 .any(|b| b.hash() == new_root.hash())
370 {
371 let old_chain = transition_frontier
372 .best_chain
373 .iter()
374 .map(AppliedBlock::block_with_hash);
375 root_snarked_ledger_updates
376 .extend_with_needed(new_root.block_with_hash(), old_chain);
377 }
378
379 let needed_protocol_states = if root_snarked_ledger_updates.is_empty() {
380 Default::default()
383 } else {
384 needed_protocol_states
385 .iter()
386 .chain(&transition_frontier.needed_protocol_states)
387 .map(|(k, v)| (k.clone(), v.clone()))
388 .collect()
389 };
390
391 store.dispatch(LedgerWriteAction::Init {
392 request: LedgerWriteRequest::Commit {
393 ledgers_to_keep,
394 root_snarked_ledger_updates,
395 needed_protocol_states,
396 new_root: new_root.clone(),
397 new_best_tip: new_best_tip.clone(),
398 },
399 on_init: redux::callback!(
400 on_frontier_commit_init(_request: LedgerWriteRequest) -> crate::Action {
401 TransitionFrontierSyncAction::CommitPending
402 }
403 ),
404 });
405 }
406 TransitionFrontierSyncAction::CommitPending => {}
407 TransitionFrontierSyncAction::CommitSuccess { .. } => {
408 unreachable!("handled in parent effects to avoid cloning")
409 }
410 TransitionFrontierSyncAction::Ledger(_) => {}
411 }
412 }
413}
414
415fn sync_best_tip(state: &crate::State) -> ArcBlockWithHash {
419 state.transition_frontier.sync.best_tip().unwrap().clone()
420}
421
422fn maybe_copy_ledgers_for_sync<S>(
424 store: &mut Store<S>,
425 previous_root_snarked_ledger_hash: Option<LedgerHash>,
426 best_tip: &ArcBlockWithHash,
427) -> Result<bool, String>
428where
429 S: TransitionFrontierSyncLedgerSnarkedService,
430{
431 let sync = &store.state().transition_frontier.sync;
432
433 match sync {
434 TransitionFrontierSyncState::StakingLedgerPending(_) => {
435 prepare_staking_epoch_ledger_for_sync(store, best_tip)
436 }
437 TransitionFrontierSyncState::NextEpochLedgerPending(_) => {
438 prepare_next_epoch_ledger_for_sync(store, best_tip)
439 }
440
441 TransitionFrontierSyncState::RootLedgerPending(_) => {
442 prepare_transition_frontier_root_ledger_for_sync(
443 store,
444 previous_root_snarked_ledger_hash,
445 best_tip,
446 )
447 }
448 _ => Ok(true),
449 }
450}
451
452fn prepare_staking_epoch_ledger_for_sync<S>(
455 store: &mut Store<S>,
456 best_tip: &ArcBlockWithHash,
457) -> Result<bool, String>
458where
459 S: TransitionFrontierSyncLedgerSnarkedService,
460{
461 let target = SyncLedgerTarget::staking_epoch(best_tip).snarked_ledger_hash;
462 let origin = best_tip.genesis_ledger_hash().clone();
463
464 store
465 .service()
466 .copy_snarked_ledger_contents_for_sync(vec![origin], target, false)
467}
468
469fn prepare_next_epoch_ledger_for_sync<S>(
472 store: &mut Store<S>,
473 best_tip: &ArcBlockWithHash,
474) -> Result<bool, String>
475where
476 S: TransitionFrontierSyncLedgerSnarkedService,
477{
478 let sync = &store.state().transition_frontier.sync;
479 let root_block = sync.root_block().unwrap();
480 let Some(next_epoch_sync) = SyncLedgerTarget::next_epoch(best_tip, root_block) else {
481 return Ok(false);
482 };
483 let target = next_epoch_sync.snarked_ledger_hash;
484 let origin = SyncLedgerTarget::staking_epoch(best_tip).snarked_ledger_hash;
485
486 store
487 .service()
488 .copy_snarked_ledger_contents_for_sync(vec![origin], target, false)
489}
490
491fn prepare_transition_frontier_root_ledger_for_sync<S>(
494 store: &mut Store<S>,
495 previous_root_snarked_ledger_hash: Option<LedgerHash>,
496 best_tip: &ArcBlockWithHash,
497) -> Result<bool, String>
498where
499 S: TransitionFrontierSyncLedgerSnarkedService,
500{
501 let sync = &store.state().transition_frontier.sync;
502 let root_block = sync
503 .root_block()
504 .expect("Sync root block cannot be missing");
505
506 let mut candidate_origins: Vec<LedgerHash> =
508 previous_root_snarked_ledger_hash.into_iter().collect();
509 if let Some(next_epoch) = SyncLedgerTarget::next_epoch(best_tip, root_block) {
510 candidate_origins.push(next_epoch.snarked_ledger_hash.clone());
511 }
512 candidate_origins.push(
513 SyncLedgerTarget::staking_epoch(best_tip)
514 .snarked_ledger_hash
515 .clone(),
516 );
517
518 let target = root_block.snarked_ledger_hash().clone();
519
520 store
521 .service()
522 .copy_snarked_ledger_contents_for_sync(candidate_origins, target, false)
523}