p2p/channels/streaming_rpc/
p2p_channels_streaming_rpc_reducer.rs

1use openmina_core::{bug_condition, Substate};
2use redux::ActionWithMeta;
3
4use crate::{
5    channels::{ChannelId, ChannelMsg, MsgId, P2pChannelsEffectfulAction},
6    P2pState,
7};
8
9use super::{
10    staged_ledger_parts::{StagedLedgerPartsReceiveProgress, StagedLedgerPartsSendProgress},
11    P2pChannelsStreamingRpcAction, P2pChannelsStreamingRpcState, P2pStreamingRpcLocalState,
12    P2pStreamingRpcRemoteState, P2pStreamingRpcRequest, P2pStreamingRpcResponseFull,
13    P2pStreamingRpcSendProgress, StreamingRpcChannelMsg,
14};
15
16impl P2pChannelsStreamingRpcState {
17    pub fn reducer<Action, State>(
18        mut state_context: Substate<Action, State, P2pState>,
19        action: ActionWithMeta<P2pChannelsStreamingRpcAction>,
20    ) -> Result<(), String>
21    where
22        State: crate::P2pStateTrait,
23        Action: crate::P2pActionTrait<State>,
24    {
25        let (action, meta) = action.split();
26        let peer_id = *action.peer_id();
27        let p2p_state = state_context.get_substate_mut()?;
28
29        let channels_state = &mut p2p_state
30            .get_ready_peer_mut(&peer_id)
31            .ok_or_else(|| format!("Invalid state for: {action:?}"))?
32            .channels;
33
34        let next_local_rpc_id = &mut channels_state.next_local_rpc_id;
35        let streaming_rpc_state = &mut channels_state.streaming_rpc;
36
37        match action {
38            P2pChannelsStreamingRpcAction::Init { .. } => {
39                *streaming_rpc_state = Self::Init { time: meta.time() };
40
41                let dispatcher = state_context.into_dispatcher();
42                dispatcher.push(P2pChannelsEffectfulAction::InitChannel {
43                    peer_id,
44                    id: ChannelId::StreamingRpc,
45                    on_success: redux::callback!(
46                        on_streaming_rpc_channel_init(peer_id: crate::PeerId) -> crate::P2pAction {
47                            P2pChannelsStreamingRpcAction::Pending { peer_id }
48                        }
49                    ),
50                });
51                Ok(())
52            }
53            P2pChannelsStreamingRpcAction::Pending { .. } => {
54                *streaming_rpc_state = Self::Pending { time: meta.time() };
55                Ok(())
56            }
57            P2pChannelsStreamingRpcAction::Ready { .. } => {
58                *streaming_rpc_state = Self::Ready {
59                    time: meta.time(),
60                    local: P2pStreamingRpcLocalState::WaitingForRequest { time: meta.time() },
61                    remote: P2pStreamingRpcRemoteState::WaitingForRequest { time: meta.time() },
62                    remote_last_responded: redux::Timestamp::ZERO,
63                };
64
65                let (dispatcher, state) = state_context.into_dispatcher_and_state();
66                let p2p_state: &P2pState = state.substate()?;
67
68                if let Some(callback) = &p2p_state.callbacks.on_p2p_channels_streaming_rpc_ready {
69                    dispatcher.push_callback(callback.clone(), ());
70                }
71                Ok(())
72            }
73            P2pChannelsStreamingRpcAction::RequestSend {
74                id,
75                request,
76                on_init,
77                ..
78            } => {
79                let Self::Ready { local, .. } = streaming_rpc_state else {
80                    bug_condition!(
81                        "`P2pChannelsStreamingRpcAction::RequestSend` with state {:?}",
82                        streaming_rpc_state
83                    );
84                    return Ok(());
85                };
86
87                *next_local_rpc_id += 1;
88                *local = P2pStreamingRpcLocalState::Requested {
89                    time: meta.time(),
90                    id,
91                    request: request.clone(),
92                    progress: match &*request {
93                        P2pStreamingRpcRequest::StagedLedgerParts(_) => {
94                            Into::into(StagedLedgerPartsReceiveProgress::BasePending {
95                                time: meta.time(),
96                            })
97                        }
98                    },
99                };
100
101                let dispatcher = state_context.into_dispatcher();
102                dispatcher.push(P2pChannelsEffectfulAction::MessageSend {
103                    peer_id,
104                    msg_id: MsgId::first(),
105                    msg: StreamingRpcChannelMsg::Request(id, *request.clone()).into(),
106                });
107                if let Some(callback) = on_init {
108                    dispatcher.push_callback(callback, (peer_id, id, *request));
109                }
110                Ok(())
111            }
112            P2pChannelsStreamingRpcAction::Timeout { id, .. } => {
113                let (dispatcher, state) = state_context.into_dispatcher_and_state();
114                let p2p_state: &P2pState = state.substate()?;
115
116                if let Some(callback) = &p2p_state.callbacks.on_p2p_channels_streaming_rpc_timeout {
117                    dispatcher.push_callback(callback.clone(), (peer_id, id));
118                }
119
120                Ok(())
121            }
122            P2pChannelsStreamingRpcAction::ResponseNextPartGet { id, .. } => {
123                let Self::Ready {
124                    local: P2pStreamingRpcLocalState::Requested { progress, .. },
125                    ..
126                } = streaming_rpc_state
127                else {
128                    bug_condition!("{:?} with state {:?}", action, streaming_rpc_state);
129                    return Ok(());
130                };
131
132                if !progress.set_next_pending(meta.time()) {
133                    bug_condition!("progress state already pending: {progress:?}");
134                }
135
136                if !progress.is_part_pending() {
137                    bug_condition!("progress state is not pending {:?}", progress);
138                }
139
140                let dispatcher = state_context.into_dispatcher();
141                dispatcher.push(P2pChannelsEffectfulAction::MessageSend {
142                    peer_id,
143                    msg_id: MsgId::first(),
144                    msg: ChannelMsg::StreamingRpc(StreamingRpcChannelMsg::Next(id)),
145                });
146                Ok(())
147            }
148            P2pChannelsStreamingRpcAction::ResponsePartReceived { response, id, .. } => {
149                let Self::Ready {
150                    local: P2pStreamingRpcLocalState::Requested { progress, .. },
151                    ..
152                } = streaming_rpc_state
153                else {
154                    bug_condition!(
155                        "`P2pChannelsStreamingRpcAction::ResponsePartReceived` with state {:?}",
156                        streaming_rpc_state
157                    );
158                    return Ok(());
159                };
160                if !progress.update(meta.time(), response) {
161                    bug_condition!("progress response mismatch! {progress:?}");
162                }
163
164                let (dispatcher, state) = state_context.into_dispatcher_and_state();
165                let state: &P2pState = state.substate()?;
166                let Some(peer) = state.get_ready_peer(&peer_id) else {
167                    return Ok(());
168                };
169
170                if let Some(response) = peer.channels.streaming_rpc.local_done_response() {
171                    dispatcher.push(P2pChannelsStreamingRpcAction::ResponseReceived {
172                        peer_id,
173                        id,
174                        response: Some(response),
175                    });
176                    return Ok(());
177                }
178                dispatcher.push(P2pChannelsStreamingRpcAction::ResponseNextPartGet { peer_id, id });
179                Ok(())
180            }
181            P2pChannelsStreamingRpcAction::ResponseReceived {
182                id: rpc_id,
183                response,
184                ..
185            } => {
186                let Self::Ready { local, .. } = streaming_rpc_state else {
187                    bug_condition!(
188                        "`P2pChannelsStreamingRpcAction::ResponseReceived` with state {:?}",
189                        streaming_rpc_state
190                    );
191                    return Ok(());
192                };
193                let P2pStreamingRpcLocalState::Requested { id, request, .. } = local else {
194                    bug_condition!(
195                        "`P2pChannelsStreamingRpcAction::ResponseReceived` with state {:?}",
196                        streaming_rpc_state
197                    );
198                    return Ok(());
199                };
200                *local = P2pStreamingRpcLocalState::Responded {
201                    time: meta.time(),
202                    id: *id,
203                    request: std::mem::take(request),
204                };
205
206                let (dispatcher, state) = state_context.into_dispatcher_and_state();
207                let p2p_state: &P2pState = state.substate()?;
208
209                if let Some(callback) = &p2p_state
210                    .callbacks
211                    .on_p2p_channels_streaming_rpc_response_received
212                {
213                    dispatcher.push_callback(callback.clone(), (peer_id, rpc_id, response))
214                }
215
216                Ok(())
217            }
218            P2pChannelsStreamingRpcAction::RequestReceived { id, request, .. } => {
219                let Self::Ready { remote, .. } = streaming_rpc_state else {
220                    bug_condition!(
221                        "`P2pChannelsStreamingRpcAction::RequestReceived` with state {:?}",
222                        streaming_rpc_state
223                    );
224                    return Ok(());
225                };
226                *remote = P2pStreamingRpcRemoteState::Requested {
227                    time: meta.time(),
228                    id,
229                    request,
230                    progress: StagedLedgerPartsSendProgress::LedgerGetIdle { time: meta.time() }
231                        .into(),
232                };
233                // async ledger request will be triggered by `LedgerReadAction::FindTodos`.
234                Ok(())
235            }
236            P2pChannelsStreamingRpcAction::ResponsePending { .. } => {
237                let Self::Ready {
238                    remote:
239                        P2pStreamingRpcRemoteState::Requested {
240                            request, progress, ..
241                        },
242                    ..
243                } = streaming_rpc_state
244                else {
245                    bug_condition!("{:?} with state {:?}", action, streaming_rpc_state);
246                    return Ok(());
247                };
248                match &**request {
249                    P2pStreamingRpcRequest::StagedLedgerParts(_) => {
250                        *progress =
251                            StagedLedgerPartsSendProgress::LedgerGetPending { time: meta.time() }
252                                .into();
253                    }
254                }
255                Ok(())
256            }
257            P2pChannelsStreamingRpcAction::ResponseSendInit { response, id, .. } => {
258                let Self::Ready {
259                    remote:
260                        P2pStreamingRpcRemoteState::Requested {
261                            request, progress, ..
262                        },
263                    ..
264                } = streaming_rpc_state
265                else {
266                    bug_condition!(
267                        "`P2pChannelsStreamingRpcAction::ResponseSendInit` with state {:?}",
268                        streaming_rpc_state
269                    );
270                    return Ok(());
271                };
272                match (&**request, &response) {
273                    (_, Some(P2pStreamingRpcResponseFull::StagedLedgerParts(data))) => {
274                        *progress = StagedLedgerPartsSendProgress::LedgerGetSuccess {
275                            time: meta.time(),
276                            data: Some(data.clone()),
277                        }
278                        .into();
279                    }
280                    (P2pStreamingRpcRequest::StagedLedgerParts(_), None) => {
281                        *progress =
282                            StagedLedgerPartsSendProgress::Success { time: meta.time() }.into();
283                    } // _ => todo!("unexpected response send call: {response:?}"),
284                }
285
286                let dispatcher = state_context.into_dispatcher();
287                if response.is_none() {
288                    let msg = StreamingRpcChannelMsg::Response(id, None).into();
289                    dispatcher.push(P2pChannelsEffectfulAction::MessageSend {
290                        peer_id,
291                        msg_id: MsgId::first(),
292                        msg,
293                    });
294                    dispatcher.push(P2pChannelsStreamingRpcAction::ResponseSent { peer_id, id });
295                    return Ok(());
296                }
297
298                dispatcher
299                    .push(P2pChannelsStreamingRpcAction::ResponsePartNextSend { peer_id, id });
300                Ok(())
301            }
302            P2pChannelsStreamingRpcAction::ResponsePartNextSend { id, .. } => {
303                let (dispatcher, state) = state_context.into_dispatcher_and_state();
304                let state: &P2pState = state.substate()?;
305
306                let Some(response) = state
307                    .get_ready_peer(&peer_id)
308                    .and_then(|peer| peer.channels.streaming_rpc.remote_next_msg().map(Box::new))
309                else {
310                    return Ok(());
311                };
312
313                dispatcher.push(P2pChannelsStreamingRpcAction::ResponsePartSend {
314                    peer_id,
315                    id,
316                    response,
317                });
318
319                Ok(())
320            }
321            P2pChannelsStreamingRpcAction::ResponsePartSend { id, response, .. } => {
322                let Self::Ready {
323                    remote: P2pStreamingRpcRemoteState::Requested { progress, .. },
324                    ..
325                } = streaming_rpc_state
326                else {
327                    bug_condition!(
328                        "`P2pChannelsStreamingRpcAction::ResponsePartSend` with state {:?}",
329                        streaming_rpc_state
330                    );
331                    return Ok(());
332                };
333                match progress {
334                    P2pStreamingRpcSendProgress::StagedLedgerParts(progress) => {
335                        *progress = match progress {
336                            StagedLedgerPartsSendProgress::LedgerGetSuccess {
337                                data: Some(data),
338                                ..
339                            } => StagedLedgerPartsSendProgress::BaseSent {
340                                time: meta.time(),
341                                data: data.clone(),
342                            },
343                            StagedLedgerPartsSendProgress::BaseSent { data, .. } => {
344                                StagedLedgerPartsSendProgress::ScanStateBaseSent {
345                                    time: meta.time(),
346                                    data: data.clone(),
347                                }
348                            }
349                            StagedLedgerPartsSendProgress::ScanStateBaseSent { data, .. } => {
350                                StagedLedgerPartsSendProgress::PreviousIncompleteZkappUpdatesSent {
351                                    time: meta.time(),
352                                    data: data.clone(),
353                                }
354                            }
355                            StagedLedgerPartsSendProgress::PreviousIncompleteZkappUpdatesSent {
356                                data,
357                                ..
358                            } => StagedLedgerPartsSendProgress::ScanStateTreesSending {
359                                time: meta.time(),
360                                data: data.clone(),
361                                tree_index: 0,
362                            },
363                            StagedLedgerPartsSendProgress::ScanStateTreesSending {
364                                data,
365                                tree_index,
366                                ..
367                            } => StagedLedgerPartsSendProgress::ScanStateTreesSending {
368                                time: meta.time(),
369                                data: data.clone(),
370                                tree_index: *tree_index + 1,
371                            },
372                            progress => {
373                                bug_condition!("unexpected state during `P2pStreamingRpcSendProgress::StagedLedgerParts`: {progress:?}");
374                                return Ok(());
375                            }
376                        };
377
378                        if let StagedLedgerPartsSendProgress::ScanStateTreesSending {
379                            data,
380                            tree_index,
381                            ..
382                        } = progress
383                        {
384                            let target_index = data.scan_state.scan_state.trees.1.len();
385                            if *tree_index >= target_index {
386                                *progress =
387                                    StagedLedgerPartsSendProgress::Success { time: meta.time() };
388                            }
389                        }
390                    }
391                }
392
393                let dispatcher = state_context.into_dispatcher();
394
395                let msg = StreamingRpcChannelMsg::Response(id, Some(*response)).into();
396                dispatcher.push(P2pChannelsEffectfulAction::MessageSend {
397                    peer_id,
398                    msg_id: MsgId::first(),
399                    msg,
400                });
401                dispatcher.push(P2pChannelsStreamingRpcAction::ResponseSent { peer_id, id });
402                Ok(())
403            }
404            P2pChannelsStreamingRpcAction::ResponseSent { id, .. } => {
405                let (remote, request) = match streaming_rpc_state {
406                    Self::Ready { remote, .. } => match remote {
407                        P2pStreamingRpcRemoteState::Requested { request, .. } => {
408                            let request = std::mem::take(request);
409                            (remote, request)
410                        }
411                        _ => {
412                            bug_condition!(
413                                "`P2pChannelsStreamingRpcAction::ResponseSent` with state {:?}",
414                                streaming_rpc_state
415                            );
416                            return Ok(());
417                        }
418                    },
419                    _ => {
420                        bug_condition!(
421                            "`P2pChannelsStreamingRpcAction::ResponseSent` with state {:?}",
422                            streaming_rpc_state
423                        );
424                        return Ok(());
425                    }
426                };
427                *remote = P2pStreamingRpcRemoteState::Responded {
428                    time: meta.time(),
429                    id,
430                    request,
431                };
432
433                Ok(())
434            }
435        }
436    }
437}