1use ark_ff::fields::arithmetic::InvalidBigInt;
2use mina_p2p_messages::{
3 gossip::GossipNetMessageV2,
4 v2::{MinaLedgerSyncLedgerAnswerStableV2, StateHash},
5};
6use openmina_core::{
7 block::{prevalidate::BlockPrevalidationError, BlockWithHash},
8 bug_condition, log,
9 transaction::TransactionWithHash,
10};
11use p2p::{
12 channels::{
13 best_tip::P2pChannelsBestTipAction,
14 rpc::{BestTipWithProof, P2pChannelsRpcAction, P2pRpcRequest, P2pRpcResponse},
15 streaming_rpc::P2pStreamingRpcResponseFull,
16 },
17 disconnection::{P2pDisconnectionAction, P2pDisconnectionReason},
18 P2pNetworkPubsubAction, PeerId,
19};
20use redux::{ActionMeta, ActionWithMeta, Dispatcher};
21
22use crate::{
23 p2p_ready,
24 snark_pool::candidate::SnarkPoolCandidateAction,
25 transaction_pool::candidate::TransactionPoolCandidateAction,
26 transition_frontier::{
27 candidate::{allow_block_too_late, TransitionFrontierCandidateAction},
28 sync::{
29 ledger::{
30 snarked::{
31 PeerLedgerQueryError, PeerLedgerQueryResponse,
32 TransitionFrontierSyncLedgerSnarkedAction,
33 },
34 staged::{
35 PeerStagedLedgerPartsFetchError, TransitionFrontierSyncLedgerStagedAction,
36 },
37 },
38 PeerBlockFetchError, TransitionFrontierSyncAction,
39 },
40 },
41 watched_accounts::{
42 WatchedAccountLedgerInitialState, WatchedAccountsLedgerInitialStateGetError,
43 },
44 Action, State, WatchedAccountsAction,
45};
46
47use super::P2pCallbacksAction;
48
49fn get_rpc_request<'a>(state: &'a State, peer_id: &PeerId) -> Option<&'a P2pRpcRequest> {
50 state
51 .p2p
52 .get_ready_peer(peer_id)
53 .and_then(|s| s.channels.rpc.local_responded_request())
54 .map(|(_, req)| req)
55}
56
57impl crate::State {
58 pub fn p2p_callback_reducer(
59 state_context: crate::Substate<Self>,
60 action: ActionWithMeta<&P2pCallbacksAction>,
61 ) {
62 let (action, meta) = action.split();
63 let time = meta.time();
64 let (dispatcher, state) = state_context.into_dispatcher_and_state();
65
66 match action {
67 P2pCallbacksAction::P2pChannelsRpcReady { peer_id } => {
68 let peer_id = *peer_id;
69
70 if state.p2p.get_peer(&peer_id).is_some_and(|p| p.is_libp2p) {
71 dispatcher.push(P2pChannelsRpcAction::RequestSend {
74 peer_id,
75 id: 0,
76 request: Box::new(P2pRpcRequest::BestTipWithProof),
77 on_init: None,
78 });
79 }
80
81 dispatcher.push(TransitionFrontierSyncLedgerSnarkedAction::PeersQuery);
82 dispatcher.push(TransitionFrontierSyncLedgerStagedAction::PartsPeerFetchInit);
83 dispatcher.push(TransitionFrontierSyncAction::BlocksPeersQuery);
84 }
85 P2pCallbacksAction::P2pChannelsRpcTimeout { peer_id, id } => {
86 let peer_id = *peer_id;
87 let rpc_id = *id;
88 let Some(peer) = state.p2p.get_ready_peer(&peer_id) else {
89 bug_condition!("get_ready_peer({:?}) returned None", peer_id);
90 return;
91 };
92
93 let Some(rpc_kind) = peer.channels.rpc.pending_local_rpc_kind() else {
94 bug_condition!("peer: {:?} pending_local_rpc_kind() returned None", peer_id);
95 return;
96 };
97
98 dispatcher.push(
99 TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsError {
100 peer_id,
101 rpc_id,
102 error: PeerLedgerQueryError::Timeout,
103 },
104 );
105 dispatcher.push(
106 TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressError {
107 peer_id,
108 rpc_id,
109 error: PeerLedgerQueryError::Timeout,
110 },
111 );
112 dispatcher.push(
113 TransitionFrontierSyncLedgerStagedAction::PartsPeerFetchError {
114 peer_id,
115 rpc_id,
116 error: PeerStagedLedgerPartsFetchError::Timeout,
117 },
118 );
119 dispatcher.push(TransitionFrontierSyncAction::BlocksPeerQueryError {
120 peer_id,
121 rpc_id,
122 error: PeerBlockFetchError::Timeout,
123 });
124 dispatcher.push(P2pDisconnectionAction::Init {
125 peer_id,
126 reason: P2pDisconnectionReason::TransitionFrontierRpcTimeout(rpc_kind),
127 });
128 }
129 P2pCallbacksAction::P2pChannelsRpcResponseReceived {
130 peer_id,
131 id,
132 response,
133 } => {
134 let request = || get_rpc_request(state, peer_id);
135 State::handle_rpc_channels_response(
136 dispatcher, meta, *id, *peer_id, request, response,
137 );
138 dispatcher.push(TransitionFrontierSyncLedgerSnarkedAction::PeersQuery);
139 dispatcher.push(TransitionFrontierSyncLedgerStagedAction::PartsPeerFetchInit);
140 dispatcher.push(TransitionFrontierSyncAction::BlocksPeersQuery);
141 }
142 P2pCallbacksAction::P2pChannelsRpcRequestReceived {
143 peer_id,
144 id,
145 request,
146 } => {
147 State::handle_rpc_channels_request(
148 dispatcher,
149 state,
150 meta,
151 *request.clone(),
152 *peer_id,
153 *id,
154 );
155 }
156 P2pCallbacksAction::P2pChannelsStreamingRpcReady => {
157 dispatcher.push(TransitionFrontierSyncLedgerStagedAction::PartsPeerFetchInit);
158 }
159 P2pCallbacksAction::P2pChannelsStreamingRpcTimeout { peer_id, id } => {
160 let peer_id = *peer_id;
161 let rpc_id = *id;
162
163 let Some(peer) = state.p2p.get_ready_peer(&peer_id) else {
164 bug_condition!("get_ready_peer({:?}) returned None", peer_id);
165 return;
166 };
167 let Some(rpc_kind) = peer.channels.streaming_rpc.pending_local_rpc_kind() else {
168 bug_condition!("peer: {:?} pending_local_rpc_kind() returned None", peer_id);
169 return;
170 };
171 dispatcher.push(
172 TransitionFrontierSyncLedgerStagedAction::PartsPeerFetchError {
173 peer_id,
174 rpc_id,
175 error: PeerStagedLedgerPartsFetchError::Timeout,
176 },
177 );
178 dispatcher.push(P2pDisconnectionAction::Init {
179 peer_id,
180 reason: P2pDisconnectionReason::TransitionFrontierStreamingRpcTimeout(rpc_kind),
181 });
182 }
183 P2pCallbacksAction::P2pChannelsStreamingRpcResponseReceived {
184 peer_id,
185 id,
186 response,
187 } => {
188 let peer_id = *peer_id;
189 let rpc_id = *id;
190
191 match response {
192 None => {
193 dispatcher.push(
194 TransitionFrontierSyncLedgerStagedAction::PartsPeerFetchError {
195 peer_id,
196 rpc_id,
197 error: PeerStagedLedgerPartsFetchError::DataUnavailable,
198 },
199 );
200 }
201 Some(P2pStreamingRpcResponseFull::StagedLedgerParts(parts)) => {
202 dispatcher.push(
203 TransitionFrontierSyncLedgerStagedAction::PartsPeerFetchSuccess {
204 peer_id,
205 rpc_id,
206 parts: parts.clone(),
207 },
208 );
209 }
210 }
211 dispatcher.push(TransitionFrontierSyncLedgerStagedAction::PartsPeerFetchInit);
212 }
213 P2pCallbacksAction::P2pDisconnection { peer_id } => {
214 let peer_id = *peer_id;
215
216 if let Some(s) = state.transition_frontier.sync.ledger() {
217 s.snarked()
218 .map(|s| {
219 s.peer_address_query_pending_rpc_ids(&peer_id)
220 .collect::<Vec<_>>()
221 })
222 .unwrap_or_default()
223 .into_iter()
224 .for_each(|rpc_id| {
225 dispatcher.push(
226 TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressError {
227 peer_id,
228 rpc_id,
229 error: PeerLedgerQueryError::Disconnected,
230 },
231 );
232 });
233
234 if let Some(rpc_id) = s
235 .snarked()
236 .and_then(|s| s.peer_num_accounts_rpc_id(&peer_id))
237 {
238 dispatcher.push(
239 TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsError {
240 peer_id,
241 rpc_id,
242 error: PeerLedgerQueryError::Disconnected,
243 },
244 );
245 }
246
247 if let Some(rpc_id) = s.staged().and_then(|s| s.parts_fetch_rpc_id(&peer_id)) {
248 dispatcher.push(
249 TransitionFrontierSyncLedgerStagedAction::PartsPeerFetchError {
250 peer_id,
251 rpc_id,
252 error: PeerStagedLedgerPartsFetchError::Disconnected,
253 },
254 )
255 }
256 }
257
258 state
259 .transition_frontier
260 .sync
261 .blocks_fetch_from_peer_pending_rpc_ids(&peer_id)
262 .for_each(|rpc_id| {
263 dispatcher.push(TransitionFrontierSyncAction::BlocksPeerQueryError {
264 peer_id,
265 rpc_id,
266 error: PeerBlockFetchError::Disconnected,
267 });
268 });
269
270 state
271 .watched_accounts
272 .iter()
273 .filter_map(|(pub_key, a)| match &a.initial_state {
274 WatchedAccountLedgerInitialState::Pending {
275 peer_id: account_peer_id,
276 ..
277 } => {
278 if account_peer_id == &peer_id {
279 Some(WatchedAccountsAction::LedgerInitialStateGetError {
280 pub_key: pub_key.clone(),
281 error:
282 WatchedAccountsLedgerInitialStateGetError::PeerDisconnected,
283 })
284 } else {
285 None
286 }
287 }
288 _ => None,
289 })
290 .for_each(|action| dispatcher.push(action));
291
292 dispatcher.push(TransactionPoolCandidateAction::PeerPrune { peer_id });
293 dispatcher.push(SnarkPoolCandidateAction::PeerPrune { peer_id });
294 }
295 P2pCallbacksAction::RpcRespondBestTip { peer_id } => {
296 let Some(best_tip) = state.transition_frontier.best_tip() else {
297 bug_condition!("Best tip not found");
298 return;
299 };
300
301 dispatcher.push(P2pChannelsBestTipAction::ResponseSend {
302 peer_id: *peer_id,
303 best_tip: best_tip.clone(),
304 });
305 }
306 P2pCallbacksAction::P2pPubsubValidateMessage { message_id } => {
307 let Some(message_content) = state.p2p.ready().and_then(|p2p| {
308 p2p.network
309 .scheduler
310 .broadcast_state
311 .mcache
312 .get_message(message_id)
313 }) else {
314 bug_condition!("Failed to find message for id: {:?}", message_id);
315 return;
316 };
317
318 let pre_validation_result = match message_content {
319 GossipNetMessageV2::NewState(new_best_tip) => {
320 match BlockWithHash::try_new(new_best_tip.clone()) {
321 Ok(block) => {
322 let allow_block_too_late = allow_block_too_late(state, &block);
323 match state.prevalidate_block(&block, allow_block_too_late) {
324 Ok(()) => PreValidationResult::Continue,
325 Err(error)
326 if matches!(
327 error,
328 BlockPrevalidationError::ReceivedTooEarly { .. }
329 ) =>
330 {
331 PreValidationResult::Ignore {
332 reason: format!(
333 "Block prevalidation failed: {:?}",
334 error
335 ),
336 }
337 }
338 Err(error) => PreValidationResult::Reject {
339 reason: format!("Block prevalidation failed: {:?}", error),
340 },
341 }
342 }
343 Err(_) => {
344 log::error!(time; "P2pCallbacksAction::P2pPubsubValidateMessage: Invalid bigint in block");
345 PreValidationResult::Reject{reason: "P2pCallbacksAction::P2pPubsubValidateMessage: Invalid bigint in block".to_owned()}
346 }
347 }
348 }
349 _ => {
350 PreValidationResult::Continue
352 }
353 };
354
355 match pre_validation_result {
356 PreValidationResult::Continue => {
357 dispatcher.push(P2pNetworkPubsubAction::ValidateIncomingMessage {
358 message_id: *message_id,
359 });
360 }
361 PreValidationResult::Reject { reason } => {
362 dispatcher.push(P2pNetworkPubsubAction::RejectMessage {
363 message_id: Some(p2p::BroadcastMessageId::MessageId {
364 message_id: *message_id,
365 }),
366 peer_id: None,
367 reason,
368 });
369 }
370 PreValidationResult::Ignore { reason } => {
371 dispatcher.push(P2pNetworkPubsubAction::IgnoreMessage {
372 message_id: Some(p2p::BroadcastMessageId::MessageId {
373 message_id: *message_id,
374 }),
375 reason,
376 });
377 }
378 }
379 }
380 }
381 }
382
383 fn handle_rpc_channels_request(
384 dispatcher: &mut Dispatcher<Action, State>,
385 state: &State,
386 meta: ActionMeta,
387 request: P2pRpcRequest,
388 peer_id: PeerId,
389 id: u64,
390 ) {
391 match request {
392 P2pRpcRequest::BestTipWithProof => {
393 let best_chain = &state.transition_frontier.best_chain;
394 let response = None.or_else(|| {
395 let best_tip = best_chain.last()?;
396 let mut chain_iter = best_chain.iter();
397 let root_block = chain_iter.next()?;
398 let Ok(body_hashes) = chain_iter
400 .map(|b| b.header().protocol_state.body.try_hash())
401 .collect::<Result<_, _>>()
402 else {
403 openmina_core::error!(meta.time(); "P2pRpcRequest::BestTipWithProof: invalid protocol state");
404 return None;
405 };
406
407 Some(BestTipWithProof {
408 best_tip: best_tip.block().clone(),
409 proof: (body_hashes, root_block.block().clone()),
410 })
411 });
412 let response = response.map(P2pRpcResponse::BestTipWithProof).map(Box::new);
413 dispatcher.push(P2pChannelsRpcAction::ResponseSend {
414 peer_id,
415 id,
416 response,
417 });
418 }
419 P2pRpcRequest::Block(hash) => {
420 let best_chain = &state.transition_frontier.best_chain;
421 let response = best_chain
422 .iter()
423 .rev()
424 .find(|b| b.hash() == &hash)
425 .map(|b| b.block().clone())
426 .map(P2pRpcResponse::Block)
427 .map(Box::new);
428 dispatcher.push(P2pChannelsRpcAction::ResponseSend {
429 peer_id,
430 id,
431 response,
432 });
433 }
434 P2pRpcRequest::LedgerQuery(..) => {
435 }
438 P2pRpcRequest::StagedLedgerAuxAndPendingCoinbasesAtBlock(..) => {
439 }
442 P2pRpcRequest::Transaction(hash) => {
443 let tx = state.transaction_pool.get(&hash);
444 let response = tx
445 .map(|v| v.forget_check())
446 .map(|tx| (&tx).into())
447 .map(P2pRpcResponse::Transaction)
448 .map(Box::new);
449
450 dispatcher.push(P2pChannelsRpcAction::ResponseSend {
451 peer_id,
452 id,
453 response,
454 });
455 }
456 P2pRpcRequest::Snark(job_id) => {
457 let job = state.snark_pool.get(&job_id);
458 let response = job
459 .and_then(|job| job.snark.as_ref())
460 .map(|snark| snark.work.clone())
461 .map(P2pRpcResponse::Snark)
462 .map(Box::new);
463
464 dispatcher.push(P2pChannelsRpcAction::ResponseSend {
465 peer_id,
466 id,
467 response,
468 });
469 }
470 P2pRpcRequest::InitialPeers => {
471 let p2p = p2p_ready!(state.p2p, meta.time());
472 let peers = p2p
473 .peers
474 .iter()
475 .filter_map(|(_, v)| v.dial_opts.clone())
476 .collect();
477 let response = Some(Box::new(P2pRpcResponse::InitialPeers(peers)));
478
479 dispatcher.push(P2pChannelsRpcAction::ResponseSend {
480 peer_id,
481 id,
482 response,
483 });
484 }
485 }
486 }
487
488 fn handle_rpc_channels_response<'a>(
489 dispatcher: &mut Dispatcher<Action, State>,
490 meta: ActionMeta,
491 id: u64,
492 peer_id: PeerId,
493 request: impl FnOnce() -> Option<&'a P2pRpcRequest>,
494 response: &Option<Box<P2pRpcResponse>>,
495 ) {
496 match response.as_deref() {
497 None => {
498 match request() {
499 Some(P2pRpcRequest::Transaction(hash)) => {
500 let hash = hash.clone();
501 dispatcher
502 .push(TransactionPoolCandidateAction::FetchError { peer_id, hash });
503 return;
504 }
505 Some(P2pRpcRequest::Snark(job_id)) => {
506 let job_id = job_id.clone();
507 dispatcher
508 .push(SnarkPoolCandidateAction::WorkFetchError { peer_id, job_id });
509 return;
510 }
511 _ => {}
512 }
513
514 dispatcher.push(
515 TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsError {
516 peer_id,
517 rpc_id: id,
518 error: PeerLedgerQueryError::DataUnavailable,
519 },
520 );
521 dispatcher.push(
522 TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressError {
523 peer_id,
524 rpc_id: id,
525 error: PeerLedgerQueryError::DataUnavailable,
526 },
527 );
528 dispatcher.push(
529 TransitionFrontierSyncLedgerStagedAction::PartsPeerFetchError {
530 peer_id,
531 rpc_id: id,
532 error: PeerStagedLedgerPartsFetchError::DataUnavailable,
533 },
534 );
535 dispatcher.push(TransitionFrontierSyncAction::BlocksPeerQueryError {
536 peer_id,
537 rpc_id: id,
538 error: PeerBlockFetchError::DataUnavailable,
539 });
540 }
541 Some(P2pRpcResponse::BestTipWithProof(resp)) => {
542 let (body_hashes, root_block) = &resp.proof;
543
544 let (Ok(best_tip), Ok(root_block)) = (
545 BlockWithHash::try_new(resp.best_tip.clone()),
546 BlockWithHash::try_new(root_block.clone()),
547 ) else {
548 openmina_core::error!(meta.time(); "P2pRpcResponse::BestTipWithProof: invalid blocks");
549 return;
550 };
551
552 let Ok(hashes) = body_hashes
554 .iter()
555 .take(body_hashes.len().saturating_sub(1))
556 .scan(root_block.hash.clone(), |pred_hash, body_hash| {
557 *pred_hash = match StateHash::try_from_hashes(pred_hash, body_hash) {
558 Ok(hash) => hash,
559 Err(_) => return Some(Err(InvalidBigInt)),
560 };
561 Some(Ok(pred_hash.clone()))
562 })
563 .collect::<Result<Vec<_>, _>>()
564 else {
565 openmina_core::error!(meta.time(); "P2pRpcResponse::BestTipWithProof: invalid hashes");
566 return;
567 };
568
569 if let Some(pred_hash) = hashes.last() {
570 let expected_hash = &best_tip.block.header.protocol_state.previous_state_hash;
571
572 if pred_hash != expected_hash {
573 openmina_core::warn!(meta.time();
574 kind = "P2pRpcBestTipHashMismatch",
575 response = serde_json::to_string(&resp).ok(),
576 expected_hash = expected_hash.to_string(),
577 calculated_hash = pred_hash.to_string());
578 return;
579 }
580 }
581 dispatcher.push(TransitionFrontierCandidateAction::BlockChainProofUpdate {
582 hash: best_tip.hash,
583 chain_proof: (hashes, root_block),
584 });
585 }
586 Some(P2pRpcResponse::LedgerQuery(answer)) => match answer {
587 MinaLedgerSyncLedgerAnswerStableV2::ChildHashesAre(left, right) => {
588 dispatcher.push(
589 TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressSuccess {
590 peer_id,
591 rpc_id: id,
592 response: PeerLedgerQueryResponse::ChildHashes(
593 left.clone(),
594 right.clone(),
595 ),
596 },
597 );
598 }
599 MinaLedgerSyncLedgerAnswerStableV2::ContentsAre(accounts) => {
600 dispatcher.push(
601 TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressSuccess {
602 peer_id,
603 rpc_id: id,
604 response: PeerLedgerQueryResponse::ChildAccounts(
605 accounts.iter().cloned().collect(),
606 ),
607 },
608 );
609 }
610 MinaLedgerSyncLedgerAnswerStableV2::NumAccounts(count, contents_hash) => {
611 dispatcher.push(
612 TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsSuccess {
613 peer_id,
614 rpc_id: id,
615 response: PeerLedgerQueryResponse::NumAccounts(
616 count.as_u64(),
617 contents_hash.clone(),
618 ),
619 },
620 );
621 }
622 },
623 Some(P2pRpcResponse::StagedLedgerAuxAndPendingCoinbasesAtBlock(parts)) => {
624 dispatcher.push(
625 TransitionFrontierSyncLedgerStagedAction::PartsPeerFetchSuccess {
626 peer_id,
627 rpc_id: id,
628 parts: parts.clone(),
629 },
630 );
631 }
632 Some(P2pRpcResponse::Block(block)) => {
633 let Ok(block) = BlockWithHash::try_new(block.clone()) else {
634 openmina_core::error!(meta.time(); "P2pRpcResponse::Block: invalid block");
635 return;
636 };
637 dispatcher.push(TransitionFrontierSyncAction::BlocksPeerQuerySuccess {
638 peer_id,
639 rpc_id: id,
640 response: block,
641 });
642 }
643 Some(P2pRpcResponse::Transaction(transaction)) => {
644 match TransactionWithHash::try_new(transaction.clone()) {
645 Err(err) => bug_condition!("tx hashing failed: {err}"),
646 Ok(transaction) => {
647 dispatcher.push(TransactionPoolCandidateAction::FetchSuccess {
648 peer_id,
649 transaction,
650 })
651 }
652 }
653 }
654 Some(P2pRpcResponse::Snark(snark)) => {
655 dispatcher.push(SnarkPoolCandidateAction::WorkFetchSuccess {
656 peer_id,
657 work: snark.clone(),
658 });
659 }
660 Some(P2pRpcResponse::InitialPeers(_)) => {}
661 }
662 }
663}
664
665enum PreValidationResult {
666 Continue,
667 Reject { reason: String },
668 Ignore { reason: String },
669}