p2p/channels/streaming_rpc/
p2p_channels_streaming_rpc_state.rs

1use serde::{Deserialize, Serialize};
2
3use crate::P2pTimeouts;
4
5use super::{
6    staged_ledger_parts::StagedLedgerPartsReceiveProgress, P2pStreamingRpcId, P2pStreamingRpcKind,
7    P2pStreamingRpcReceiveProgress, P2pStreamingRpcRequest, P2pStreamingRpcResponse,
8    P2pStreamingRpcResponseFull, P2pStreamingRpcSendProgress,
9};
10
11#[derive(Serialize, Deserialize, Debug, Clone)]
12pub enum P2pChannelsStreamingRpcState {
13    Disabled,
14    Enabled,
15    Init {
16        time: redux::Timestamp,
17    },
18    Pending {
19        time: redux::Timestamp,
20    },
21    Ready {
22        time: redux::Timestamp,
23        /// We are the requestors here.
24        local: P2pStreamingRpcLocalState,
25        /// We are the responders here.
26        remote: P2pStreamingRpcRemoteState,
27        remote_last_responded: redux::Timestamp,
28    },
29}
30
31#[derive(Serialize, Deserialize, Debug, Clone)]
32pub enum P2pStreamingRpcLocalState {
33    WaitingForRequest {
34        time: redux::Timestamp,
35    },
36    Requested {
37        time: redux::Timestamp,
38        id: P2pStreamingRpcId,
39        request: Box<P2pStreamingRpcRequest>,
40        progress: P2pStreamingRpcReceiveProgress,
41    },
42    Responded {
43        time: redux::Timestamp,
44        id: P2pStreamingRpcId,
45        request: Box<P2pStreamingRpcRequest>,
46    },
47}
48
49#[derive(Serialize, Deserialize, Debug, Clone)]
50pub enum P2pStreamingRpcRemoteState {
51    WaitingForRequest {
52        time: redux::Timestamp,
53    },
54    Requested {
55        time: redux::Timestamp,
56        id: P2pStreamingRpcId,
57        request: Box<P2pStreamingRpcRequest>,
58        progress: P2pStreamingRpcSendProgress,
59    },
60    Responded {
61        time: redux::Timestamp,
62        id: P2pStreamingRpcId,
63        request: Box<P2pStreamingRpcRequest>,
64    },
65}
66
67impl P2pChannelsStreamingRpcState {
68    pub fn is_ready(&self) -> bool {
69        matches!(self, Self::Ready { .. })
70    }
71
72    pub fn can_send_request(&self) -> bool {
73        match self {
74            Self::Ready { local, .. } => matches!(
75                local,
76                P2pStreamingRpcLocalState::WaitingForRequest { .. }
77                    | P2pStreamingRpcLocalState::Responded { .. }
78            ),
79            _ => false,
80        }
81    }
82
83    pub fn is_timed_out(
84        &self,
85        rpc_id: P2pStreamingRpcId,
86        now: redux::Timestamp,
87        config: &P2pTimeouts,
88    ) -> bool {
89        match self {
90            Self::Ready {
91                local:
92                    P2pStreamingRpcLocalState::Requested {
93                        id,
94                        request,
95                        progress,
96                        ..
97                    },
98                ..
99            } => {
100                rpc_id == *id
101                    && request
102                        .kind()
103                        .timeout(config)
104                        .and_then(|timeout| {
105                            let dur = now.checked_sub(progress.last_updated())?;
106                            Some(dur >= timeout)
107                        })
108                        .unwrap_or(false)
109            }
110            _ => false,
111        }
112    }
113
114    pub fn pending_local_rpc_id(&self) -> Option<P2pStreamingRpcId> {
115        match self {
116            Self::Ready {
117                local: P2pStreamingRpcLocalState::Requested { id, .. },
118                ..
119            } => Some(*id),
120            _ => None,
121        }
122    }
123
124    pub fn pending_local_rpc(&self) -> Option<&P2pStreamingRpcRequest> {
125        match self {
126            Self::Ready {
127                local: P2pStreamingRpcLocalState::Requested { request, .. },
128                ..
129            } => Some(request),
130            _ => None,
131        }
132    }
133
134    pub fn pending_local_rpc_kind(&self) -> Option<P2pStreamingRpcKind> {
135        self.pending_local_rpc().map(|req| req.kind())
136    }
137
138    pub fn pending_local_rpc_progress(&self) -> Option<&P2pStreamingRpcReceiveProgress> {
139        match self {
140            Self::Ready {
141                local: P2pStreamingRpcLocalState::Requested { progress, .. },
142                ..
143            } => Some(progress),
144            _ => None,
145        }
146    }
147
148    pub(super) fn local_done_response(&self) -> Option<P2pStreamingRpcResponseFull> {
149        match self {
150            Self::Ready {
151                local:
152                    P2pStreamingRpcLocalState::Requested {
153                        progress:
154                            P2pStreamingRpcReceiveProgress::StagedLedgerParts(
155                                StagedLedgerPartsReceiveProgress::Success { data, .. },
156                            ),
157                        ..
158                    },
159                ..
160            } => Some(data.clone().into()),
161            _ => None,
162        }
163    }
164
165    pub fn local_responded_request(&self) -> Option<(P2pStreamingRpcId, &P2pStreamingRpcRequest)> {
166        match self {
167            Self::Ready {
168                local: P2pStreamingRpcLocalState::Responded { id, request, .. },
169                ..
170            } => Some((*id, request)),
171            _ => None,
172        }
173    }
174
175    #[allow(unused)]
176    pub(super) fn remote_request(&self) -> Option<&P2pStreamingRpcRequest> {
177        match self {
178            Self::Ready {
179                remote: P2pStreamingRpcRemoteState::Requested { request, .. },
180                ..
181            } => Some(request),
182            _ => None,
183        }
184    }
185
186    pub fn remote_todo_request(&self) -> Option<(P2pStreamingRpcId, &P2pStreamingRpcRequest)> {
187        match self {
188            Self::Ready {
189                remote:
190                    P2pStreamingRpcRemoteState::Requested {
191                        id,
192                        request,
193                        progress,
194                        ..
195                    },
196                ..
197            } if progress.external_data_todo() => Some((*id, request)),
198            _ => None,
199        }
200    }
201
202    pub fn remote_pending_request(&self) -> Option<(P2pStreamingRpcId, &P2pStreamingRpcRequest)> {
203        match self {
204            Self::Ready {
205                remote:
206                    P2pStreamingRpcRemoteState::Requested {
207                        id,
208                        request,
209                        progress,
210                        ..
211                    },
212                ..
213            } if progress.external_data_pending() => Some((*id, request)),
214            _ => None,
215        }
216    }
217
218    pub fn remote_next_msg(&self) -> Option<P2pStreamingRpcResponse> {
219        match self {
220            Self::Ready {
221                remote: P2pStreamingRpcRemoteState::Requested { progress, .. },
222                ..
223            } => progress.next_msg(),
224            _ => None,
225        }
226    }
227
228    pub fn remote_last_responded(&self) -> redux::Timestamp {
229        match self {
230            Self::Ready {
231                remote_last_responded,
232                ..
233            } => *remote_last_responded,
234            _ => redux::Timestamp::ZERO,
235        }
236    }
237}