p2p/channels/streaming_rpc/rpcs/
mod.rs1pub mod staged_ledger_parts;
2use staged_ledger_parts::{
3 StagedLedgerPartsReceiveProgress, StagedLedgerPartsResponse, StagedLedgerPartsResponseFull,
4 StagedLedgerPartsSendProgress,
5};
6
7use std::time::Duration;
8
9use binprot_derive::{BinProtRead, BinProtWrite};
10use derive_more::From;
11use mina_p2p_messages::v2;
12use serde::{Deserialize, Serialize};
13
14use crate::P2pTimeouts;
15
16#[derive(Serialize, Deserialize, Debug, Ord, PartialOrd, Eq, PartialEq, Clone)]
17pub enum P2pStreamingRpcKind {
18 StagedLedgerParts,
19}
20
21#[derive(BinProtWrite, BinProtRead, Serialize, Deserialize, Debug, PartialEq, Clone)]
22pub enum P2pStreamingRpcRequest {
23 StagedLedgerParts(v2::StateHash),
24}
25
26#[derive(Serialize, Deserialize, From, Debug, Clone)]
27pub enum P2pStreamingRpcResponseFull {
28 StagedLedgerParts(StagedLedgerPartsResponseFull),
29}
30
31#[derive(BinProtWrite, BinProtRead, Serialize, Deserialize, From, Debug, Clone)]
32pub enum P2pStreamingRpcResponse {
33 StagedLedgerParts(StagedLedgerPartsResponse),
34}
35
36#[derive(Serialize, Deserialize, From, Debug, Clone)]
37pub enum P2pStreamingRpcSendProgress {
38 StagedLedgerParts(StagedLedgerPartsSendProgress),
39}
40
41#[derive(Serialize, Deserialize, From, Debug, Clone)]
42pub enum P2pStreamingRpcReceiveProgress {
43 StagedLedgerParts(StagedLedgerPartsReceiveProgress),
44}
45
46impl P2pStreamingRpcKind {
47 pub fn timeout(self, _config: &P2pTimeouts) -> Option<Duration> {
48 match self {
49 Self::StagedLedgerParts => Some(Duration::from_secs(30)),
51 }
52 }
53}
54
55impl P2pStreamingRpcRequest {
56 pub fn kind(&self) -> P2pStreamingRpcKind {
57 match self {
58 Self::StagedLedgerParts(_) => P2pStreamingRpcKind::StagedLedgerParts,
59 }
60 }
61}
62
63impl Default for P2pStreamingRpcRequest {
64 fn default() -> Self {
65 Self::StagedLedgerParts(v2::StateHash::zero())
66 }
67}
68
69impl std::fmt::Display for P2pStreamingRpcRequest {
70 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71 write!(f, "{:?}", self.kind())?;
72 match self {
73 Self::StagedLedgerParts(block_hash) => write!(f, ", {block_hash}"),
74 }
75 }
76}
77
78impl P2pStreamingRpcResponseFull {
79 pub fn kind(&self) -> P2pStreamingRpcKind {
80 match self {
81 Self::StagedLedgerParts(_) => P2pStreamingRpcKind::StagedLedgerParts,
82 }
83 }
84}
85
86impl P2pStreamingRpcResponse {
87 pub fn kind(&self) -> P2pStreamingRpcKind {
88 match self {
89 Self::StagedLedgerParts(_) => P2pStreamingRpcKind::StagedLedgerParts,
90 }
91 }
92}
93
94impl P2pStreamingRpcSendProgress {
95 pub fn kind(&self) -> P2pStreamingRpcKind {
96 match self {
97 Self::StagedLedgerParts(_) => P2pStreamingRpcKind::StagedLedgerParts,
98 }
99 }
100
101 pub fn external_data_todo(&self) -> bool {
102 match self {
103 Self::StagedLedgerParts(v) => {
104 matches!(v, StagedLedgerPartsSendProgress::LedgerGetIdle { .. })
105 }
106 }
107 }
108
109 pub fn external_data_pending(&self) -> bool {
110 match self {
111 Self::StagedLedgerParts(v) => {
112 matches!(v, StagedLedgerPartsSendProgress::LedgerGetPending { .. })
113 }
114 }
115 }
116
117 pub fn next_msg(&self) -> Option<P2pStreamingRpcResponse> {
118 match self {
119 Self::StagedLedgerParts(v) => v.next_msg().map(Into::into),
120 }
121 }
122
123 pub fn is_done(&self) -> bool {
124 match self {
125 Self::StagedLedgerParts(s) => {
126 matches!(s, StagedLedgerPartsSendProgress::Success { .. })
127 }
128 }
129 }
130}
131
132impl P2pStreamingRpcReceiveProgress {
133 pub fn kind(&self) -> P2pStreamingRpcKind {
134 match self {
135 Self::StagedLedgerParts(_) => P2pStreamingRpcKind::StagedLedgerParts,
136 }
137 }
138
139 pub fn is_done(&self) -> bool {
140 match self {
141 Self::StagedLedgerParts(s) => {
142 matches!(s, StagedLedgerPartsReceiveProgress::Success { .. })
143 }
144 }
145 }
146
147 pub fn last_updated(&self) -> redux::Timestamp {
148 match self {
149 Self::StagedLedgerParts(s) => s.last_updated(),
150 }
151 }
152
153 pub fn update(&mut self, time: redux::Timestamp, resp: P2pStreamingRpcResponse) -> bool {
154 match (self, resp) {
155 (
156 Self::StagedLedgerParts(progress),
157 P2pStreamingRpcResponse::StagedLedgerParts(resp),
158 ) => progress.update(time, resp),
159 }
161 }
162
163 pub fn is_part_pending(&self) -> bool {
164 match self {
165 Self::StagedLedgerParts(progress) => progress.is_part_pending(),
166 }
167 }
168
169 pub fn set_next_pending(&mut self, time: redux::Timestamp) -> bool {
170 match self {
171 Self::StagedLedgerParts(progress) => progress.set_next_pending(time),
172 }
173 }
174}