p2p/network/rpc/
p2p_network_rpc_state.rs

1use std::{
2    collections::{BTreeMap, VecDeque},
3    str,
4    time::Duration,
5};
6
7use binprot::BinProtWrite;
8use serde::{Deserialize, Serialize};
9
10use mina_p2p_messages::{
11    rpc_kernel::{MessageHeader, QueryHeader, ResponseHeader},
12    string::CharString,
13    versioned::Ver,
14};
15
16use crate::{channels::rpc::P2pRpcId, Data};
17
18use super::super::*;
19
20const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(10);
21
22#[serde_with::serde_as]
23#[derive(Serialize, Deserialize, Debug, Clone, MallocSizeOf)]
24pub struct P2pNetworkRpcState {
25    pub addr: ConnectionAddr,
26    pub stream_id: StreamId,
27    pub last_id: P2pRpcId,
28    #[ignore_malloc_size_of = "primitive"]
29    pub last_heartbeat_sent: Option<redux::Timestamp>,
30    pub pending: Option<QueryHeader>,
31    #[serde_as(as = "Vec<(_, _)>")]
32    #[ignore_malloc_size_of = "TODO(vlad)"]
33    pub total_stats: BTreeMap<(CharString, Ver), usize>,
34    pub is_incoming: bool,
35    pub buffer: Vec<u8>,
36    pub incoming: VecDeque<RpcMessage>,
37    pub error: Option<P2pNetworkRpcError>,
38}
39
40impl P2pNetworkRpcState {
41    pub fn new(addr: ConnectionAddr, stream_id: StreamId) -> Self {
42        P2pNetworkRpcState {
43            addr,
44            stream_id,
45            last_id: 0,
46            last_heartbeat_sent: None,
47            pending: None,
48            total_stats: BTreeMap::default(),
49            is_incoming: false,
50            buffer: vec![],
51            incoming: Default::default(),
52            error: None,
53        }
54    }
55
56    pub fn should_send_heartbeat(&self, now: redux::Timestamp) -> bool {
57        self.last_heartbeat_sent.is_none_or(|last_sent| {
58            now.checked_sub(last_sent)
59                .is_some_and(|dur| dur >= HEARTBEAT_INTERVAL)
60        })
61    }
62}
63
64#[derive(Serialize, Deserialize, Debug, Clone, MallocSizeOf)]
65pub enum RpcMessage {
66    Handshake,
67    Heartbeat,
68    Query { header: QueryHeader, bytes: Data },
69    Response { header: ResponseHeader, bytes: Data },
70}
71
72const HANDSHAKE_ID: P2pRpcId = P2pRpcId::from_le_bytes(*b"RPC\x00\x00\x00\x00\x00");
73
74impl RpcMessage {
75    pub fn into_bytes(self) -> Vec<u8> {
76        let mut v = vec![0; 8];
77        match self {
78            Self::Handshake => {
79                MessageHeader::Response(ResponseHeader { id: HANDSHAKE_ID })
80                    .binprot_write(&mut v)
81                    .unwrap_or_default();
82                v.extend_from_slice(b"\x01");
83            }
84            Self::Heartbeat => {
85                MessageHeader::Heartbeat
86                    .binprot_write(&mut v)
87                    .unwrap_or_default();
88            }
89            Self::Query { header, bytes } => {
90                MessageHeader::Query(header)
91                    .binprot_write(&mut v)
92                    .unwrap_or_default();
93                v.extend_from_slice(&bytes);
94            }
95            Self::Response { header, bytes } => {
96                MessageHeader::Response(header)
97                    .binprot_write(&mut v)
98                    .unwrap_or_default();
99                v.extend_from_slice(&bytes);
100            }
101        }
102
103        let len_bytes = ((v.len() - 8) as u64).to_le_bytes();
104        v[..8].clone_from_slice(&len_bytes);
105        v
106    }
107}
108
109#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error, MallocSizeOf)]
110pub enum P2pNetworkRpcError {
111    #[error("error reading binprot message: {0}")]
112    Binprot(String),
113    #[error("message {0} with size {1} exceeds limit of {2}")]
114    Limit(
115        String,
116        usize,
117        #[ignore_malloc_size_of = "primitive"] Limit<usize>,
118    ),
119}