mina_node/transition_frontier/sync/
transition_frontier_sync_effects.rs1use crate::p2p::{
2 channels::rpc::{P2pChannelsRpcAction, P2pRpcId},
3 P2pNetworkPubsubAction, PeerId,
4};
5use mina_core::block::{AppliedBlock, ArcBlockWithHash};
6use mina_p2p_messages::v2::LedgerHash;
7use redux::ActionMeta;
8
9use crate::{
10 ledger::write::{LedgerWriteAction, LedgerWriteRequest, LedgersToKeep},
11 p2p::channels::rpc::P2pRpcRequest,
12 p2p_ready,
13 service::TransitionFrontierSyncLedgerSnarkedService,
14 Service, Store, TransitionFrontierAction,
15};
16
17use super::{
18 ledger::{
19 snarked::TransitionFrontierSyncLedgerSnarkedAction,
20 staged::TransitionFrontierSyncLedgerStagedAction, SyncLedgerTarget,
21 TransitionFrontierSyncLedgerAction,
22 },
23 SyncError, TransitionFrontierSyncAction, TransitionFrontierSyncState,
24};
25
26impl TransitionFrontierSyncAction {
27 pub fn effects<S>(&self, meta: &ActionMeta, store: &mut Store<S>)
28 where
29 S: Service,
30 {
31 match self {
32 TransitionFrontierSyncAction::Init { best_tip, .. } => {
33 let protocol_state_body = &best_tip.block.header.protocol_state.body;
34 let genesis_ledger_hash = &protocol_state_body.blockchain_state.genesis_ledger_hash;
35 let staking_epoch_ledger_hash = &protocol_state_body
36 .consensus_state
37 .staking_epoch_data
38 .ledger
39 .hash;
40 let next_epoch_ledger_hash = &protocol_state_body
41 .consensus_state
42 .next_epoch_data
43 .ledger
44 .hash;
45
46 if genesis_ledger_hash != staking_epoch_ledger_hash {
51 store.dispatch(TransitionFrontierSyncAction::LedgerStakingPending);
52 } else if genesis_ledger_hash != next_epoch_ledger_hash {
53 store.dispatch(TransitionFrontierSyncAction::LedgerNextEpochPending);
54 } else {
55 store.dispatch(TransitionFrontierSyncAction::LedgerRootPending);
56 }
57 }
58 TransitionFrontierSyncAction::BestTipUpdate {
59 previous_root_snarked_ledger_hash,
60 best_tip,
61 on_success,
62 ..
63 } => {
64 maybe_copy_ledgers_for_sync(
67 store,
68 previous_root_snarked_ledger_hash.clone(),
69 best_tip,
70 )
71 .unwrap();
72
73 store.dispatch(TransitionFrontierSyncLedgerAction::Init);
75 store.dispatch(TransitionFrontierSyncLedgerStagedAction::PartsFetchPending);
78 store.dispatch(TransitionFrontierSyncLedgerSnarkedAction::PeersQuery);
79 store.dispatch(TransitionFrontierSyncAction::BlocksPeersQuery);
81 store.dispatch(TransitionFrontierSyncAction::BlocksNextApplyInit);
83
84 if let Some(callback) = on_success {
86 store.dispatch_callback(callback.clone(), ());
87 }
88 }
89 TransitionFrontierSyncAction::LedgerStakingPending => {
98 prepare_staking_epoch_ledger_for_sync(store, &sync_best_tip(store.state()))
99 .unwrap();
100
101 store.dispatch(TransitionFrontierSyncLedgerAction::Init);
102 }
103 TransitionFrontierSyncAction::LedgerStakingSuccess => {
104 if store.dispatch(TransitionFrontierSyncAction::LedgerNextEpochPending) {
105 } else if store.dispatch(TransitionFrontierSyncAction::LedgerRootPending) {
106 }
107 }
108 TransitionFrontierSyncAction::LedgerNextEpochPending => {
109 prepare_next_epoch_ledger_for_sync(store, &sync_best_tip(store.state())).unwrap();
110
111 store.dispatch(TransitionFrontierSyncLedgerAction::Init);
112 }
113 TransitionFrontierSyncAction::LedgerNextEpochSuccess => {
114 store.dispatch(TransitionFrontierSyncAction::LedgerRootPending);
115 }
116 TransitionFrontierSyncAction::LedgerRootPending => {
117 prepare_transition_frontier_root_ledger_for_sync(
118 store,
119 None,
120 &sync_best_tip(store.state()),
121 )
122 .unwrap();
123
124 store.dispatch(TransitionFrontierSyncLedgerAction::Init);
125 }
126 TransitionFrontierSyncAction::LedgerRootSuccess => {
127 store.dispatch(TransitionFrontierSyncAction::BlocksPending);
128 }
129 TransitionFrontierSyncAction::BlocksPending => {
130 if !store.dispatch(TransitionFrontierSyncAction::BlocksSuccess) {
131 store.dispatch(TransitionFrontierSyncAction::BlocksPeersQuery);
132 }
133 }
134 TransitionFrontierSyncAction::BlocksPeersQuery => {
135 let p2p = p2p_ready!(store.state().p2p, meta.time());
136 let mut peer_ids = p2p
138 .ready_peers_iter()
139 .filter(|(_, p)| p.channels.rpc.can_send_request())
140 .map(|(id, p)| (*id, p.connected_since))
141 .collect::<Vec<_>>();
142 peer_ids.sort_by(|(_, t1), (_, t2)| t2.cmp(t1));
143
144 let mut retry_hashes = store
145 .state()
146 .transition_frontier
147 .sync
148 .blocks_fetch_retry_iter()
149 .collect::<Vec<_>>();
150 retry_hashes.reverse();
151
152 for (peer_id, _) in peer_ids {
153 if let Some(hash) = retry_hashes.last() {
154 if store.dispatch(TransitionFrontierSyncAction::BlocksPeerQueryRetry {
155 peer_id,
156 hash: hash.clone(),
157 }) {
158 retry_hashes.pop();
159 continue;
160 }
161 }
162
163 match store.state().transition_frontier.sync.blocks_fetch_next() {
164 Some(hash) => {
165 store.dispatch(TransitionFrontierSyncAction::BlocksPeerQueryInit {
166 peer_id,
167 hash,
168 });
169 }
170 None if retry_hashes.is_empty() => break,
171 None => {}
172 }
173 }
174 }
175 TransitionFrontierSyncAction::BlocksPeerQueryInit { hash, peer_id } => {
176 let p2p = p2p_ready!(store.state().p2p, meta.time());
177 let Some(rpc_id) = p2p
178 .get_ready_peer(peer_id)
179 .map(|v| v.channels.next_local_rpc_id())
180 else {
181 return;
182 };
183
184 store.dispatch(P2pChannelsRpcAction::RequestSend {
185 peer_id: *peer_id,
186 id: rpc_id,
187 request: Box::new(P2pRpcRequest::Block(hash.clone())),
188 on_init: Some(redux::callback!(
189 on_send_p2p_block_rpc_request(
190 (peer_id: PeerId, rpc_id: P2pRpcId, request: P2pRpcRequest)
191 ) -> crate::Action {
192 let P2pRpcRequest::Block(hash) = request else {
193 unreachable!()
194 };
195 TransitionFrontierSyncAction::BlocksPeerQueryPending {
196 hash,
197 peer_id,
198 rpc_id,
199 }
200 }
201 )),
202 });
203 }
204 TransitionFrontierSyncAction::BlocksPeerQueryRetry { hash, peer_id } => {
205 let p2p = p2p_ready!(store.state().p2p, meta.time());
206 let Some(rpc_id) = p2p
207 .get_ready_peer(peer_id)
208 .map(|v| v.channels.next_local_rpc_id())
209 else {
210 return;
211 };
212
213 store.dispatch(P2pChannelsRpcAction::RequestSend {
214 peer_id: *peer_id,
215 id: rpc_id,
216 request: Box::new(P2pRpcRequest::Block(hash.clone())),
217 on_init: Some(redux::callback!(
218 on_send_p2p_block_rpc_request_retry(
219 (peer_id: PeerId, rpc_id: P2pRpcId, request: P2pRpcRequest)
220 ) -> crate::Action {
221 let P2pRpcRequest::Block(hash) = request else {
222 unreachable!()
223 };
224 TransitionFrontierSyncAction::BlocksPeerQueryPending {
225 hash,
226 peer_id,
227 rpc_id,
228 }
229 }
230 )),
231 });
232 }
233 TransitionFrontierSyncAction::BlocksPeerQueryPending { .. } => {}
234 TransitionFrontierSyncAction::BlocksPeerQueryError { .. } => {
235 store.dispatch(TransitionFrontierSyncAction::BlocksPeersQuery);
236 }
237 TransitionFrontierSyncAction::BlocksPeerQuerySuccess { response, .. } => {
238 store.dispatch(TransitionFrontierSyncAction::BlocksPeersQuery);
239 store.dispatch(TransitionFrontierSyncAction::BlocksFetchSuccess {
240 hash: response.hash.clone(),
241 });
242 }
243 TransitionFrontierSyncAction::BlocksFetchSuccess { .. } => {
244 let _ = store;
245 store.dispatch(TransitionFrontierSyncAction::BlocksNextApplyInit {});
246 }
247 TransitionFrontierSyncAction::BlocksNextApplyInit => {
248 let Some((block, pred_block)) = store
249 .state()
250 .transition_frontier
251 .sync
252 .blocks_apply_next()
253 .map(|v| (v.0.clone(), v.1.clone()))
254 else {
255 return;
256 };
257 let hash = block.hash.clone();
258
259 let is_our_block;
260
261 if let Some(stats) = store.service.stats() {
262 stats.block_producer().block_apply_start(meta.time(), &hash);
263 is_our_block = stats.block_producer().is_our_just_produced_block(&hash);
266 } else {
267 is_our_block = false;
268 }
269
270 let skip_verification = is_our_block
274 || super::CATCHUP_BLOCK_VERIFY_TAIL_LENGTH
275 < store.state().transition_frontier.sync.pending_count();
276
277 store.dispatch(LedgerWriteAction::Init {
278 request: LedgerWriteRequest::BlockApply {
279 block,
280 pred_block,
281 skip_verification,
282 },
283 on_init: redux::callback!(
284 on_block_next_apply_init(request: LedgerWriteRequest) -> crate::Action {
285 let LedgerWriteRequest::BlockApply {
286 block,
287 pred_block: _,
288 skip_verification: _,
289 } = request
290 else {
291 unreachable!()
292 };
293 let hash = block.hash().clone();
294 TransitionFrontierSyncAction::BlocksNextApplyPending { hash }
295 }
296 ),
297 });
298 }
299 TransitionFrontierSyncAction::BlocksNextApplyPending { .. } => {}
300 TransitionFrontierSyncAction::BlocksNextApplyError { hash, error } => {
301 let Some((best_tip, failed_block)) = None.or_else(|| {
302 Some((
303 store.state().transition_frontier.sync.best_tip()?.clone(),
304 store
305 .state()
306 .transition_frontier
307 .sync
308 .block_state(hash)?
309 .block()?,
310 ))
311 }) else {
312 return;
313 };
314 let error = SyncError::BlockApplyFailed(failed_block.clone(), error.clone());
315 store.dispatch(TransitionFrontierAction::SyncFailed { best_tip, error });
316 store.dispatch(P2pNetworkPubsubAction::RejectMessage {
318 message_id: Some(crate::p2p::BroadcastMessageId::BlockHash {
319 hash: hash.clone(),
320 }),
321 peer_id: None,
322 reason: "Failed to apply block".to_owned(),
323 });
324 }
325 TransitionFrontierSyncAction::BlocksNextApplySuccess {
326 hash,
327 just_emitted_a_proof: _,
328 } => {
329 if let Some(stats) = store.service.stats() {
330 stats.block_producer().block_apply_end(meta.time(), hash);
331 }
332
333 if !store.dispatch(TransitionFrontierSyncAction::BlocksNextApplyInit) {
334 store.dispatch(TransitionFrontierSyncAction::BlocksSuccess);
335 }
336 }
337 TransitionFrontierSyncAction::BlocksSendToArchive { data, .. } => {
338 store.service().send_to_archive(data.clone());
339 }
340 TransitionFrontierSyncAction::BlocksSuccess => {}
341 TransitionFrontierSyncAction::CommitInit => {
347 let transition_frontier = &store.state.get().transition_frontier;
348 let TransitionFrontierSyncState::BlocksSuccess {
349 chain,
350 root_snarked_ledger_updates,
351 needed_protocol_states,
352 ..
353 } = &transition_frontier.sync
354 else {
355 return;
356 };
357 let Some(new_root) = chain.first() else {
358 return;
359 };
360 let Some(new_best_tip) = chain.last() else {
361 return;
362 };
363 let ledgers_to_keep = chain
364 .iter()
365 .map(|block| &block.block)
366 .collect::<LedgersToKeep>();
367 let mut root_snarked_ledger_updates = root_snarked_ledger_updates.clone();
368 if transition_frontier
369 .best_chain
370 .iter()
371 .any(|b| b.hash() == new_root.hash())
372 {
373 let old_chain = transition_frontier
374 .best_chain
375 .iter()
376 .map(AppliedBlock::block_with_hash);
377 root_snarked_ledger_updates
378 .extend_with_needed(new_root.block_with_hash(), old_chain);
379 }
380
381 let needed_protocol_states = if root_snarked_ledger_updates.is_empty() {
382 Default::default()
385 } else {
386 needed_protocol_states
387 .iter()
388 .chain(&transition_frontier.needed_protocol_states)
389 .map(|(k, v)| (k.clone(), v.clone()))
390 .collect()
391 };
392
393 store.dispatch(LedgerWriteAction::Init {
394 request: LedgerWriteRequest::Commit {
395 ledgers_to_keep,
396 root_snarked_ledger_updates,
397 needed_protocol_states,
398 new_root: new_root.clone(),
399 new_best_tip: new_best_tip.clone(),
400 },
401 on_init: redux::callback!(
402 on_frontier_commit_init(_request: LedgerWriteRequest) -> crate::Action {
403 TransitionFrontierSyncAction::CommitPending
404 }
405 ),
406 });
407 }
408 TransitionFrontierSyncAction::CommitPending => {}
409 TransitionFrontierSyncAction::CommitSuccess { .. } => {
410 unreachable!("handled in parent effects to avoid cloning")
411 }
412 TransitionFrontierSyncAction::Ledger(_) => {}
413 }
414 }
415}
416
417fn sync_best_tip(state: &crate::State) -> ArcBlockWithHash {
421 state.transition_frontier.sync.best_tip().unwrap().clone()
422}
423
424fn maybe_copy_ledgers_for_sync<S>(
426 store: &mut Store<S>,
427 previous_root_snarked_ledger_hash: Option<LedgerHash>,
428 best_tip: &ArcBlockWithHash,
429) -> Result<bool, String>
430where
431 S: TransitionFrontierSyncLedgerSnarkedService,
432{
433 let sync = &store.state().transition_frontier.sync;
434
435 match sync {
436 TransitionFrontierSyncState::StakingLedgerPending(_) => {
437 prepare_staking_epoch_ledger_for_sync(store, best_tip)
438 }
439 TransitionFrontierSyncState::NextEpochLedgerPending(_) => {
440 prepare_next_epoch_ledger_for_sync(store, best_tip)
441 }
442
443 TransitionFrontierSyncState::RootLedgerPending(_) => {
444 prepare_transition_frontier_root_ledger_for_sync(
445 store,
446 previous_root_snarked_ledger_hash,
447 best_tip,
448 )
449 }
450 _ => Ok(true),
451 }
452}
453
454fn prepare_staking_epoch_ledger_for_sync<S>(
457 store: &mut Store<S>,
458 best_tip: &ArcBlockWithHash,
459) -> Result<bool, String>
460where
461 S: TransitionFrontierSyncLedgerSnarkedService,
462{
463 let target = SyncLedgerTarget::staking_epoch(best_tip).snarked_ledger_hash;
464 let origin = best_tip.genesis_ledger_hash().clone();
465
466 store
467 .service()
468 .copy_snarked_ledger_contents_for_sync(vec![origin], target, false)
469}
470
471fn prepare_next_epoch_ledger_for_sync<S>(
474 store: &mut Store<S>,
475 best_tip: &ArcBlockWithHash,
476) -> Result<bool, String>
477where
478 S: TransitionFrontierSyncLedgerSnarkedService,
479{
480 let sync = &store.state().transition_frontier.sync;
481 let root_block = sync.root_block().unwrap();
482 let Some(next_epoch_sync) = SyncLedgerTarget::next_epoch(best_tip, root_block) else {
483 return Ok(false);
484 };
485 let target = next_epoch_sync.snarked_ledger_hash;
486 let origin = SyncLedgerTarget::staking_epoch(best_tip).snarked_ledger_hash;
487
488 store
489 .service()
490 .copy_snarked_ledger_contents_for_sync(vec![origin], target, false)
491}
492
493fn prepare_transition_frontier_root_ledger_for_sync<S>(
496 store: &mut Store<S>,
497 previous_root_snarked_ledger_hash: Option<LedgerHash>,
498 best_tip: &ArcBlockWithHash,
499) -> Result<bool, String>
500where
501 S: TransitionFrontierSyncLedgerSnarkedService,
502{
503 let sync = &store.state().transition_frontier.sync;
504 let root_block = sync
505 .root_block()
506 .expect("Sync root block cannot be missing");
507
508 let mut candidate_origins: Vec<LedgerHash> =
510 previous_root_snarked_ledger_hash.into_iter().collect();
511 if let Some(next_epoch) = SyncLedgerTarget::next_epoch(best_tip, root_block) {
512 candidate_origins.push(next_epoch.snarked_ledger_hash.clone());
513 }
514 candidate_origins.push(
515 SyncLedgerTarget::staking_epoch(best_tip)
516 .snarked_ledger_hash
517 .clone(),
518 );
519
520 let target = root_block.snarked_ledger_hash().clone();
521
522 store
523 .service()
524 .copy_snarked_ledger_contents_for_sync(candidate_origins, target, false)
525}