p2p/channels/streaming_rpc/
p2p_channels_streaming_rpc_state.rs1use serde::{Deserialize, Serialize};
2
3use crate::P2pTimeouts;
4
5use super::{
6 staged_ledger_parts::StagedLedgerPartsReceiveProgress, P2pStreamingRpcId, P2pStreamingRpcKind,
7 P2pStreamingRpcReceiveProgress, P2pStreamingRpcRequest, P2pStreamingRpcResponse,
8 P2pStreamingRpcResponseFull, P2pStreamingRpcSendProgress,
9};
10
/// State of the streaming RPC channel.
///
/// The helper methods on this type only inspect the `Ready` variant; for
/// every other variant they return `false`, `None`, or
/// `redux::Timestamp::ZERO`.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum P2pChannelsStreamingRpcState {
    /// NOTE(review): presumably the channel is not in use — confirm against the reducer.
    Disabled,
    /// NOTE(review): presumably the channel is allowed but not yet initialized — confirm.
    Enabled,
    /// NOTE(review): presumably channel initialization has started — confirm.
    Init {
        /// Timestamp associated with this state (presumably when it was entered — confirm).
        time: redux::Timestamp,
    },
    /// NOTE(review): presumably waiting for channel setup to complete — confirm.
    Pending {
        /// Timestamp associated with this state (presumably when it was entered — confirm).
        time: redux::Timestamp,
    },
    /// The only state in which RPC requests are tracked.
    Ready {
        /// Timestamp associated with this state (presumably when it was entered — confirm).
        time: redux::Timestamp,
        /// State of the locally tracked RPC. Its `Requested` variant carries
        /// *receive* progress, which suggests we are the requester on this
        /// side — TODO confirm.
        local: P2pStreamingRpcLocalState,
        /// State of the remotely tracked RPC. Its `Requested` variant carries
        /// *send* progress, which suggests we are the responder on this
        /// side — TODO confirm.
        remote: P2pStreamingRpcRemoteState,
        /// Value returned by `remote_last_responded()`; presumably the last
        /// time we responded to the remote peer — confirm.
        remote_last_responded: redux::Timestamp,
    },
}
30
/// State stored in the `local` field of `P2pChannelsStreamingRpcState::Ready`.
///
/// NOTE(review): the `Requested` variant tracks *receive* progress, which
/// suggests this is the side where we issue the request and the peer streams
/// the response back — confirm.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum P2pStreamingRpcLocalState {
    /// No request is tracked. `can_send_request` returns `true` in this state.
    WaitingForRequest {
        /// Timestamp associated with this state (presumably when it was entered — confirm).
        time: redux::Timestamp,
    },
    /// A request is being tracked together with its receive progress. This is
    /// the state read by the `pending_local_rpc*` accessors and `is_timed_out`.
    Requested {
        /// Timestamp associated with this state (presumably when it was entered — confirm).
        time: redux::Timestamp,
        /// Identifier of the tracked request; `is_timed_out` compares against it.
        id: P2pStreamingRpcId,
        /// The tracked request. Its `kind()` selects the timeout in `is_timed_out`.
        request: Box<P2pStreamingRpcRequest>,
        /// Receive-side progress; its `last_updated()` is the reference point
        /// for the timeout check.
        progress: P2pStreamingRpcReceiveProgress,
    },
    /// The request with this `id` is recorded as responded; it is exposed via
    /// `local_responded_request`. `can_send_request` returns `true` in this state.
    Responded {
        /// Timestamp associated with this state (presumably when it was entered — confirm).
        time: redux::Timestamp,
        /// Identifier of the request that was responded to.
        id: P2pStreamingRpcId,
        /// The request that was responded to.
        request: Box<P2pStreamingRpcRequest>,
    },
}
48
/// State stored in the `remote` field of `P2pChannelsStreamingRpcState::Ready`.
///
/// NOTE(review): the `Requested` variant tracks *send* progress, which
/// suggests this is the side where the peer issues the request and we stream
/// the response to it — confirm.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum P2pStreamingRpcRemoteState {
    /// No request is tracked.
    WaitingForRequest {
        /// Timestamp associated with this state (presumably when it was entered — confirm).
        time: redux::Timestamp,
    },
    /// A request is being tracked together with its send progress. This is the
    /// state read by `remote_request`, `remote_todo_request`,
    /// `remote_pending_request` and `remote_next_msg`.
    Requested {
        /// Timestamp associated with this state (presumably when it was entered — confirm).
        time: redux::Timestamp,
        /// Identifier of the tracked request.
        id: P2pStreamingRpcId,
        /// The tracked request.
        request: Box<P2pStreamingRpcRequest>,
        /// Send-side progress; queried through `external_data_todo()`,
        /// `external_data_pending()` and `next_msg()`.
        progress: P2pStreamingRpcSendProgress,
    },
    /// The request with this `id` is recorded as responded.
    Responded {
        /// Timestamp associated with this state (presumably when it was entered — confirm).
        time: redux::Timestamp,
        /// Identifier of the request that was responded to.
        id: P2pStreamingRpcId,
        /// The request that was responded to.
        request: Box<P2pStreamingRpcRequest>,
    },
}
66
67impl P2pChannelsStreamingRpcState {
68 pub fn is_ready(&self) -> bool {
69 matches!(self, Self::Ready { .. })
70 }
71
72 pub fn can_send_request(&self) -> bool {
73 match self {
74 Self::Ready { local, .. } => matches!(
75 local,
76 P2pStreamingRpcLocalState::WaitingForRequest { .. }
77 | P2pStreamingRpcLocalState::Responded { .. }
78 ),
79 _ => false,
80 }
81 }
82
83 pub fn is_timed_out(
84 &self,
85 rpc_id: P2pStreamingRpcId,
86 now: redux::Timestamp,
87 config: &P2pTimeouts,
88 ) -> bool {
89 match self {
90 Self::Ready {
91 local:
92 P2pStreamingRpcLocalState::Requested {
93 id,
94 request,
95 progress,
96 ..
97 },
98 ..
99 } => {
100 rpc_id == *id
101 && request
102 .kind()
103 .timeout(config)
104 .and_then(|timeout| {
105 let dur = now.checked_sub(progress.last_updated())?;
106 Some(dur >= timeout)
107 })
108 .unwrap_or(false)
109 }
110 _ => false,
111 }
112 }
113
114 pub fn pending_local_rpc_id(&self) -> Option<P2pStreamingRpcId> {
115 match self {
116 Self::Ready {
117 local: P2pStreamingRpcLocalState::Requested { id, .. },
118 ..
119 } => Some(*id),
120 _ => None,
121 }
122 }
123
124 pub fn pending_local_rpc(&self) -> Option<&P2pStreamingRpcRequest> {
125 match self {
126 Self::Ready {
127 local: P2pStreamingRpcLocalState::Requested { request, .. },
128 ..
129 } => Some(request),
130 _ => None,
131 }
132 }
133
134 pub fn pending_local_rpc_kind(&self) -> Option<P2pStreamingRpcKind> {
135 self.pending_local_rpc().map(|req| req.kind())
136 }
137
138 pub fn pending_local_rpc_progress(&self) -> Option<&P2pStreamingRpcReceiveProgress> {
139 match self {
140 Self::Ready {
141 local: P2pStreamingRpcLocalState::Requested { progress, .. },
142 ..
143 } => Some(progress),
144 _ => None,
145 }
146 }
147
148 pub(super) fn local_done_response(&self) -> Option<P2pStreamingRpcResponseFull> {
149 match self {
150 Self::Ready {
151 local:
152 P2pStreamingRpcLocalState::Requested {
153 progress:
154 P2pStreamingRpcReceiveProgress::StagedLedgerParts(
155 StagedLedgerPartsReceiveProgress::Success { data, .. },
156 ),
157 ..
158 },
159 ..
160 } => Some(data.clone().into()),
161 _ => None,
162 }
163 }
164
165 pub fn local_responded_request(&self) -> Option<(P2pStreamingRpcId, &P2pStreamingRpcRequest)> {
166 match self {
167 Self::Ready {
168 local: P2pStreamingRpcLocalState::Responded { id, request, .. },
169 ..
170 } => Some((*id, request)),
171 _ => None,
172 }
173 }
174
175 #[allow(unused)]
176 pub(super) fn remote_request(&self) -> Option<&P2pStreamingRpcRequest> {
177 match self {
178 Self::Ready {
179 remote: P2pStreamingRpcRemoteState::Requested { request, .. },
180 ..
181 } => Some(request),
182 _ => None,
183 }
184 }
185
186 pub fn remote_todo_request(&self) -> Option<(P2pStreamingRpcId, &P2pStreamingRpcRequest)> {
187 match self {
188 Self::Ready {
189 remote:
190 P2pStreamingRpcRemoteState::Requested {
191 id,
192 request,
193 progress,
194 ..
195 },
196 ..
197 } if progress.external_data_todo() => Some((*id, request)),
198 _ => None,
199 }
200 }
201
202 pub fn remote_pending_request(&self) -> Option<(P2pStreamingRpcId, &P2pStreamingRpcRequest)> {
203 match self {
204 Self::Ready {
205 remote:
206 P2pStreamingRpcRemoteState::Requested {
207 id,
208 request,
209 progress,
210 ..
211 },
212 ..
213 } if progress.external_data_pending() => Some((*id, request)),
214 _ => None,
215 }
216 }
217
218 pub fn remote_next_msg(&self) -> Option<P2pStreamingRpcResponse> {
219 match self {
220 Self::Ready {
221 remote: P2pStreamingRpcRemoteState::Requested { progress, .. },
222 ..
223 } => progress.next_msg(),
224 _ => None,
225 }
226 }
227
228 pub fn remote_last_responded(&self) -> redux::Timestamp {
229 match self {
230 Self::Ready {
231 remote_last_responded,
232 ..
233 } => *remote_last_responded,
234 _ => redux::Timestamp::ZERO,
235 }
236 }
237}