p2p/channels/streaming_rpc/
p2p_channels_streaming_rpc_actions.rs

1use openmina_core::ActionEvent;
2use redux::Timestamp;
3use serde::{Deserialize, Serialize};
4
5use crate::{P2pState, PeerId};
6
7use super::{
8    P2pChannelsStreamingRpcState, P2pStreamingRpcId, P2pStreamingRpcLocalState,
9    P2pStreamingRpcRemoteState, P2pStreamingRpcRequest, P2pStreamingRpcResponse,
10    P2pStreamingRpcResponseFull,
11};
12
13pub type P2pChannelsStreamingRpcActionWithMetaRef<'a> =
14    redux::ActionWithMeta<&'a P2pChannelsStreamingRpcAction>;
15
/// Limit on concurrent RPC requests accepted from a remote peer.
///
/// NOTE(review): the meaning is taken from the constant's name; it is not
/// referenced anywhere in this file, so confirm against the code that uses it.
pub const MAX_P2P_RPC_REMOTE_CONCURRENT_REQUESTS: usize = 5;
17
18#[derive(Serialize, Deserialize, Debug, Clone, ActionEvent)]
19#[action_event(fields(display(peer_id)))]
20pub enum P2pChannelsStreamingRpcAction {
21    Init {
22        peer_id: PeerId,
23    },
24    Pending {
25        peer_id: PeerId,
26    },
27    Ready {
28        peer_id: PeerId,
29    },
30    RequestSend {
31        peer_id: PeerId,
32        id: P2pStreamingRpcId,
33        request: Box<P2pStreamingRpcRequest>,
34        on_init: Option<redux::Callback<(PeerId, P2pStreamingRpcId, P2pStreamingRpcRequest)>>,
35    },
36    Timeout {
37        peer_id: PeerId,
38        id: P2pStreamingRpcId,
39    },
40    ResponseNextPartGet {
41        peer_id: PeerId,
42        id: P2pStreamingRpcId,
43    },
44    ResponsePartReceived {
45        peer_id: PeerId,
46        id: P2pStreamingRpcId,
47        response: P2pStreamingRpcResponse,
48    },
49    ResponseReceived {
50        peer_id: PeerId,
51        id: P2pStreamingRpcId,
52        response: Option<P2pStreamingRpcResponseFull>,
53    },
54    RequestReceived {
55        peer_id: PeerId,
56        id: P2pStreamingRpcId,
57        request: Box<P2pStreamingRpcRequest>,
58    },
59    /// Response for the request sent by peer is pending. Dispatched when
60    /// we need data from an async component, like ledger, for constructing
61    /// the response.
62    ResponsePending {
63        peer_id: PeerId,
64        id: P2pStreamingRpcId,
65    },
66    ResponseSendInit {
67        peer_id: PeerId,
68        id: P2pStreamingRpcId,
69        response: Option<P2pStreamingRpcResponseFull>,
70    },
71    ResponsePartNextSend {
72        peer_id: PeerId,
73        id: P2pStreamingRpcId,
74    },
75    ResponsePartSend {
76        peer_id: PeerId,
77        id: P2pStreamingRpcId,
78        response: Box<P2pStreamingRpcResponse>,
79    },
80    ResponseSent {
81        peer_id: PeerId,
82        id: P2pStreamingRpcId,
83    },
84}
85
86impl P2pChannelsStreamingRpcAction {
87    pub fn peer_id(&self) -> &PeerId {
88        match self {
89            Self::Init { peer_id }
90            | Self::Pending { peer_id }
91            | Self::Ready { peer_id }
92            | Self::RequestSend { peer_id, .. }
93            | Self::Timeout { peer_id, .. }
94            | Self::ResponseNextPartGet { peer_id, .. }
95            | Self::ResponsePartReceived { peer_id, .. }
96            | Self::ResponseReceived { peer_id, .. }
97            | Self::RequestReceived { peer_id, .. }
98            | Self::ResponsePending { peer_id, .. }
99            | Self::ResponseSendInit { peer_id, .. }
100            | Self::ResponsePartNextSend { peer_id, .. }
101            | Self::ResponsePartSend { peer_id, .. }
102            | Self::ResponseSent { peer_id, .. } => peer_id,
103        }
104    }
105}
106
107impl redux::EnablingCondition<P2pState> for P2pChannelsStreamingRpcAction {
108    fn is_enabled(&self, state: &P2pState, time: Timestamp) -> bool {
109        match self {
110            P2pChannelsStreamingRpcAction::Init { peer_id } => state
111                .peers
112                .get(peer_id)
113                .filter(|p| !p.is_libp2p())
114                .and_then(|p| p.status.as_ready())
115                .is_some_and(|p| {
116                    matches!(
117                        p.channels.streaming_rpc,
118                        P2pChannelsStreamingRpcState::Enabled
119                    )
120                }),
121            P2pChannelsStreamingRpcAction::Pending { peer_id } => {
122                state.get_ready_peer(peer_id).is_some_and(|p| {
123                    matches!(
124                        p.channels.streaming_rpc,
125                        P2pChannelsStreamingRpcState::Init { .. }
126                    )
127                })
128            }
129            P2pChannelsStreamingRpcAction::Ready { peer_id } => {
130                state.get_ready_peer(peer_id).is_some_and(|p| {
131                    matches!(
132                        p.channels.streaming_rpc,
133                        P2pChannelsStreamingRpcState::Pending { .. }
134                    )
135                })
136            }
137            P2pChannelsStreamingRpcAction::RequestSend { peer_id, id, .. } => {
138                state.get_ready_peer(peer_id).is_some_and(|p| {
139                    matches!(
140                        &p.channels.streaming_rpc,
141                        P2pChannelsStreamingRpcState::Ready {
142                            local: P2pStreamingRpcLocalState::WaitingForRequest { .. }
143                                | P2pStreamingRpcLocalState::Responded { .. },
144                            ..
145                        } if p.channels.next_local_rpc_id() == *id
146                    )
147                })
148            }
149            P2pChannelsStreamingRpcAction::Timeout { peer_id, id } => {
150                state.get_ready_peer(peer_id).is_some_and(|p| {
151                    matches!(
152                        &p.channels.streaming_rpc,
153                        P2pChannelsStreamingRpcState::Ready {
154                            local: P2pStreamingRpcLocalState::Requested {
155                                id: rpc_id, .. },
156                            ..
157                        } if rpc_id == id
158                    )
159                }) && state.is_peer_streaming_rpc_timed_out(peer_id, *id, time)
160            }
161            P2pChannelsStreamingRpcAction::ResponseNextPartGet { peer_id, id, .. } => state
162                .get_ready_peer(peer_id)
163                .is_some_and(|p| match &p.channels.streaming_rpc {
164                    P2pChannelsStreamingRpcState::Ready {
165                        local:
166                            P2pStreamingRpcLocalState::Requested {
167                                id: rpc_id,
168                                progress,
169                                ..
170                            },
171                        ..
172                    } => rpc_id == id && !progress.is_done() && !progress.is_part_pending(),
173                    _ => false,
174                }),
175            P2pChannelsStreamingRpcAction::ResponsePartReceived {
176                peer_id,
177                id,
178                response,
179            } => state
180                .get_ready_peer(peer_id)
181                .is_some_and(|p| match &p.channels.streaming_rpc {
182                    P2pChannelsStreamingRpcState::Ready {
183                        local:
184                            P2pStreamingRpcLocalState::Requested {
185                                id: rpc_id,
186                                request,
187                                ..
188                            },
189                        ..
190                    } => rpc_id == id && response.kind() == request.kind(),
191                    _ => false,
192                }),
193            P2pChannelsStreamingRpcAction::ResponseReceived {
194                peer_id,
195                id,
196                response,
197            } => state
198                .get_ready_peer(peer_id)
199                .is_some_and(|p| match &p.channels.streaming_rpc {
200                    P2pChannelsStreamingRpcState::Ready {
201                        local:
202                            P2pStreamingRpcLocalState::Requested {
203                                id: rpc_id,
204                                request,
205                                progress,
206                                ..
207                            },
208                        ..
209                    } => {
210                        rpc_id == id
211                            && (response.is_none() || progress.is_done())
212                            && response
213                                .as_ref()
214                                .is_none_or(|resp| resp.kind() == request.kind())
215                    }
216                    _ => false,
217                }),
218            P2pChannelsStreamingRpcAction::RequestReceived { peer_id, .. } => state
219                .get_ready_peer(peer_id)
220                .is_some_and(|p| match &p.channels.streaming_rpc {
221                    P2pChannelsStreamingRpcState::Ready { remote, .. } => {
222                        matches!(
223                            remote,
224                            P2pStreamingRpcRemoteState::WaitingForRequest { .. }
225                                | P2pStreamingRpcRemoteState::Responded { .. }
226                        )
227                    }
228                    _ => false,
229                }),
230            P2pChannelsStreamingRpcAction::ResponsePending { peer_id, id } => state
231                .get_ready_peer(peer_id)
232                .and_then(|p| p.channels.streaming_rpc.remote_todo_request())
233                .is_some_and(|(rpc_id, _)| rpc_id == *id),
234            P2pChannelsStreamingRpcAction::ResponseSendInit {
235                peer_id,
236                id,
237                response,
238            } => state
239                .get_ready_peer(peer_id)
240                .and_then(|p| p.channels.streaming_rpc.remote_pending_request())
241                .is_some_and(|(rpc_id, req)| {
242                    rpc_id == *id
243                        && response
244                            .as_ref()
245                            .is_none_or(|resp| resp.kind() == req.kind())
246                }),
247            P2pChannelsStreamingRpcAction::ResponsePartNextSend { peer_id, id } => state
248                .get_ready_peer(peer_id)
249                .is_some_and(|p| match &p.channels.streaming_rpc {
250                    P2pChannelsStreamingRpcState::Ready {
251                        remote:
252                            P2pStreamingRpcRemoteState::Requested {
253                                id: rpc_id,
254                                progress,
255                                ..
256                            },
257                        ..
258                    } => rpc_id == id && !progress.is_done(),
259                    _ => false,
260                }),
261            P2pChannelsStreamingRpcAction::ResponsePartSend {
262                peer_id,
263                id,
264                response,
265            } => state
266                .get_ready_peer(peer_id)
267                .is_some_and(|p| match &p.channels.streaming_rpc {
268                    P2pChannelsStreamingRpcState::Ready {
269                        remote:
270                            P2pStreamingRpcRemoteState::Requested {
271                                id: rpc_id,
272                                request,
273                                progress,
274                                ..
275                            },
276                        ..
277                    } => rpc_id == id && !progress.is_done() && response.kind() == request.kind(),
278                    _ => false,
279                }),
280            P2pChannelsStreamingRpcAction::ResponseSent { peer_id, id } => state
281                .get_ready_peer(peer_id)
282                .is_some_and(|p| match &p.channels.streaming_rpc {
283                    P2pChannelsStreamingRpcState::Ready {
284                        remote:
285                            P2pStreamingRpcRemoteState::Requested {
286                                id: rpc_id,
287                                progress,
288                                ..
289                            },
290                        ..
291                    } => rpc_id == id && progress.is_done(),
292                    _ => false,
293                }),
294        }
295    }
296}
297
298use crate::channels::P2pChannelsAction;
299
300impl From<P2pChannelsStreamingRpcAction> for crate::P2pAction {
301    fn from(a: P2pChannelsStreamingRpcAction) -> Self {
302        Self::Channels(P2pChannelsAction::StreamingRpc(a))
303    }
304}