p2p/channels/streaming_rpc/rpcs/
mod.rs

1pub mod staged_ledger_parts;
2use staged_ledger_parts::{
3    StagedLedgerPartsReceiveProgress, StagedLedgerPartsResponse, StagedLedgerPartsResponseFull,
4    StagedLedgerPartsSendProgress,
5};
6
7use std::time::Duration;
8
9use binprot_derive::{BinProtRead, BinProtWrite};
10use derive_more::From;
11use mina_p2p_messages::v2;
12use serde::{Deserialize, Serialize};
13
14use crate::P2pTimeouts;
15
16#[derive(Serialize, Deserialize, Debug, Ord, PartialOrd, Eq, PartialEq, Clone)]
17pub enum P2pStreamingRpcKind {
18    StagedLedgerParts,
19}
20
21#[derive(BinProtWrite, BinProtRead, Serialize, Deserialize, Debug, PartialEq, Clone)]
22pub enum P2pStreamingRpcRequest {
23    StagedLedgerParts(v2::StateHash),
24}
25
26#[derive(Serialize, Deserialize, From, Debug, Clone)]
27pub enum P2pStreamingRpcResponseFull {
28    StagedLedgerParts(StagedLedgerPartsResponseFull),
29}
30
31#[derive(BinProtWrite, BinProtRead, Serialize, Deserialize, From, Debug, Clone)]
32pub enum P2pStreamingRpcResponse {
33    StagedLedgerParts(StagedLedgerPartsResponse),
34}
35
36#[derive(Serialize, Deserialize, From, Debug, Clone)]
37pub enum P2pStreamingRpcSendProgress {
38    StagedLedgerParts(StagedLedgerPartsSendProgress),
39}
40
41#[derive(Serialize, Deserialize, From, Debug, Clone)]
42pub enum P2pStreamingRpcReceiveProgress {
43    StagedLedgerParts(StagedLedgerPartsReceiveProgress),
44}
45
46impl P2pStreamingRpcKind {
47    pub fn timeout(self, _config: &P2pTimeouts) -> Option<Duration> {
48        match self {
49            // TODO(binier): use config
50            Self::StagedLedgerParts => Some(Duration::from_secs(30)),
51        }
52    }
53}
54
55impl P2pStreamingRpcRequest {
56    pub fn kind(&self) -> P2pStreamingRpcKind {
57        match self {
58            Self::StagedLedgerParts(_) => P2pStreamingRpcKind::StagedLedgerParts,
59        }
60    }
61}
62
63impl Default for P2pStreamingRpcRequest {
64    fn default() -> Self {
65        Self::StagedLedgerParts(v2::StateHash::zero())
66    }
67}
68
69impl std::fmt::Display for P2pStreamingRpcRequest {
70    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71        write!(f, "{:?}", self.kind())?;
72        match self {
73            Self::StagedLedgerParts(block_hash) => write!(f, ", {block_hash}"),
74        }
75    }
76}
77
78impl P2pStreamingRpcResponseFull {
79    pub fn kind(&self) -> P2pStreamingRpcKind {
80        match self {
81            Self::StagedLedgerParts(_) => P2pStreamingRpcKind::StagedLedgerParts,
82        }
83    }
84}
85
86impl P2pStreamingRpcResponse {
87    pub fn kind(&self) -> P2pStreamingRpcKind {
88        match self {
89            Self::StagedLedgerParts(_) => P2pStreamingRpcKind::StagedLedgerParts,
90        }
91    }
92}
93
94impl P2pStreamingRpcSendProgress {
95    pub fn kind(&self) -> P2pStreamingRpcKind {
96        match self {
97            Self::StagedLedgerParts(_) => P2pStreamingRpcKind::StagedLedgerParts,
98        }
99    }
100
101    pub fn external_data_todo(&self) -> bool {
102        match self {
103            Self::StagedLedgerParts(v) => {
104                matches!(v, StagedLedgerPartsSendProgress::LedgerGetIdle { .. })
105            }
106        }
107    }
108
109    pub fn external_data_pending(&self) -> bool {
110        match self {
111            Self::StagedLedgerParts(v) => {
112                matches!(v, StagedLedgerPartsSendProgress::LedgerGetPending { .. })
113            }
114        }
115    }
116
117    pub fn next_msg(&self) -> Option<P2pStreamingRpcResponse> {
118        match self {
119            Self::StagedLedgerParts(v) => v.next_msg().map(Into::into),
120        }
121    }
122
123    pub fn is_done(&self) -> bool {
124        match self {
125            Self::StagedLedgerParts(s) => {
126                matches!(s, StagedLedgerPartsSendProgress::Success { .. })
127            }
128        }
129    }
130}
131
132impl P2pStreamingRpcReceiveProgress {
133    pub fn kind(&self) -> P2pStreamingRpcKind {
134        match self {
135            Self::StagedLedgerParts(_) => P2pStreamingRpcKind::StagedLedgerParts,
136        }
137    }
138
139    pub fn is_done(&self) -> bool {
140        match self {
141            Self::StagedLedgerParts(s) => {
142                matches!(s, StagedLedgerPartsReceiveProgress::Success { .. })
143            }
144        }
145    }
146
147    pub fn last_updated(&self) -> redux::Timestamp {
148        match self {
149            Self::StagedLedgerParts(s) => s.last_updated(),
150        }
151    }
152
153    pub fn update(&mut self, time: redux::Timestamp, resp: P2pStreamingRpcResponse) -> bool {
154        match (self, resp) {
155            (
156                Self::StagedLedgerParts(progress),
157                P2pStreamingRpcResponse::StagedLedgerParts(resp),
158            ) => progress.update(time, resp),
159            // _ => false,
160        }
161    }
162
163    pub fn is_part_pending(&self) -> bool {
164        match self {
165            Self::StagedLedgerParts(progress) => progress.is_part_pending(),
166        }
167    }
168
169    pub fn set_next_pending(&mut self, time: redux::Timestamp) -> bool {
170        match self {
171            Self::StagedLedgerParts(progress) => progress.set_next_pending(time),
172        }
173    }
174}