mina_p2p/channels/streaming_rpc/
p2p_channels_streaming_rpc_state.rs1use serde::{Deserialize, Serialize};
2
3use crate::P2pTimeouts;
4
5use super::{
6 staged_ledger_parts::StagedLedgerPartsReceiveProgress, P2pStreamingRpcId, P2pStreamingRpcKind,
7 P2pStreamingRpcReceiveProgress, P2pStreamingRpcRequest, P2pStreamingRpcResponse,
8 P2pStreamingRpcResponseFull, P2pStreamingRpcSendProgress,
9};
10
11#[derive(Serialize, Deserialize, Debug, Clone)]
12#[expect(
13 clippy::large_enum_variant,
14 reason = "Happy-path variant is the redux state"
15)]
16pub enum P2pChannelsStreamingRpcState {
17 Disabled,
18 Enabled,
19 Init {
20 time: redux::Timestamp,
21 },
22 Pending {
23 time: redux::Timestamp,
24 },
25 Ready {
26 time: redux::Timestamp,
27 local: P2pStreamingRpcLocalState,
29 remote: P2pStreamingRpcRemoteState,
31 remote_last_responded: redux::Timestamp,
32 },
33}
34
35#[derive(Serialize, Deserialize, Debug, Clone)]
36pub enum P2pStreamingRpcLocalState {
37 WaitingForRequest {
38 time: redux::Timestamp,
39 },
40 Requested {
41 time: redux::Timestamp,
42 id: P2pStreamingRpcId,
43 request: Box<P2pStreamingRpcRequest>,
44 progress: P2pStreamingRpcReceiveProgress,
45 },
46 Responded {
47 time: redux::Timestamp,
48 id: P2pStreamingRpcId,
49 request: Box<P2pStreamingRpcRequest>,
50 },
51}
52
53#[derive(Serialize, Deserialize, Debug, Clone)]
54pub enum P2pStreamingRpcRemoteState {
55 WaitingForRequest {
56 time: redux::Timestamp,
57 },
58 Requested {
59 time: redux::Timestamp,
60 id: P2pStreamingRpcId,
61 request: Box<P2pStreamingRpcRequest>,
62 progress: P2pStreamingRpcSendProgress,
63 },
64 Responded {
65 time: redux::Timestamp,
66 id: P2pStreamingRpcId,
67 request: Box<P2pStreamingRpcRequest>,
68 },
69}
70
71impl P2pChannelsStreamingRpcState {
72 pub fn is_ready(&self) -> bool {
73 matches!(self, Self::Ready { .. })
74 }
75
76 pub fn can_send_request(&self) -> bool {
77 match self {
78 Self::Ready { local, .. } => matches!(
79 local,
80 P2pStreamingRpcLocalState::WaitingForRequest { .. }
81 | P2pStreamingRpcLocalState::Responded { .. }
82 ),
83 _ => false,
84 }
85 }
86
87 pub fn is_timed_out(
88 &self,
89 rpc_id: P2pStreamingRpcId,
90 now: redux::Timestamp,
91 config: &P2pTimeouts,
92 ) -> bool {
93 match self {
94 Self::Ready {
95 local:
96 P2pStreamingRpcLocalState::Requested {
97 id,
98 request,
99 progress,
100 ..
101 },
102 ..
103 } => {
104 rpc_id == *id
105 && request
106 .kind()
107 .timeout(config)
108 .and_then(|timeout| {
109 let dur = now.checked_sub(progress.last_updated())?;
110 Some(dur >= timeout)
111 })
112 .unwrap_or(false)
113 }
114 _ => false,
115 }
116 }
117
118 pub fn pending_local_rpc_id(&self) -> Option<P2pStreamingRpcId> {
119 match self {
120 Self::Ready {
121 local: P2pStreamingRpcLocalState::Requested { id, .. },
122 ..
123 } => Some(*id),
124 _ => None,
125 }
126 }
127
128 pub fn pending_local_rpc(&self) -> Option<&P2pStreamingRpcRequest> {
129 match self {
130 Self::Ready {
131 local: P2pStreamingRpcLocalState::Requested { request, .. },
132 ..
133 } => Some(request),
134 _ => None,
135 }
136 }
137
138 pub fn pending_local_rpc_kind(&self) -> Option<P2pStreamingRpcKind> {
139 self.pending_local_rpc().map(|req| req.kind())
140 }
141
142 pub fn pending_local_rpc_progress(&self) -> Option<&P2pStreamingRpcReceiveProgress> {
143 match self {
144 Self::Ready {
145 local: P2pStreamingRpcLocalState::Requested { progress, .. },
146 ..
147 } => Some(progress),
148 _ => None,
149 }
150 }
151
152 pub(super) fn local_done_response(&self) -> Option<P2pStreamingRpcResponseFull> {
153 match self {
154 Self::Ready {
155 local:
156 P2pStreamingRpcLocalState::Requested {
157 progress:
158 P2pStreamingRpcReceiveProgress::StagedLedgerParts(
159 StagedLedgerPartsReceiveProgress::Success { data, .. },
160 ),
161 ..
162 },
163 ..
164 } => Some(data.clone().into()),
165 _ => None,
166 }
167 }
168
169 pub fn local_responded_request(&self) -> Option<(P2pStreamingRpcId, &P2pStreamingRpcRequest)> {
170 match self {
171 Self::Ready {
172 local: P2pStreamingRpcLocalState::Responded { id, request, .. },
173 ..
174 } => Some((*id, request)),
175 _ => None,
176 }
177 }
178
179 #[allow(unused)]
180 pub(super) fn remote_request(&self) -> Option<&P2pStreamingRpcRequest> {
181 match self {
182 Self::Ready {
183 remote: P2pStreamingRpcRemoteState::Requested { request, .. },
184 ..
185 } => Some(request),
186 _ => None,
187 }
188 }
189
190 pub fn remote_todo_request(&self) -> Option<(P2pStreamingRpcId, &P2pStreamingRpcRequest)> {
191 match self {
192 Self::Ready {
193 remote:
194 P2pStreamingRpcRemoteState::Requested {
195 id,
196 request,
197 progress,
198 ..
199 },
200 ..
201 } if progress.external_data_todo() => Some((*id, request)),
202 _ => None,
203 }
204 }
205
206 pub fn remote_pending_request(&self) -> Option<(P2pStreamingRpcId, &P2pStreamingRpcRequest)> {
207 match self {
208 Self::Ready {
209 remote:
210 P2pStreamingRpcRemoteState::Requested {
211 id,
212 request,
213 progress,
214 ..
215 },
216 ..
217 } if progress.external_data_pending() => Some((*id, request)),
218 _ => None,
219 }
220 }
221
222 pub fn remote_next_msg(&self) -> Option<P2pStreamingRpcResponse> {
223 match self {
224 Self::Ready {
225 remote: P2pStreamingRpcRemoteState::Requested { progress, .. },
226 ..
227 } => progress.next_msg(),
228 _ => None,
229 }
230 }
231
232 pub fn remote_last_responded(&self) -> redux::Timestamp {
233 match self {
234 Self::Ready {
235 remote_last_responded,
236 ..
237 } => *remote_last_responded,
238 _ => redux::Timestamp::ZERO,
239 }
240 }
241}