mina_p2p/channels/streaming_rpc/
p2p_channels_streaming_rpc_state.rs

1use serde::{Deserialize, Serialize};
2
3use crate::P2pTimeouts;
4
5use super::{
6    staged_ledger_parts::StagedLedgerPartsReceiveProgress, P2pStreamingRpcId, P2pStreamingRpcKind,
7    P2pStreamingRpcReceiveProgress, P2pStreamingRpcRequest, P2pStreamingRpcResponse,
8    P2pStreamingRpcResponseFull, P2pStreamingRpcSendProgress,
9};
10
11#[derive(Serialize, Deserialize, Debug, Clone)]
12#[expect(
13    clippy::large_enum_variant,
14    reason = "Happy-path variant is the redux state"
15)]
16pub enum P2pChannelsStreamingRpcState {
17    Disabled,
18    Enabled,
19    Init {
20        time: redux::Timestamp,
21    },
22    Pending {
23        time: redux::Timestamp,
24    },
25    Ready {
26        time: redux::Timestamp,
27        /// We are the requestors here.
28        local: P2pStreamingRpcLocalState,
29        /// We are the responders here.
30        remote: P2pStreamingRpcRemoteState,
31        remote_last_responded: redux::Timestamp,
32    },
33}
34
35#[derive(Serialize, Deserialize, Debug, Clone)]
36pub enum P2pStreamingRpcLocalState {
37    WaitingForRequest {
38        time: redux::Timestamp,
39    },
40    Requested {
41        time: redux::Timestamp,
42        id: P2pStreamingRpcId,
43        request: Box<P2pStreamingRpcRequest>,
44        progress: P2pStreamingRpcReceiveProgress,
45    },
46    Responded {
47        time: redux::Timestamp,
48        id: P2pStreamingRpcId,
49        request: Box<P2pStreamingRpcRequest>,
50    },
51}
52
53#[derive(Serialize, Deserialize, Debug, Clone)]
54pub enum P2pStreamingRpcRemoteState {
55    WaitingForRequest {
56        time: redux::Timestamp,
57    },
58    Requested {
59        time: redux::Timestamp,
60        id: P2pStreamingRpcId,
61        request: Box<P2pStreamingRpcRequest>,
62        progress: P2pStreamingRpcSendProgress,
63    },
64    Responded {
65        time: redux::Timestamp,
66        id: P2pStreamingRpcId,
67        request: Box<P2pStreamingRpcRequest>,
68    },
69}
70
71impl P2pChannelsStreamingRpcState {
72    pub fn is_ready(&self) -> bool {
73        matches!(self, Self::Ready { .. })
74    }
75
76    pub fn can_send_request(&self) -> bool {
77        match self {
78            Self::Ready { local, .. } => matches!(
79                local,
80                P2pStreamingRpcLocalState::WaitingForRequest { .. }
81                    | P2pStreamingRpcLocalState::Responded { .. }
82            ),
83            _ => false,
84        }
85    }
86
87    pub fn is_timed_out(
88        &self,
89        rpc_id: P2pStreamingRpcId,
90        now: redux::Timestamp,
91        config: &P2pTimeouts,
92    ) -> bool {
93        match self {
94            Self::Ready {
95                local:
96                    P2pStreamingRpcLocalState::Requested {
97                        id,
98                        request,
99                        progress,
100                        ..
101                    },
102                ..
103            } => {
104                rpc_id == *id
105                    && request
106                        .kind()
107                        .timeout(config)
108                        .and_then(|timeout| {
109                            let dur = now.checked_sub(progress.last_updated())?;
110                            Some(dur >= timeout)
111                        })
112                        .unwrap_or(false)
113            }
114            _ => false,
115        }
116    }
117
118    pub fn pending_local_rpc_id(&self) -> Option<P2pStreamingRpcId> {
119        match self {
120            Self::Ready {
121                local: P2pStreamingRpcLocalState::Requested { id, .. },
122                ..
123            } => Some(*id),
124            _ => None,
125        }
126    }
127
128    pub fn pending_local_rpc(&self) -> Option<&P2pStreamingRpcRequest> {
129        match self {
130            Self::Ready {
131                local: P2pStreamingRpcLocalState::Requested { request, .. },
132                ..
133            } => Some(request),
134            _ => None,
135        }
136    }
137
138    pub fn pending_local_rpc_kind(&self) -> Option<P2pStreamingRpcKind> {
139        self.pending_local_rpc().map(|req| req.kind())
140    }
141
142    pub fn pending_local_rpc_progress(&self) -> Option<&P2pStreamingRpcReceiveProgress> {
143        match self {
144            Self::Ready {
145                local: P2pStreamingRpcLocalState::Requested { progress, .. },
146                ..
147            } => Some(progress),
148            _ => None,
149        }
150    }
151
152    pub(super) fn local_done_response(&self) -> Option<P2pStreamingRpcResponseFull> {
153        match self {
154            Self::Ready {
155                local:
156                    P2pStreamingRpcLocalState::Requested {
157                        progress:
158                            P2pStreamingRpcReceiveProgress::StagedLedgerParts(
159                                StagedLedgerPartsReceiveProgress::Success { data, .. },
160                            ),
161                        ..
162                    },
163                ..
164            } => Some(data.clone().into()),
165            _ => None,
166        }
167    }
168
169    pub fn local_responded_request(&self) -> Option<(P2pStreamingRpcId, &P2pStreamingRpcRequest)> {
170        match self {
171            Self::Ready {
172                local: P2pStreamingRpcLocalState::Responded { id, request, .. },
173                ..
174            } => Some((*id, request)),
175            _ => None,
176        }
177    }
178
179    #[allow(unused)]
180    pub(super) fn remote_request(&self) -> Option<&P2pStreamingRpcRequest> {
181        match self {
182            Self::Ready {
183                remote: P2pStreamingRpcRemoteState::Requested { request, .. },
184                ..
185            } => Some(request),
186            _ => None,
187        }
188    }
189
190    pub fn remote_todo_request(&self) -> Option<(P2pStreamingRpcId, &P2pStreamingRpcRequest)> {
191        match self {
192            Self::Ready {
193                remote:
194                    P2pStreamingRpcRemoteState::Requested {
195                        id,
196                        request,
197                        progress,
198                        ..
199                    },
200                ..
201            } if progress.external_data_todo() => Some((*id, request)),
202            _ => None,
203        }
204    }
205
206    pub fn remote_pending_request(&self) -> Option<(P2pStreamingRpcId, &P2pStreamingRpcRequest)> {
207        match self {
208            Self::Ready {
209                remote:
210                    P2pStreamingRpcRemoteState::Requested {
211                        id,
212                        request,
213                        progress,
214                        ..
215                    },
216                ..
217            } if progress.external_data_pending() => Some((*id, request)),
218            _ => None,
219        }
220    }
221
222    pub fn remote_next_msg(&self) -> Option<P2pStreamingRpcResponse> {
223        match self {
224            Self::Ready {
225                remote: P2pStreamingRpcRemoteState::Requested { progress, .. },
226                ..
227            } => progress.next_msg(),
228            _ => None,
229        }
230    }
231
232    pub fn remote_last_responded(&self) -> redux::Timestamp {
233        match self {
234            Self::Ready {
235                remote_last_responded,
236                ..
237            } => *remote_last_responded,
238            _ => redux::Timestamp::ZERO,
239        }
240    }
241}