node/p2p/callbacks/
p2p_callbacks_reducer.rs

1use ark_ff::fields::arithmetic::InvalidBigInt;
2use mina_p2p_messages::{
3    gossip::GossipNetMessageV2,
4    v2::{MinaLedgerSyncLedgerAnswerStableV2, StateHash},
5};
6use openmina_core::{
7    block::{prevalidate::BlockPrevalidationError, BlockWithHash},
8    bug_condition, log,
9    transaction::TransactionWithHash,
10};
11use p2p::{
12    channels::{
13        best_tip::P2pChannelsBestTipAction,
14        rpc::{BestTipWithProof, P2pChannelsRpcAction, P2pRpcRequest, P2pRpcResponse},
15        streaming_rpc::P2pStreamingRpcResponseFull,
16    },
17    disconnection::{P2pDisconnectionAction, P2pDisconnectionReason},
18    P2pNetworkPubsubAction, PeerId,
19};
20use redux::{ActionMeta, ActionWithMeta, Dispatcher};
21
22use crate::{
23    p2p_ready,
24    snark_pool::candidate::SnarkPoolCandidateAction,
25    transaction_pool::candidate::TransactionPoolCandidateAction,
26    transition_frontier::{
27        candidate::{allow_block_too_late, TransitionFrontierCandidateAction},
28        sync::{
29            ledger::{
30                snarked::{
31                    PeerLedgerQueryError, PeerLedgerQueryResponse,
32                    TransitionFrontierSyncLedgerSnarkedAction,
33                },
34                staged::{
35                    PeerStagedLedgerPartsFetchError, TransitionFrontierSyncLedgerStagedAction,
36                },
37            },
38            PeerBlockFetchError, TransitionFrontierSyncAction,
39        },
40    },
41    watched_accounts::{
42        WatchedAccountLedgerInitialState, WatchedAccountsLedgerInitialStateGetError,
43    },
44    Action, State, WatchedAccountsAction,
45};
46
47use super::P2pCallbacksAction;
48
49fn get_rpc_request<'a>(state: &'a State, peer_id: &PeerId) -> Option<&'a P2pRpcRequest> {
50    state
51        .p2p
52        .get_ready_peer(peer_id)
53        .and_then(|s| s.channels.rpc.local_responded_request())
54        .map(|(_, req)| req)
55}
56
57impl crate::State {
58    pub fn p2p_callback_reducer(
59        state_context: crate::Substate<Self>,
60        action: ActionWithMeta<&P2pCallbacksAction>,
61    ) {
62        let (action, meta) = action.split();
63        let time = meta.time();
64        let (dispatcher, state) = state_context.into_dispatcher_and_state();
65
66        match action {
67            P2pCallbacksAction::P2pChannelsRpcReady { peer_id } => {
68                let peer_id = *peer_id;
69
70                if state.p2p.get_peer(&peer_id).is_some_and(|p| p.is_libp2p) {
71                    // for webrtc peers, we don't need to send this rpc, as we
72                    // will receive current best tip in best tip channel anyways.
73                    dispatcher.push(P2pChannelsRpcAction::RequestSend {
74                        peer_id,
75                        id: 0,
76                        request: Box::new(P2pRpcRequest::BestTipWithProof),
77                        on_init: None,
78                    });
79                }
80
81                dispatcher.push(TransitionFrontierSyncLedgerSnarkedAction::PeersQuery);
82                dispatcher.push(TransitionFrontierSyncLedgerStagedAction::PartsPeerFetchInit);
83                dispatcher.push(TransitionFrontierSyncAction::BlocksPeersQuery);
84            }
85            P2pCallbacksAction::P2pChannelsRpcTimeout { peer_id, id } => {
86                let peer_id = *peer_id;
87                let rpc_id = *id;
88                let Some(peer) = state.p2p.get_ready_peer(&peer_id) else {
89                    bug_condition!("get_ready_peer({:?}) returned None", peer_id);
90                    return;
91                };
92
93                let Some(rpc_kind) = peer.channels.rpc.pending_local_rpc_kind() else {
94                    bug_condition!("peer: {:?} pending_local_rpc_kind() returned None", peer_id);
95                    return;
96                };
97
98                dispatcher.push(
99                    TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsError {
100                        peer_id,
101                        rpc_id,
102                        error: PeerLedgerQueryError::Timeout,
103                    },
104                );
105                dispatcher.push(
106                    TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressError {
107                        peer_id,
108                        rpc_id,
109                        error: PeerLedgerQueryError::Timeout,
110                    },
111                );
112                dispatcher.push(
113                    TransitionFrontierSyncLedgerStagedAction::PartsPeerFetchError {
114                        peer_id,
115                        rpc_id,
116                        error: PeerStagedLedgerPartsFetchError::Timeout,
117                    },
118                );
119                dispatcher.push(TransitionFrontierSyncAction::BlocksPeerQueryError {
120                    peer_id,
121                    rpc_id,
122                    error: PeerBlockFetchError::Timeout,
123                });
124                dispatcher.push(P2pDisconnectionAction::Init {
125                    peer_id,
126                    reason: P2pDisconnectionReason::TransitionFrontierRpcTimeout(rpc_kind),
127                });
128            }
129            P2pCallbacksAction::P2pChannelsRpcResponseReceived {
130                peer_id,
131                id,
132                response,
133            } => {
134                let request = || get_rpc_request(state, peer_id);
135                State::handle_rpc_channels_response(
136                    dispatcher, meta, *id, *peer_id, request, response,
137                );
138                dispatcher.push(TransitionFrontierSyncLedgerSnarkedAction::PeersQuery);
139                dispatcher.push(TransitionFrontierSyncLedgerStagedAction::PartsPeerFetchInit);
140                dispatcher.push(TransitionFrontierSyncAction::BlocksPeersQuery);
141            }
142            P2pCallbacksAction::P2pChannelsRpcRequestReceived {
143                peer_id,
144                id,
145                request,
146            } => {
147                State::handle_rpc_channels_request(
148                    dispatcher,
149                    state,
150                    meta,
151                    *request.clone(),
152                    *peer_id,
153                    *id,
154                );
155            }
156            P2pCallbacksAction::P2pChannelsStreamingRpcReady => {
157                dispatcher.push(TransitionFrontierSyncLedgerStagedAction::PartsPeerFetchInit);
158            }
159            P2pCallbacksAction::P2pChannelsStreamingRpcTimeout { peer_id, id } => {
160                let peer_id = *peer_id;
161                let rpc_id = *id;
162
163                let Some(peer) = state.p2p.get_ready_peer(&peer_id) else {
164                    bug_condition!("get_ready_peer({:?}) returned None", peer_id);
165                    return;
166                };
167                let Some(rpc_kind) = peer.channels.streaming_rpc.pending_local_rpc_kind() else {
168                    bug_condition!("peer: {:?} pending_local_rpc_kind() returned None", peer_id);
169                    return;
170                };
171                dispatcher.push(
172                    TransitionFrontierSyncLedgerStagedAction::PartsPeerFetchError {
173                        peer_id,
174                        rpc_id,
175                        error: PeerStagedLedgerPartsFetchError::Timeout,
176                    },
177                );
178                dispatcher.push(P2pDisconnectionAction::Init {
179                    peer_id,
180                    reason: P2pDisconnectionReason::TransitionFrontierStreamingRpcTimeout(rpc_kind),
181                });
182            }
183            P2pCallbacksAction::P2pChannelsStreamingRpcResponseReceived {
184                peer_id,
185                id,
186                response,
187            } => {
188                let peer_id = *peer_id;
189                let rpc_id = *id;
190
191                match response {
192                    None => {
193                        dispatcher.push(
194                            TransitionFrontierSyncLedgerStagedAction::PartsPeerFetchError {
195                                peer_id,
196                                rpc_id,
197                                error: PeerStagedLedgerPartsFetchError::DataUnavailable,
198                            },
199                        );
200                    }
201                    Some(P2pStreamingRpcResponseFull::StagedLedgerParts(parts)) => {
202                        dispatcher.push(
203                            TransitionFrontierSyncLedgerStagedAction::PartsPeerFetchSuccess {
204                                peer_id,
205                                rpc_id,
206                                parts: parts.clone(),
207                            },
208                        );
209                    }
210                }
211                dispatcher.push(TransitionFrontierSyncLedgerStagedAction::PartsPeerFetchInit);
212            }
213            P2pCallbacksAction::P2pDisconnection { peer_id } => {
214                let peer_id = *peer_id;
215
216                if let Some(s) = state.transition_frontier.sync.ledger() {
217                    s.snarked()
218                        .map(|s| {
219                            s.peer_address_query_pending_rpc_ids(&peer_id)
220                                .collect::<Vec<_>>()
221                        })
222                        .unwrap_or_default()
223                        .into_iter()
224                        .for_each(|rpc_id| {
225                            dispatcher.push(
226                                TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressError {
227                                    peer_id,
228                                    rpc_id,
229                                    error: PeerLedgerQueryError::Disconnected,
230                                },
231                            );
232                        });
233
234                    if let Some(rpc_id) = s
235                        .snarked()
236                        .and_then(|s| s.peer_num_accounts_rpc_id(&peer_id))
237                    {
238                        dispatcher.push(
239                            TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsError {
240                                peer_id,
241                                rpc_id,
242                                error: PeerLedgerQueryError::Disconnected,
243                            },
244                        );
245                    }
246
247                    if let Some(rpc_id) = s.staged().and_then(|s| s.parts_fetch_rpc_id(&peer_id)) {
248                        dispatcher.push(
249                            TransitionFrontierSyncLedgerStagedAction::PartsPeerFetchError {
250                                peer_id,
251                                rpc_id,
252                                error: PeerStagedLedgerPartsFetchError::Disconnected,
253                            },
254                        )
255                    }
256                }
257
258                state
259                    .transition_frontier
260                    .sync
261                    .blocks_fetch_from_peer_pending_rpc_ids(&peer_id)
262                    .for_each(|rpc_id| {
263                        dispatcher.push(TransitionFrontierSyncAction::BlocksPeerQueryError {
264                            peer_id,
265                            rpc_id,
266                            error: PeerBlockFetchError::Disconnected,
267                        });
268                    });
269
270                state
271                    .watched_accounts
272                    .iter()
273                    .filter_map(|(pub_key, a)| match &a.initial_state {
274                        WatchedAccountLedgerInitialState::Pending {
275                            peer_id: account_peer_id,
276                            ..
277                        } => {
278                            if account_peer_id == &peer_id {
279                                Some(WatchedAccountsAction::LedgerInitialStateGetError {
280                                    pub_key: pub_key.clone(),
281                                    error:
282                                        WatchedAccountsLedgerInitialStateGetError::PeerDisconnected,
283                                })
284                            } else {
285                                None
286                            }
287                        }
288                        _ => None,
289                    })
290                    .for_each(|action| dispatcher.push(action));
291
292                dispatcher.push(TransactionPoolCandidateAction::PeerPrune { peer_id });
293                dispatcher.push(SnarkPoolCandidateAction::PeerPrune { peer_id });
294            }
295            P2pCallbacksAction::RpcRespondBestTip { peer_id } => {
296                let Some(best_tip) = state.transition_frontier.best_tip() else {
297                    bug_condition!("Best tip not found");
298                    return;
299                };
300
301                dispatcher.push(P2pChannelsBestTipAction::ResponseSend {
302                    peer_id: *peer_id,
303                    best_tip: best_tip.clone(),
304                });
305            }
306            P2pCallbacksAction::P2pPubsubValidateMessage { message_id } => {
307                let Some(message_content) = state.p2p.ready().and_then(|p2p| {
308                    p2p.network
309                        .scheduler
310                        .broadcast_state
311                        .mcache
312                        .get_message(message_id)
313                }) else {
314                    bug_condition!("Failed to find message for id: {:?}", message_id);
315                    return;
316                };
317
318                let pre_validation_result = match message_content {
319                    GossipNetMessageV2::NewState(new_best_tip) => {
320                        match BlockWithHash::try_new(new_best_tip.clone()) {
321                            Ok(block) => {
322                                let allow_block_too_late = allow_block_too_late(state, &block);
323                                match state.prevalidate_block(&block, allow_block_too_late) {
324                                    Ok(()) => PreValidationResult::Continue,
325                                    Err(error)
326                                        if matches!(
327                                            error,
328                                            BlockPrevalidationError::ReceivedTooEarly { .. }
329                                        ) =>
330                                    {
331                                        PreValidationResult::Ignore {
332                                            reason: format!(
333                                                "Block prevalidation failed: {:?}",
334                                                error
335                                            ),
336                                        }
337                                    }
338                                    Err(error) => PreValidationResult::Reject {
339                                        reason: format!("Block prevalidation failed: {:?}", error),
340                                    },
341                                }
342                            }
343                            Err(_) => {
344                                log::error!(time; "P2pCallbacksAction::P2pPubsubValidateMessage: Invalid bigint in block");
345                                PreValidationResult::Reject{reason: "P2pCallbacksAction::P2pPubsubValidateMessage: Invalid bigint in block".to_owned()}
346                            }
347                        }
348                    }
349                    _ => {
350                        // TODO: add pre validation for Snark pool and Transaction pool diffs
351                        PreValidationResult::Continue
352                    }
353                };
354
355                match pre_validation_result {
356                    PreValidationResult::Continue => {
357                        dispatcher.push(P2pNetworkPubsubAction::ValidateIncomingMessage {
358                            message_id: *message_id,
359                        });
360                    }
361                    PreValidationResult::Reject { reason } => {
362                        dispatcher.push(P2pNetworkPubsubAction::RejectMessage {
363                            message_id: Some(p2p::BroadcastMessageId::MessageId {
364                                message_id: *message_id,
365                            }),
366                            peer_id: None,
367                            reason,
368                        });
369                    }
370                    PreValidationResult::Ignore { reason } => {
371                        dispatcher.push(P2pNetworkPubsubAction::IgnoreMessage {
372                            message_id: Some(p2p::BroadcastMessageId::MessageId {
373                                message_id: *message_id,
374                            }),
375                            reason,
376                        });
377                    }
378                }
379            }
380        }
381    }
382
383    fn handle_rpc_channels_request(
384        dispatcher: &mut Dispatcher<Action, State>,
385        state: &State,
386        meta: ActionMeta,
387        request: P2pRpcRequest,
388        peer_id: PeerId,
389        id: u64,
390    ) {
391        match request {
392            P2pRpcRequest::BestTipWithProof => {
393                let best_chain = &state.transition_frontier.best_chain;
394                let response = None.or_else(|| {
395                    let best_tip = best_chain.last()?;
396                    let mut chain_iter = best_chain.iter();
397                    let root_block = chain_iter.next()?;
398                    // TODO(binier): cache body hashes
399                    let Ok(body_hashes) = chain_iter
400                        .map(|b| b.header().protocol_state.body.try_hash())
401                        .collect::<Result<_, _>>()
402                    else {
403                        openmina_core::error!(meta.time(); "P2pRpcRequest::BestTipWithProof: invalid protocol state");
404                        return None;
405                    };
406
407                    Some(BestTipWithProof {
408                        best_tip: best_tip.block().clone(),
409                        proof: (body_hashes, root_block.block().clone()),
410                    })
411                });
412                let response = response.map(P2pRpcResponse::BestTipWithProof).map(Box::new);
413                dispatcher.push(P2pChannelsRpcAction::ResponseSend {
414                    peer_id,
415                    id,
416                    response,
417                });
418            }
419            P2pRpcRequest::Block(hash) => {
420                let best_chain = &state.transition_frontier.best_chain;
421                let response = best_chain
422                    .iter()
423                    .rev()
424                    .find(|b| b.hash() == &hash)
425                    .map(|b| b.block().clone())
426                    .map(P2pRpcResponse::Block)
427                    .map(Box::new);
428                dispatcher.push(P2pChannelsRpcAction::ResponseSend {
429                    peer_id,
430                    id,
431                    response,
432                });
433            }
434            P2pRpcRequest::LedgerQuery(..) => {
435                // async ledger request will be triggered
436                // by `LedgerReadAction::FindTodos`.
437            }
438            P2pRpcRequest::StagedLedgerAuxAndPendingCoinbasesAtBlock(..) => {
439                // async ledger request will be triggered
440                // by `LedgerReadAction::FindTodos`.
441            }
442            P2pRpcRequest::Transaction(hash) => {
443                let tx = state.transaction_pool.get(&hash);
444                let response = tx
445                    .map(|v| v.forget_check())
446                    .map(|tx| (&tx).into())
447                    .map(P2pRpcResponse::Transaction)
448                    .map(Box::new);
449
450                dispatcher.push(P2pChannelsRpcAction::ResponseSend {
451                    peer_id,
452                    id,
453                    response,
454                });
455            }
456            P2pRpcRequest::Snark(job_id) => {
457                let job = state.snark_pool.get(&job_id);
458                let response = job
459                    .and_then(|job| job.snark.as_ref())
460                    .map(|snark| snark.work.clone())
461                    .map(P2pRpcResponse::Snark)
462                    .map(Box::new);
463
464                dispatcher.push(P2pChannelsRpcAction::ResponseSend {
465                    peer_id,
466                    id,
467                    response,
468                });
469            }
470            P2pRpcRequest::InitialPeers => {
471                let p2p = p2p_ready!(state.p2p, meta.time());
472                let peers = p2p
473                    .peers
474                    .iter()
475                    .filter_map(|(_, v)| v.dial_opts.clone())
476                    .collect();
477                let response = Some(Box::new(P2pRpcResponse::InitialPeers(peers)));
478
479                dispatcher.push(P2pChannelsRpcAction::ResponseSend {
480                    peer_id,
481                    id,
482                    response,
483                });
484            }
485        }
486    }
487
488    fn handle_rpc_channels_response<'a>(
489        dispatcher: &mut Dispatcher<Action, State>,
490        meta: ActionMeta,
491        id: u64,
492        peer_id: PeerId,
493        request: impl FnOnce() -> Option<&'a P2pRpcRequest>,
494        response: &Option<Box<P2pRpcResponse>>,
495    ) {
496        match response.as_deref() {
497            None => {
498                match request() {
499                    Some(P2pRpcRequest::Transaction(hash)) => {
500                        let hash = hash.clone();
501                        dispatcher
502                            .push(TransactionPoolCandidateAction::FetchError { peer_id, hash });
503                        return;
504                    }
505                    Some(P2pRpcRequest::Snark(job_id)) => {
506                        let job_id = job_id.clone();
507                        dispatcher
508                            .push(SnarkPoolCandidateAction::WorkFetchError { peer_id, job_id });
509                        return;
510                    }
511                    _ => {}
512                }
513
514                dispatcher.push(
515                    TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsError {
516                        peer_id,
517                        rpc_id: id,
518                        error: PeerLedgerQueryError::DataUnavailable,
519                    },
520                );
521                dispatcher.push(
522                    TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressError {
523                        peer_id,
524                        rpc_id: id,
525                        error: PeerLedgerQueryError::DataUnavailable,
526                    },
527                );
528                dispatcher.push(
529                    TransitionFrontierSyncLedgerStagedAction::PartsPeerFetchError {
530                        peer_id,
531                        rpc_id: id,
532                        error: PeerStagedLedgerPartsFetchError::DataUnavailable,
533                    },
534                );
535                dispatcher.push(TransitionFrontierSyncAction::BlocksPeerQueryError {
536                    peer_id,
537                    rpc_id: id,
538                    error: PeerBlockFetchError::DataUnavailable,
539                });
540            }
541            Some(P2pRpcResponse::BestTipWithProof(resp)) => {
542                let (body_hashes, root_block) = &resp.proof;
543
544                let (Ok(best_tip), Ok(root_block)) = (
545                    BlockWithHash::try_new(resp.best_tip.clone()),
546                    BlockWithHash::try_new(root_block.clone()),
547                ) else {
548                    openmina_core::error!(meta.time(); "P2pRpcResponse::BestTipWithProof: invalid blocks");
549                    return;
550                };
551
552                // reconstruct hashes
553                let Ok(hashes) = body_hashes
554                    .iter()
555                    .take(body_hashes.len().saturating_sub(1))
556                    .scan(root_block.hash.clone(), |pred_hash, body_hash| {
557                        *pred_hash = match StateHash::try_from_hashes(pred_hash, body_hash) {
558                            Ok(hash) => hash,
559                            Err(_) => return Some(Err(InvalidBigInt)),
560                        };
561                        Some(Ok(pred_hash.clone()))
562                    })
563                    .collect::<Result<Vec<_>, _>>()
564                else {
565                    openmina_core::error!(meta.time(); "P2pRpcResponse::BestTipWithProof: invalid hashes");
566                    return;
567                };
568
569                if let Some(pred_hash) = hashes.last() {
570                    let expected_hash = &best_tip.block.header.protocol_state.previous_state_hash;
571
572                    if pred_hash != expected_hash {
573                        openmina_core::warn!(meta.time();
574                        kind = "P2pRpcBestTipHashMismatch",
575                        response = serde_json::to_string(&resp).ok(),
576                        expected_hash = expected_hash.to_string(),
577                        calculated_hash = pred_hash.to_string());
578                        return;
579                    }
580                }
581                dispatcher.push(TransitionFrontierCandidateAction::BlockChainProofUpdate {
582                    hash: best_tip.hash,
583                    chain_proof: (hashes, root_block),
584                });
585            }
586            Some(P2pRpcResponse::LedgerQuery(answer)) => match answer {
587                MinaLedgerSyncLedgerAnswerStableV2::ChildHashesAre(left, right) => {
588                    dispatcher.push(
589                        TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressSuccess {
590                            peer_id,
591                            rpc_id: id,
592                            response: PeerLedgerQueryResponse::ChildHashes(
593                                left.clone(),
594                                right.clone(),
595                            ),
596                        },
597                    );
598                }
599                MinaLedgerSyncLedgerAnswerStableV2::ContentsAre(accounts) => {
600                    dispatcher.push(
601                        TransitionFrontierSyncLedgerSnarkedAction::PeerQueryAddressSuccess {
602                            peer_id,
603                            rpc_id: id,
604                            response: PeerLedgerQueryResponse::ChildAccounts(
605                                accounts.iter().cloned().collect(),
606                            ),
607                        },
608                    );
609                }
610                MinaLedgerSyncLedgerAnswerStableV2::NumAccounts(count, contents_hash) => {
611                    dispatcher.push(
612                        TransitionFrontierSyncLedgerSnarkedAction::PeerQueryNumAccountsSuccess {
613                            peer_id,
614                            rpc_id: id,
615                            response: PeerLedgerQueryResponse::NumAccounts(
616                                count.as_u64(),
617                                contents_hash.clone(),
618                            ),
619                        },
620                    );
621                }
622            },
623            Some(P2pRpcResponse::StagedLedgerAuxAndPendingCoinbasesAtBlock(parts)) => {
624                dispatcher.push(
625                    TransitionFrontierSyncLedgerStagedAction::PartsPeerFetchSuccess {
626                        peer_id,
627                        rpc_id: id,
628                        parts: parts.clone(),
629                    },
630                );
631            }
632            Some(P2pRpcResponse::Block(block)) => {
633                let Ok(block) = BlockWithHash::try_new(block.clone()) else {
634                    openmina_core::error!(meta.time(); "P2pRpcResponse::Block: invalid block");
635                    return;
636                };
637                dispatcher.push(TransitionFrontierSyncAction::BlocksPeerQuerySuccess {
638                    peer_id,
639                    rpc_id: id,
640                    response: block,
641                });
642            }
643            Some(P2pRpcResponse::Transaction(transaction)) => {
644                match TransactionWithHash::try_new(transaction.clone()) {
645                    Err(err) => bug_condition!("tx hashing failed: {err}"),
646                    Ok(transaction) => {
647                        dispatcher.push(TransactionPoolCandidateAction::FetchSuccess {
648                            peer_id,
649                            transaction,
650                        })
651                    }
652                }
653            }
654            Some(P2pRpcResponse::Snark(snark)) => {
655                dispatcher.push(SnarkPoolCandidateAction::WorkFetchSuccess {
656                    peer_id,
657                    work: snark.clone(),
658                });
659            }
660            Some(P2pRpcResponse::InitialPeers(_)) => {}
661        }
662    }
663}
664
/// Verdict of pre-validating a pubsub message, before it is handed to the pubsub layer.
#[derive(Debug, Clone, PartialEq, Eq)]
enum PreValidationResult {
    /// Pre-validation found no problem; validation of the incoming message continues.
    Continue,
    /// The message is rejected; `reason` is forwarded with the rejection.
    Reject { reason: String },
    /// The message is ignored; `reason` is forwarded with the ignore.
    Ignore { reason: String },
}