p2p/network/rpc/
p2p_network_rpc_reducer.rs

1use std::sync::Arc;
2
3use binprot::BinProtRead;
4use mina_p2p_messages::{
5    rpc,
6    rpc_kernel::{
7        MessageHeader, PayloadBinprotReader as _, QueryHeader, ResponseHeader, RpcMethod,
8        RpcQueryReadError, RpcResponseReadError,
9    },
10    v2,
11    versioned::Ver,
12};
13use openmina_core::{bug_condition, error, fuzz_maybe, fuzzed_maybe, Substate};
14use redux::Dispatcher;
15
16use crate::{
17    channels::rpc::{
18        BestTipWithProof, P2pChannelsRpcAction, P2pRpcRequest, P2pRpcResponse,
19        StagedLedgerAuxAndPendingCoinbases,
20    },
21    connection::outgoing::P2pConnectionOutgoingInitOpts,
22    disconnection::{P2pDisconnectionAction, P2pDisconnectionReason},
23    Data, Limit, P2pLimits, P2pNetworkState, P2pNetworkYamuxAction, PeerId,
24};
25
26use self::p2p_network_rpc_state::P2pNetworkRpcError;
27
28use super::*;
29
30impl P2pNetworkRpcState {
31    /// Substate is accessed
32    pub fn reducer<State, Action>(
33        mut state_context: Substate<Action, State, P2pNetworkState>,
34        action: redux::ActionWithMeta<P2pNetworkRpcAction>,
35        limits: &P2pLimits,
36    ) -> Result<(), String>
37    where
38        State: crate::P2pStateTrait,
39        Action: crate::P2pActionTrait<State>,
40    {
41        let (action, meta) = action.split();
42        let rpc_state = state_context
43            .get_substate_mut()?
44            .find_rpc_state_mut(&action)
45            .ok_or_else(|| format!("RPC state not found for action: {:?}", action))
46            .inspect_err(|e| bug_condition!("{}", e))?;
47
48        match action {
49            P2pNetworkRpcAction::Init {
50                incoming,
51                addr,
52                peer_id,
53                stream_id,
54            } => {
55                rpc_state.is_incoming = incoming;
56
57                let dispatcher = state_context.into_dispatcher();
58                dispatcher.push(P2pNetworkRpcAction::OutgoingData {
59                    addr,
60                    peer_id,
61                    stream_id,
62                    data: Data::from(RpcMessage::Handshake.into_bytes()),
63                    fin: false,
64                });
65                Ok(())
66            }
67            P2pNetworkRpcAction::IncomingData {
68                data,
69                addr,
70                peer_id,
71                stream_id,
72            } => {
73                rpc_state.buffer.extend_from_slice(&data);
74                let mut offset = 0;
75                // TODO(akoptelov): there shouldn't be the case where we have multiple incoming messages at once (or at least other than heartbeat)
76                loop {
77                    let Some(buf) = &rpc_state.buffer.get(offset..) else {
78                        bug_condition!("Invalid range `buffer[{offset}..]`");
79                        return Ok(());
80                    };
81                    if let Some(len_bytes) = buf.get(..8).and_then(|s| s.try_into().ok()) {
82                        let len = u64::from_le_bytes(len_bytes) as usize;
83                        if let Err(err) = rpc_state.check_rpc_limit(len, limits) {
84                            rpc_state.error = Some(err);
85                            break;
86                        }
87                        if let Some(mut slice) = buf.get(8..(8 + len)) {
88                            offset += 8 + len;
89                            let msg = match MessageHeader::binprot_read(&mut slice) {
90                                Ok(MessageHeader::Heartbeat) => RpcMessage::Heartbeat,
91                                Ok(MessageHeader::Response(h))
92                                    if h.id == u64::from_le_bytes(*b"RPC\x00\x00\x00\x00\x00") =>
93                                {
94                                    RpcMessage::Handshake
95                                }
96                                Ok(MessageHeader::Query(header)) => RpcMessage::Query {
97                                    header,
98                                    bytes: slice.to_vec().into(),
99                                },
100                                Ok(MessageHeader::Response(header)) => RpcMessage::Response {
101                                    header,
102                                    bytes: slice.to_vec().into(),
103                                },
104                                Err(err) => {
105                                    rpc_state.error =
106                                        Some(P2pNetworkRpcError::Binprot(err.to_string()));
107                                    continue;
108                                }
109                            };
110                            rpc_state.incoming.push_back(msg);
111                            continue;
112                        }
113                    }
114
115                    if offset != 0 {
116                        let Some(buf) = rpc_state.buffer.get(offset..) else {
117                            bug_condition!("Invalid range `buffer[{offset}..]`");
118                            return Ok(());
119                        };
120                        rpc_state.buffer = buf.to_vec();
121                    }
122                    break;
123                }
124
125                let incoming = rpc_state.incoming.front().cloned();
126                let dispatcher = state_context.into_dispatcher();
127
128                if let Some(message) = incoming {
129                    dispatcher.push(P2pNetworkRpcAction::IncomingMessage {
130                        addr,
131                        peer_id,
132                        stream_id,
133                        message,
134                    })
135                }
136
137                Ok(())
138            }
139            ref action @ P2pNetworkRpcAction::IncomingMessage {
140                ref message,
141                addr,
142                peer_id,
143                stream_id,
144            } => {
145                if let RpcMessage::Response { header, .. } = &message {
146                    if let Some(QueryHeader { id, tag, version }) = &rpc_state.pending {
147                        *rpc_state
148                            .total_stats
149                            .entry((tag.clone(), *version))
150                            .or_default() += 1;
151                        if id != &header.id {
152                            error!(meta.time(); "receiving response with wrong id: {}", header.id);
153                        }
154                    } else {
155                        error!(meta.time(); "receiving response without query");
156                    }
157                } else if let RpcMessage::Query { header, .. } = &message {
158                    if rpc_state.pending.is_none() {
159                        rpc_state.pending = Some(header.clone());
160                    } else {
161                        error!(meta.time(); "receiving query while another query is pending");
162                    }
163                }
164
165                rpc_state.incoming.pop_front();
166
167                let (dispatcher, state) = state_context.into_dispatcher_and_state();
168                let network_state: &P2pNetworkState = state.substate()?;
169                let state = network_state
170                    .find_rpc_state(action)
171                    .ok_or_else(|| format!("RPC state not found for action: {:?}", action))?;
172
173                match &message {
174                    RpcMessage::Handshake => {
175                        if !state.is_incoming {
176                            dispatcher.push(P2pChannelsRpcAction::Ready { peer_id });
177                        }
178                    }
179                    RpcMessage::Heartbeat => {}
180                    RpcMessage::Query { header, bytes } => {
181                        if let Err(e) = dispatch_rpc_query(peer_id, header, bytes, dispatcher) {
182                            dispatcher.push(P2pDisconnectionAction::Init {
183                                peer_id,
184                                reason: P2pDisconnectionReason::P2pChannelReceiveFailed(
185                                    e.to_string(),
186                                ),
187                            });
188                        }
189                    }
190                    RpcMessage::Response {
191                        header: ResponseHeader { id },
192                        bytes,
193                    } => {
194                        let query_header = match state.pending.as_ref() {
195                            Some(header) if &header.id == id => header.clone(),
196                            Some(header) => {
197                                error!(meta.time(); "received response with it {} while expecting {id}", header.id);
198                                return Ok(());
199                            }
200                            None => {
201                                error!(meta.time(); "received response while no query is sent");
202                                return Ok(());
203                            }
204                        };
205                        // unset pending
206                        dispatcher.push(P2pNetworkRpcAction::PrunePending { peer_id, stream_id });
207
208                        if let Err(e) =
209                            dispatch_rpc_response(peer_id, &query_header, bytes, dispatcher)
210                        {
211                            dispatcher.push(P2pDisconnectionAction::Init {
212                                peer_id,
213                                reason: P2pDisconnectionReason::P2pChannelReceiveFailed(
214                                    e.to_string(),
215                                ),
216                            });
217                        }
218                    }
219                }
220
221                if let Some(message) = state.incoming.front().cloned() {
222                    dispatcher.push(P2pNetworkRpcAction::IncomingMessage {
223                        addr,
224                        peer_id,
225                        stream_id,
226                        message,
227                    });
228                }
229                Ok(())
230            }
231            P2pNetworkRpcAction::PrunePending { .. } => {
232                rpc_state.pending = None;
233                Ok(())
234            }
235            P2pNetworkRpcAction::HeartbeatSend {
236                addr,
237                peer_id,
238                stream_id,
239            } => {
240                rpc_state.last_heartbeat_sent = Some(meta.time());
241
242                let dispatcher = state_context.into_dispatcher();
243
244                dispatcher.push(P2pNetworkRpcAction::OutgoingData {
245                    addr,
246                    peer_id,
247                    stream_id,
248                    data: Data::from(RpcMessage::Heartbeat.into_bytes()),
249                    fin: false,
250                });
251
252                Ok(())
253            }
254            P2pNetworkRpcAction::OutgoingQuery {
255                query,
256                data,
257                peer_id,
258            } => {
259                rpc_state.last_id = query.id;
260                rpc_state.pending = Some(query.clone());
261
262                let addr = rpc_state.addr;
263                let stream_id = rpc_state.stream_id;
264                let dispatcher = state_context.into_dispatcher();
265                dispatcher.push(P2pNetworkRpcAction::OutgoingData {
266                    addr,
267                    peer_id,
268                    stream_id,
269                    data: Data::from(
270                        RpcMessage::Query {
271                            header: query,
272                            bytes: data,
273                        }
274                        .into_bytes(),
275                    ),
276                    fin: false,
277                });
278
279                Ok(())
280            }
281            P2pNetworkRpcAction::OutgoingData {
282                addr,
283                stream_id,
284                mut data,
285                ..
286            } => {
287                let dispatcher = state_context.into_dispatcher();
288                fuzz_maybe!(&mut data, crate::fuzzer::mutate_rpc_data);
289                let flags = fuzzed_maybe!(Default::default(), crate::fuzzer::mutate_yamux_flags);
290
291                dispatcher.push(P2pNetworkYamuxAction::OutgoingData {
292                    addr,
293                    stream_id,
294                    data,
295                    flags,
296                });
297
298                Ok(())
299            }
300            P2pNetworkRpcAction::OutgoingResponse {
301                peer_id,
302                response,
303                data,
304            } => {
305                if !matches!(rpc_state.pending, Some(QueryHeader { id, .. }) if id == response.id) {
306                    bug_condition!("pending query does not match the response");
307                    return Ok(());
308                }
309                let stream_id = rpc_state.stream_id;
310                let addr = rpc_state.addr;
311                let dispatcher = state_context.into_dispatcher();
312
313                dispatcher.push(P2pNetworkRpcAction::PrunePending { peer_id, stream_id });
314                dispatcher.push(P2pNetworkRpcAction::OutgoingData {
315                    addr,
316                    peer_id,
317                    stream_id,
318                    data: Data::from(
319                        RpcMessage::Response {
320                            header: response,
321                            bytes: data,
322                        }
323                        .into_bytes(),
324                    ),
325                    fin: false,
326                });
327                Ok(())
328            }
329        }
330    }
331
332    fn check_rpc_limit(&self, len: usize, limits: &P2pLimits) -> Result<(), P2pNetworkRpcError> {
333        let (limit, kind): (_, &[u8]) = if self.is_incoming {
334            // only requests are allowed
335            (limits.rpc_query(), b"<query>")
336        } else if let Some(QueryHeader { tag, .. }) = self.pending.as_ref() {
337            use mina_p2p_messages::rpc::*;
338            match tag.as_ref() {
339                GetBestTipV2::NAME => (limits.rpc_get_best_tip(), GetBestTipV2::NAME),
340                AnswerSyncLedgerQueryV2::NAME => (
341                    limits.rpc_answer_sync_ledger_query(),
342                    AnswerSyncLedgerQueryV2::NAME,
343                ),
344                GetStagedLedgerAuxAndPendingCoinbasesAtHashV2::NAME => (
345                    limits.rpc_get_staged_ledger(),
346                    GetStagedLedgerAuxAndPendingCoinbasesAtHashV2::NAME,
347                ),
348                GetTransitionChainV2::NAME => (
349                    limits.rpc_get_transition_chain(),
350                    GetTransitionChainV2::NAME,
351                ),
352                GetSomeInitialPeersV1ForV2::NAME => (
353                    limits.rpc_get_some_initial_peers(),
354                    GetSomeInitialPeersV1ForV2::NAME,
355                ),
356                _ => (Limit::Some(0), b"<unimplemented>"),
357            }
358        } else {
359            (limits.rpc_service_message(), b"<service_messages>")
360        };
361        let kind = String::from_utf8_lossy(kind);
362        if len > limit {
363            Err(P2pNetworkRpcError::Limit(kind.into_owned(), len, limit))
364        } else {
365            Ok(())
366        }
367    }
368}
369
370fn dispatch_rpc_query<'a, State, Action>(
371    peer_id: PeerId,
372    QueryHeader { tag, version, id }: &'a QueryHeader,
373    mut bytes: &[u8],
374    dispatcher: &mut Dispatcher<Action, State>,
375) -> Result<(), RpcQueryError<'a>>
376where
377    State: crate::P2pStateTrait,
378    Action: crate::P2pActionTrait<State>,
379{
380    let id = *id;
381    match (tag.as_ref(), *version) {
382        (rpc::GetBestTipV2::NAME, rpc::GetBestTipV2::VERSION) => {
383            rpc::GetBestTipV2::query_payload(&mut bytes)?;
384            dispatcher.push(P2pChannelsRpcAction::RequestReceived {
385                peer_id,
386                id,
387                request: Box::new(P2pRpcRequest::BestTipWithProof),
388            });
389        }
390        (rpc::AnswerSyncLedgerQueryV2::NAME, rpc::AnswerSyncLedgerQueryV2::VERSION) => {
391            let (hash, query) = rpc::AnswerSyncLedgerQueryV2::query_payload(&mut bytes)?;
392            let hash = v2::LedgerHash::from(v2::MinaBaseLedgerHash0StableV1(hash));
393
394            dispatcher.push(P2pChannelsRpcAction::RequestReceived {
395                peer_id,
396                id,
397                request: Box::new(P2pRpcRequest::LedgerQuery(hash, query)),
398            });
399        }
400        (
401            rpc::GetStagedLedgerAuxAndPendingCoinbasesAtHashV2::NAME,
402            rpc::GetStagedLedgerAuxAndPendingCoinbasesAtHashV2::VERSION,
403        ) => {
404            let hash =
405                rpc::GetStagedLedgerAuxAndPendingCoinbasesAtHashV2::query_payload(&mut bytes)?;
406            let hash = v2::StateHash::from(v2::DataHashLibStateHashStableV1(hash));
407            let request = Box::new(P2pRpcRequest::StagedLedgerAuxAndPendingCoinbasesAtBlock(
408                hash,
409            ));
410
411            dispatcher.push(P2pChannelsRpcAction::RequestReceived {
412                peer_id,
413                id,
414                request,
415            });
416        }
417        (rpc::GetTransitionChainV2::NAME, rpc::GetTransitionChainV2::VERSION) => {
418            let hashes = rpc::GetTransitionChainV2::query_payload(&mut bytes)?;
419            for hash in hashes {
420                let hash = v2::StateHash::from(v2::DataHashLibStateHashStableV1(hash));
421
422                dispatcher.push(P2pChannelsRpcAction::RequestReceived {
423                    peer_id,
424                    id,
425                    request: Box::new(P2pRpcRequest::Block(hash)),
426                });
427            }
428        }
429        (rpc::GetSomeInitialPeersV1ForV2::NAME, rpc::GetSomeInitialPeersV1ForV2::VERSION) => {
430            let () = rpc::GetSomeInitialPeersV1ForV2::query_payload(&mut bytes)?;
431            dispatcher.push(P2pChannelsRpcAction::RequestReceived {
432                peer_id,
433                id,
434                request: Box::new(P2pRpcRequest::InitialPeers),
435            });
436        }
437        (name, version) => return Err(RpcQueryError::Unimplemented(name, version)),
438    }
439    Ok(())
440}
441
442fn dispatch_rpc_response<State, Action>(
443    peer_id: PeerId,
444    QueryHeader { tag, version, id }: &QueryHeader,
445    mut bytes: &[u8],
446    dispatcher: &mut Dispatcher<Action, State>,
447) -> Result<(), RpcResponseError>
448where
449    State: crate::P2pStateTrait,
450    Action: crate::P2pActionTrait<State>,
451{
452    let id = *id;
453    match (tag.as_ref(), *version) {
454        (rpc::GetBestTipV2::NAME, rpc::GetBestTipV2::VERSION) => {
455            let response = rpc::GetBestTipV2::response_payload(&mut bytes)?
456                .map(|resp| BestTipWithProof {
457                    best_tip: resp.data.into(),
458                    proof: (resp.proof.0, resp.proof.1.into()),
459                })
460                .map(P2pRpcResponse::BestTipWithProof)
461                .map(Box::new);
462
463            dispatcher.push(P2pChannelsRpcAction::ResponseReceived {
464                peer_id,
465                id,
466                response,
467            });
468        }
469        (rpc::AnswerSyncLedgerQueryV2::NAME, rpc::AnswerSyncLedgerQueryV2::VERSION) => {
470            let response = Result::from(rpc::AnswerSyncLedgerQueryV2::response_payload(
471                &mut bytes,
472            )?)
473            .map_err(|e| RpcResponseError::Other {
474                rpc_id: rpc::AnswerSyncLedgerQueryV2::rpc_id(),
475                error: e.to_string(),
476            })?;
477            let response = Some(Box::new(P2pRpcResponse::LedgerQuery(response)));
478
479            dispatcher.push(P2pChannelsRpcAction::ResponseReceived {
480                peer_id,
481                id,
482                response,
483            });
484        }
485        (
486            rpc::GetStagedLedgerAuxAndPendingCoinbasesAtHashV2::NAME,
487            rpc::GetStagedLedgerAuxAndPendingCoinbasesAtHashV2::VERSION,
488        ) => {
489            let response =
490                rpc::GetStagedLedgerAuxAndPendingCoinbasesAtHashV2::response_payload(&mut bytes)?;
491            let response = response
492                .map(|(scan_state, hash, pending_coinbase, needed_blocks)| {
493                    let staged_ledger_hash = v2::MinaBaseLedgerHash0StableV1(hash).into();
494                    Arc::new(StagedLedgerAuxAndPendingCoinbases {
495                        scan_state,
496                        staged_ledger_hash,
497                        pending_coinbase,
498                        needed_blocks,
499                    })
500                })
501                .map(P2pRpcResponse::StagedLedgerAuxAndPendingCoinbasesAtBlock)
502                .map(Box::new);
503
504            dispatcher.push(P2pChannelsRpcAction::ResponseReceived {
505                peer_id,
506                id,
507                response,
508            });
509        }
510        (rpc::GetTransitionChainV2::NAME, rpc::GetTransitionChainV2::VERSION) => {
511            let response = rpc::GetTransitionChainV2::response_payload(&mut bytes)?;
512            match response {
513                Some(response) if !response.is_empty() => {
514                    for block in response {
515                        let response = Some(Box::new(P2pRpcResponse::Block(Arc::new(block))));
516                        dispatcher.push(P2pChannelsRpcAction::ResponseReceived {
517                            peer_id,
518                            id,
519                            response,
520                        });
521                    }
522                }
523                _ => {
524                    dispatcher.push(P2pChannelsRpcAction::ResponseReceived {
525                        peer_id,
526                        id,
527                        response: None,
528                    });
529                }
530            }
531        }
532        (rpc::GetSomeInitialPeersV1ForV2::NAME, rpc::GetSomeInitialPeersV1ForV2::VERSION) => {
533            let response = rpc::GetSomeInitialPeersV1ForV2::response_payload(&mut bytes)?;
534            if response.is_empty() {
535                dispatcher.push(P2pChannelsRpcAction::ResponseReceived {
536                    peer_id,
537                    id,
538                    response: None,
539                });
540            } else {
541                let peers = response
542                    .into_iter()
543                    .filter_map(P2pConnectionOutgoingInitOpts::try_from_mina_rpc)
544                    .collect();
545                dispatcher.push(P2pChannelsRpcAction::ResponseReceived {
546                    peer_id,
547                    id,
548                    response: Some(Box::new(P2pRpcResponse::InitialPeers(peers))),
549                });
550            }
551        }
552        _ => {}
553    }
554    Ok(())
555}
556
/// Error returned by `dispatch_rpc_query` when an incoming query cannot be
/// turned into a request.
557#[derive(Debug, thiserror::Error)]
558enum RpcQueryError<'a> {
    /// The query payload could not be decoded.
559    #[error(transparent)]
560    Read(#[from] RpcQueryReadError),
    /// The query's tag (raw bytes, displayed lossily as UTF-8) and version do
    /// not match any RPC handled by `dispatch_rpc_query`.
561    #[error("unimplemented rpc {}:{1}", String::from_utf8_lossy(.0))]
562    Unimplemented(&'a [u8], Ver),
563}
564
/// Error returned by `dispatch_rpc_response` when an incoming response cannot
/// be processed.
565#[derive(Debug, thiserror::Error)]
566enum RpcResponseError {
    /// The response payload could not be decoded.
567    #[error(transparent)]
568    Read(#[from] RpcResponseReadError),
    /// The payload was decoded, but the response itself is an error; `rpc_id`
    /// identifies the RPC and `error` is the stringified error.
569    #[error("rpc response {rpc_id} error: {error}")]
570    Other { rpc_id: String, error: String },
571}