p2p/network/rpc/
p2p_network_rpc_state.rs1use std::{
2 collections::{BTreeMap, VecDeque},
3 str,
4 time::Duration,
5};
6
7use binprot::BinProtWrite;
8use serde::{Deserialize, Serialize};
9
10use mina_p2p_messages::{
11 rpc_kernel::{MessageHeader, QueryHeader, ResponseHeader},
12 string::CharString,
13 versioned::Ver,
14};
15
16use crate::{channels::rpc::P2pRpcId, Data};
17
18use super::super::*;
19
/// Minimum time that must elapse after the last sent heartbeat before
/// `P2pNetworkRpcState::should_send_heartbeat` reports that another one is due.
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(10);
21
/// State kept for one RPC stream, identified by a connection address and a
/// stream id.
///
/// A fresh value is produced by [`P2pNetworkRpcState::new`]; all fields other
/// than `addr` and `stream_id` start out empty / zero / `None` / `false`.
#[serde_with::serde_as]
#[derive(Serialize, Deserialize, Debug, Clone, MallocSizeOf)]
pub struct P2pNetworkRpcState {
    /// Connection address supplied to `new`.
    pub addr: ConnectionAddr,
    /// Stream identifier supplied to `new`.
    pub stream_id: StreamId,
    /// Last RPC id; starts at `0`.
    // NOTE(review): presumably the most recently used query id — confirm
    // against the code that updates it.
    pub last_id: P2pRpcId,
    /// Time the last heartbeat was sent, or `None` if none has been recorded.
    /// Read by `should_send_heartbeat`.
    #[ignore_malloc_size_of = "primitive"]
    pub last_heartbeat_sent: Option<redux::Timestamp>,
    /// Header of a pending query, if any.
    // NOTE(review): presumably a query still awaiting its response — confirm.
    pub pending: Option<QueryHeader>,
    /// Counters keyed by `(CharString, Ver)`.
    ///
    /// Serialized as a sequence of `(key, value)` pairs rather than as a map
    /// (see the `serde_as` attribute).
    // NOTE(review): the key looks like an RPC (name, version) pair — confirm.
    #[serde_as(as = "Vec<(_, _)>")]
    #[ignore_malloc_size_of = "TODO(vlad)"]
    pub total_stats: BTreeMap<(CharString, Ver), usize>,
    /// `false` on creation.
    // NOTE(review): presumably marks a stream opened by the remote peer — confirm.
    pub is_incoming: bool,
    /// Raw byte buffer; empty on creation.
    // NOTE(review): presumably accumulates received bytes until a complete
    // message can be decoded — confirm.
    pub buffer: Vec<u8>,
    /// Queue of [`RpcMessage`]s; empty on creation.
    // NOTE(review): presumably decoded messages awaiting processing — confirm.
    pub incoming: VecDeque<RpcMessage>,
    /// Error recorded for this stream, if any.
    pub error: Option<P2pNetworkRpcError>,
}
39
40impl P2pNetworkRpcState {
41 pub fn new(addr: ConnectionAddr, stream_id: StreamId) -> Self {
42 P2pNetworkRpcState {
43 addr,
44 stream_id,
45 last_id: 0,
46 last_heartbeat_sent: None,
47 pending: None,
48 total_stats: BTreeMap::default(),
49 is_incoming: false,
50 buffer: vec![],
51 incoming: Default::default(),
52 error: None,
53 }
54 }
55
56 pub fn should_send_heartbeat(&self, now: redux::Timestamp) -> bool {
57 self.last_heartbeat_sent.is_none_or(|last_sent| {
58 now.checked_sub(last_sent)
59 .is_some_and(|dur| dur >= HEARTBEAT_INTERVAL)
60 })
61 }
62}
63
/// A message on an RPC stream, convertible to its wire form with
/// [`RpcMessage::into_bytes`].
#[derive(Serialize, Deserialize, Debug, Clone, MallocSizeOf)]
pub enum RpcMessage {
    /// Handshake message; carries no data of its own.
    Handshake,
    /// Heartbeat message; carries no data of its own.
    Heartbeat,
    /// A query: its header plus the payload bytes appended after the header.
    Query { header: QueryHeader, bytes: Data },
    /// A response: its header plus the payload bytes appended after the header.
    Response { header: ResponseHeader, bytes: Data },
}
71
/// Id placed in the response header of the handshake message: the ASCII bytes
/// `RPC` followed by five zero bytes, interpreted as a little-endian integer.
const HANDSHAKE_ID: P2pRpcId = P2pRpcId::from_le_bytes(*b"RPC\x00\x00\x00\x00\x00");
73
74impl RpcMessage {
75 pub fn into_bytes(self) -> Vec<u8> {
76 let mut v = vec![0; 8];
77 match self {
78 Self::Handshake => {
79 MessageHeader::Response(ResponseHeader { id: HANDSHAKE_ID })
80 .binprot_write(&mut v)
81 .unwrap_or_default();
82 v.extend_from_slice(b"\x01");
83 }
84 Self::Heartbeat => {
85 MessageHeader::Heartbeat
86 .binprot_write(&mut v)
87 .unwrap_or_default();
88 }
89 Self::Query { header, bytes } => {
90 MessageHeader::Query(header)
91 .binprot_write(&mut v)
92 .unwrap_or_default();
93 v.extend_from_slice(&bytes);
94 }
95 Self::Response { header, bytes } => {
96 MessageHeader::Response(header)
97 .binprot_write(&mut v)
98 .unwrap_or_default();
99 v.extend_from_slice(&bytes);
100 }
101 }
102
103 let len_bytes = ((v.len() - 8) as u64).to_le_bytes();
104 v[..8].clone_from_slice(&len_bytes);
105 v
106 }
107}
108
/// Errors that can be recorded for an RPC stream.
#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error, MallocSizeOf)]
pub enum P2pNetworkRpcError {
    /// Reading a binprot message failed; the string is the error description.
    #[error("error reading binprot message: {0}")]
    Binprot(String),
    /// A message exceeded its size limit.
    ///
    /// Fields, in order: the message description, its size, and the limit
    /// that was exceeded.
    #[error("message {0} with size {1} exceeds limit of {2}")]
    Limit(
        String,
        usize,
        #[ignore_malloc_size_of = "primitive"] Limit<usize>,
    ),
}
119}