// p2p/channels/streaming_rpc/p2p_channels_streaming_rpc_actions.rs
use openmina_core::ActionEvent;
use redux::Timestamp;
use serde::{Deserialize, Serialize};

use crate::{P2pState, PeerId};

use super::{
    P2pChannelsStreamingRpcState, P2pStreamingRpcId, P2pStreamingRpcLocalState,
    P2pStreamingRpcRemoteState, P2pStreamingRpcRequest, P2pStreamingRpcResponse,
    P2pStreamingRpcResponseFull,
};

13pub type P2pChannelsStreamingRpcActionWithMetaRef<'a> =
14 redux::ActionWithMeta<&'a P2pChannelsStreamingRpcAction>;
15
/// Limit of `5`, which by its name caps the number of concurrently handled
/// remote RPC requests.
///
/// NOTE(review): the constant is not referenced anywhere in this file; confirm
/// the exact semantics against its users elsewhere in the crate.
pub const MAX_P2P_RPC_REMOTE_CONCURRENT_REQUESTS: usize = 5;

18#[derive(Serialize, Deserialize, Debug, Clone, ActionEvent)]
19#[action_event(fields(display(peer_id)))]
20pub enum P2pChannelsStreamingRpcAction {
21 Init {
22 peer_id: PeerId,
23 },
24 Pending {
25 peer_id: PeerId,
26 },
27 Ready {
28 peer_id: PeerId,
29 },
30 RequestSend {
31 peer_id: PeerId,
32 id: P2pStreamingRpcId,
33 request: Box<P2pStreamingRpcRequest>,
34 on_init: Option<redux::Callback<(PeerId, P2pStreamingRpcId, P2pStreamingRpcRequest)>>,
35 },
36 Timeout {
37 peer_id: PeerId,
38 id: P2pStreamingRpcId,
39 },
40 ResponseNextPartGet {
41 peer_id: PeerId,
42 id: P2pStreamingRpcId,
43 },
44 ResponsePartReceived {
45 peer_id: PeerId,
46 id: P2pStreamingRpcId,
47 response: P2pStreamingRpcResponse,
48 },
49 ResponseReceived {
50 peer_id: PeerId,
51 id: P2pStreamingRpcId,
52 response: Option<P2pStreamingRpcResponseFull>,
53 },
54 RequestReceived {
55 peer_id: PeerId,
56 id: P2pStreamingRpcId,
57 request: Box<P2pStreamingRpcRequest>,
58 },
59 ResponsePending {
63 peer_id: PeerId,
64 id: P2pStreamingRpcId,
65 },
66 ResponseSendInit {
67 peer_id: PeerId,
68 id: P2pStreamingRpcId,
69 response: Option<P2pStreamingRpcResponseFull>,
70 },
71 ResponsePartNextSend {
72 peer_id: PeerId,
73 id: P2pStreamingRpcId,
74 },
75 ResponsePartSend {
76 peer_id: PeerId,
77 id: P2pStreamingRpcId,
78 response: Box<P2pStreamingRpcResponse>,
79 },
80 ResponseSent {
81 peer_id: PeerId,
82 id: P2pStreamingRpcId,
83 },
84}
85
86impl P2pChannelsStreamingRpcAction {
87 pub fn peer_id(&self) -> &PeerId {
88 match self {
89 Self::Init { peer_id }
90 | Self::Pending { peer_id }
91 | Self::Ready { peer_id }
92 | Self::RequestSend { peer_id, .. }
93 | Self::Timeout { peer_id, .. }
94 | Self::ResponseNextPartGet { peer_id, .. }
95 | Self::ResponsePartReceived { peer_id, .. }
96 | Self::ResponseReceived { peer_id, .. }
97 | Self::RequestReceived { peer_id, .. }
98 | Self::ResponsePending { peer_id, .. }
99 | Self::ResponseSendInit { peer_id, .. }
100 | Self::ResponsePartNextSend { peer_id, .. }
101 | Self::ResponsePartSend { peer_id, .. }
102 | Self::ResponseSent { peer_id, .. } => peer_id,
103 }
104 }
105}
106
107impl redux::EnablingCondition<P2pState> for P2pChannelsStreamingRpcAction {
108 fn is_enabled(&self, state: &P2pState, time: Timestamp) -> bool {
109 match self {
110 P2pChannelsStreamingRpcAction::Init { peer_id } => state
111 .peers
112 .get(peer_id)
113 .filter(|p| !p.is_libp2p())
114 .and_then(|p| p.status.as_ready())
115 .is_some_and(|p| {
116 matches!(
117 p.channels.streaming_rpc,
118 P2pChannelsStreamingRpcState::Enabled
119 )
120 }),
121 P2pChannelsStreamingRpcAction::Pending { peer_id } => {
122 state.get_ready_peer(peer_id).is_some_and(|p| {
123 matches!(
124 p.channels.streaming_rpc,
125 P2pChannelsStreamingRpcState::Init { .. }
126 )
127 })
128 }
129 P2pChannelsStreamingRpcAction::Ready { peer_id } => {
130 state.get_ready_peer(peer_id).is_some_and(|p| {
131 matches!(
132 p.channels.streaming_rpc,
133 P2pChannelsStreamingRpcState::Pending { .. }
134 )
135 })
136 }
137 P2pChannelsStreamingRpcAction::RequestSend { peer_id, id, .. } => {
138 state.get_ready_peer(peer_id).is_some_and(|p| {
139 matches!(
140 &p.channels.streaming_rpc,
141 P2pChannelsStreamingRpcState::Ready {
142 local: P2pStreamingRpcLocalState::WaitingForRequest { .. }
143 | P2pStreamingRpcLocalState::Responded { .. },
144 ..
145 } if p.channels.next_local_rpc_id() == *id
146 )
147 })
148 }
149 P2pChannelsStreamingRpcAction::Timeout { peer_id, id } => {
150 state.get_ready_peer(peer_id).is_some_and(|p| {
151 matches!(
152 &p.channels.streaming_rpc,
153 P2pChannelsStreamingRpcState::Ready {
154 local: P2pStreamingRpcLocalState::Requested {
155 id: rpc_id, .. },
156 ..
157 } if rpc_id == id
158 )
159 }) && state.is_peer_streaming_rpc_timed_out(peer_id, *id, time)
160 }
161 P2pChannelsStreamingRpcAction::ResponseNextPartGet { peer_id, id, .. } => state
162 .get_ready_peer(peer_id)
163 .is_some_and(|p| match &p.channels.streaming_rpc {
164 P2pChannelsStreamingRpcState::Ready {
165 local:
166 P2pStreamingRpcLocalState::Requested {
167 id: rpc_id,
168 progress,
169 ..
170 },
171 ..
172 } => rpc_id == id && !progress.is_done() && !progress.is_part_pending(),
173 _ => false,
174 }),
175 P2pChannelsStreamingRpcAction::ResponsePartReceived {
176 peer_id,
177 id,
178 response,
179 } => state
180 .get_ready_peer(peer_id)
181 .is_some_and(|p| match &p.channels.streaming_rpc {
182 P2pChannelsStreamingRpcState::Ready {
183 local:
184 P2pStreamingRpcLocalState::Requested {
185 id: rpc_id,
186 request,
187 ..
188 },
189 ..
190 } => rpc_id == id && response.kind() == request.kind(),
191 _ => false,
192 }),
193 P2pChannelsStreamingRpcAction::ResponseReceived {
194 peer_id,
195 id,
196 response,
197 } => state
198 .get_ready_peer(peer_id)
199 .is_some_and(|p| match &p.channels.streaming_rpc {
200 P2pChannelsStreamingRpcState::Ready {
201 local:
202 P2pStreamingRpcLocalState::Requested {
203 id: rpc_id,
204 request,
205 progress,
206 ..
207 },
208 ..
209 } => {
210 rpc_id == id
211 && (response.is_none() || progress.is_done())
212 && response
213 .as_ref()
214 .is_none_or(|resp| resp.kind() == request.kind())
215 }
216 _ => false,
217 }),
218 P2pChannelsStreamingRpcAction::RequestReceived { peer_id, .. } => state
219 .get_ready_peer(peer_id)
220 .is_some_and(|p| match &p.channels.streaming_rpc {
221 P2pChannelsStreamingRpcState::Ready { remote, .. } => {
222 matches!(
223 remote,
224 P2pStreamingRpcRemoteState::WaitingForRequest { .. }
225 | P2pStreamingRpcRemoteState::Responded { .. }
226 )
227 }
228 _ => false,
229 }),
230 P2pChannelsStreamingRpcAction::ResponsePending { peer_id, id } => state
231 .get_ready_peer(peer_id)
232 .and_then(|p| p.channels.streaming_rpc.remote_todo_request())
233 .is_some_and(|(rpc_id, _)| rpc_id == *id),
234 P2pChannelsStreamingRpcAction::ResponseSendInit {
235 peer_id,
236 id,
237 response,
238 } => state
239 .get_ready_peer(peer_id)
240 .and_then(|p| p.channels.streaming_rpc.remote_pending_request())
241 .is_some_and(|(rpc_id, req)| {
242 rpc_id == *id
243 && response
244 .as_ref()
245 .is_none_or(|resp| resp.kind() == req.kind())
246 }),
247 P2pChannelsStreamingRpcAction::ResponsePartNextSend { peer_id, id } => state
248 .get_ready_peer(peer_id)
249 .is_some_and(|p| match &p.channels.streaming_rpc {
250 P2pChannelsStreamingRpcState::Ready {
251 remote:
252 P2pStreamingRpcRemoteState::Requested {
253 id: rpc_id,
254 progress,
255 ..
256 },
257 ..
258 } => rpc_id == id && !progress.is_done(),
259 _ => false,
260 }),
261 P2pChannelsStreamingRpcAction::ResponsePartSend {
262 peer_id,
263 id,
264 response,
265 } => state
266 .get_ready_peer(peer_id)
267 .is_some_and(|p| match &p.channels.streaming_rpc {
268 P2pChannelsStreamingRpcState::Ready {
269 remote:
270 P2pStreamingRpcRemoteState::Requested {
271 id: rpc_id,
272 request,
273 progress,
274 ..
275 },
276 ..
277 } => rpc_id == id && !progress.is_done() && response.kind() == request.kind(),
278 _ => false,
279 }),
280 P2pChannelsStreamingRpcAction::ResponseSent { peer_id, id } => state
281 .get_ready_peer(peer_id)
282 .is_some_and(|p| match &p.channels.streaming_rpc {
283 P2pChannelsStreamingRpcState::Ready {
284 remote:
285 P2pStreamingRpcRemoteState::Requested {
286 id: rpc_id,
287 progress,
288 ..
289 },
290 ..
291 } => rpc_id == id && progress.is_done(),
292 _ => false,
293 }),
294 }
295 }
296}
297
298use crate::channels::P2pChannelsAction;
299
300impl From<P2pChannelsStreamingRpcAction> for crate::P2pAction {
301 fn from(a: P2pChannelsStreamingRpcAction) -> Self {
302 Self::Channels(P2pChannelsAction::StreamingRpc(a))
303 }
304}