p2p/channels/rpc/
p2p_channels_rpc_reducer.rs

1use super::{
2    P2pChannelsRpcAction, P2pChannelsRpcState, P2pRpcLocalState, P2pRpcRemotePendingRequestState,
3    P2pRpcRemoteState, P2pRpcResponse, RpcChannelMsg, MAX_P2P_RPC_REMOTE_CONCURRENT_REQUESTS,
4};
5use crate::{
6    channels::{ChannelId, ChannelMsg, MsgId, P2pChannelsEffectfulAction},
7    P2pNetworkRpcAction, P2pPeerAction, P2pState,
8};
9use openmina_core::{block::BlockWithHash, bug_condition, error, Substate};
10use redux::ActionWithMeta;
11use std::collections::VecDeque;
12
13impl P2pChannelsRpcState {
14    /// Substate is accessed
15    pub fn reducer<Action, State>(
16        mut state_context: Substate<Action, State, P2pState>,
17        action: ActionWithMeta<P2pChannelsRpcAction>,
18    ) -> Result<(), String>
19    where
20        State: crate::P2pStateTrait,
21        Action: crate::P2pActionTrait<State>,
22    {
23        let (action, meta) = action.split();
24        let p2p_state = state_context.get_substate_mut()?;
25        let peer_id = *action.peer_id();
26        let is_libp2p = p2p_state.is_libp2p_peer(&peer_id);
27        let peer_state = &mut p2p_state
28            .get_ready_peer_mut(&peer_id)
29            .ok_or_else(|| format!("Peer state not found for: {action:?}"))?
30            .channels;
31
32        let next_local_rpc_id = &mut peer_state.next_local_rpc_id;
33        let rpc_state = &mut peer_state.rpc;
34
35        match action {
36            P2pChannelsRpcAction::Init { .. } => {
37                *rpc_state = Self::Init { time: meta.time() };
38
39                let dispatcher = state_context.into_dispatcher();
40
41                dispatcher.push(P2pChannelsEffectfulAction::InitChannel {
42                    peer_id,
43                    id: ChannelId::Rpc,
44                    on_success: redux::callback!(
45                        on_rpc_channel_init(peer_id: crate::PeerId) -> crate::P2pAction {
46                            P2pChannelsRpcAction::Pending { peer_id }
47                        }
48                    ),
49                });
50                Ok(())
51            }
52            P2pChannelsRpcAction::Pending { .. } => {
53                *rpc_state = Self::Pending { time: meta.time() };
54                Ok(())
55            }
56            P2pChannelsRpcAction::Ready { .. } => {
57                *rpc_state = Self::Ready {
58                    time: meta.time(),
59                    local: P2pRpcLocalState::WaitingForRequest { time: meta.time() },
60                    remote: P2pRpcRemoteState {
61                        pending_requests: VecDeque::with_capacity(
62                            MAX_P2P_RPC_REMOTE_CONCURRENT_REQUESTS,
63                        ),
64                        last_responded: redux::Timestamp::ZERO,
65                    },
66                };
67
68                let (dispatcher, state) = state_context.into_dispatcher_and_state();
69                let p2p_state: &P2pState = state.substate()?;
70
71                if let Some(callback) = &p2p_state.callbacks.on_p2p_channels_rpc_ready {
72                    dispatcher.push_callback(callback.clone(), peer_id);
73                }
74                Ok(())
75            }
76            P2pChannelsRpcAction::RequestSend {
77                id,
78                request,
79                on_init,
80                ..
81            } => {
82                let Self::Ready { local, .. } = rpc_state else {
83                    bug_condition!(
84                        "Invalid state for `P2pChannelsRpcAction::RequestSend`, state: {:?}",
85                        rpc_state
86                    );
87                    return Ok(());
88                };
89                *next_local_rpc_id += 1;
90                *local = P2pRpcLocalState::Requested {
91                    time: meta.time(),
92                    id,
93                    request: request.clone(),
94                };
95
96                let dispatcher = state_context.into_dispatcher();
97
98                #[cfg(feature = "p2p-libp2p")]
99                if is_libp2p {
100                    if let Some((query, data)) =
101                        super::libp2p::internal_request_into_libp2p(*request.clone(), id)
102                    {
103                        dispatcher.push(P2pNetworkRpcAction::OutgoingQuery {
104                            peer_id,
105                            query,
106                            data,
107                        });
108                    }
109                    if let Some(on_init) = on_init {
110                        dispatcher.push_callback(on_init, (peer_id, id, *request));
111                    }
112
113                    return Ok(());
114                }
115
116                dispatcher.push(P2pChannelsEffectfulAction::MessageSend {
117                    peer_id,
118                    msg_id: MsgId::first(),
119                    msg: ChannelMsg::Rpc(RpcChannelMsg::Request(id, *request.clone())),
120                });
121
122                if let Some(callback) = on_init {
123                    dispatcher.push_callback(callback, (peer_id, id, *request));
124                }
125                Ok(())
126            }
127            P2pChannelsRpcAction::Timeout { id, .. } => {
128                let (dispatcher, state) = state_context.into_dispatcher_and_state();
129                let p2p_state: &P2pState = state.substate()?;
130
131                if let Some(callback) = &p2p_state.callbacks.on_p2p_channels_rpc_timeout {
132                    dispatcher.push_callback(callback.clone(), (peer_id, id));
133                }
134
135                Ok(())
136            }
137            P2pChannelsRpcAction::ResponseReceived {
138                response,
139                id: rpc_id,
140                ..
141            } => {
142                let Self::Ready { local, .. } = rpc_state else {
143                    bug_condition!(
144                        "Invalid state for `P2pChannelsRpcAction::ResponseReceived`, state: {:?}",
145                        rpc_state
146                    );
147                    return Ok(());
148                };
149                let P2pRpcLocalState::Requested { id, request, .. } = local else {
150                    bug_condition!(
151                        "Invalid state for `P2pChannelsRpcAction::ResponseReceived`, state: {:?}",
152                        rpc_state
153                    );
154                    return Ok(());
155                };
156                *local = P2pRpcLocalState::Responded {
157                    time: meta.time(),
158                    id: *id,
159                    request: std::mem::take(request),
160                };
161
162                let (dispatcher, state) = state_context.into_dispatcher_and_state();
163                let p2p_state: &P2pState = state.substate()?;
164
165                if let Some(P2pRpcResponse::BestTipWithProof(resp)) = response.as_deref() {
166                    let Ok(best_tip) = BlockWithHash::try_new(resp.best_tip.clone()) else {
167                        error!(meta.time(); "P2pChannelsRpcAction::ResponseReceived: Invalid bigint in block");
168                        return Ok(());
169                    };
170
171                    dispatcher.push(P2pPeerAction::BestTipUpdate { peer_id, best_tip });
172                }
173
174                if let Some(callback) = &p2p_state.callbacks.on_p2p_channels_rpc_response_received {
175                    dispatcher.push_callback(callback.clone(), (peer_id, rpc_id, response));
176                }
177                Ok(())
178            }
179            P2pChannelsRpcAction::RequestReceived { id, request, .. } => {
180                let Self::Ready { remote, .. } = rpc_state else {
181                    bug_condition!(
182                        "Invalid state for `P2pChannelsRpcAction::RequestReceived`, state: {:?}",
183                        rpc_state
184                    );
185                    return Ok(());
186                };
187                remote
188                    .pending_requests
189                    .push_back(P2pRpcRemotePendingRequestState {
190                        time: meta.time(),
191                        id,
192                        request: *request.clone(),
193                        is_pending: false,
194                    });
195
196                let (dispatcher, state) = state_context.into_dispatcher_and_state();
197                let p2p_state: &P2pState = state.substate()?;
198
199                if let Some(callback) = &p2p_state.callbacks.on_p2p_channels_rpc_request_received {
200                    dispatcher.push_callback(callback.clone(), (peer_id, id, request));
201                }
202                Ok(())
203            }
204            P2pChannelsRpcAction::ResponsePending { id, .. } => {
205                let Self::Ready { remote, .. } = rpc_state else {
206                    bug_condition!(
207                        "Invalid state for `P2pChannelsRpcAction::ResponsePending`, state: {:?}",
208                        rpc_state
209                    );
210                    return Ok(());
211                };
212                if let Some(req) = remote.pending_requests.iter_mut().find(|r| r.id == id) {
213                    req.is_pending = true;
214                }
215                Ok(())
216            }
217            P2pChannelsRpcAction::ResponseSend { id, response, .. } => {
218                let Self::Ready { remote, .. } = rpc_state else {
219                    bug_condition!(
220                        "Invalid state for `P2pChannelsRpcAction::ResponseSend`, state: {:?}",
221                        rpc_state
222                    );
223                    return Ok(());
224                };
225
226                if let Some(pos) = remote.pending_requests.iter().position(|r| r.id == id) {
227                    remote.pending_requests.remove(pos);
228                    remote.last_responded = meta.time();
229                }
230
231                let dispatcher = state_context.into_dispatcher();
232
233                #[cfg(feature = "p2p-libp2p")]
234                if is_libp2p {
235                    if let Some(response) = response {
236                        if let Some((response, data)) =
237                            super::libp2p::internal_response_into_libp2p(*response, id)
238                        {
239                            dispatcher.push(P2pNetworkRpcAction::OutgoingResponse {
240                                peer_id,
241                                response,
242                                data,
243                            });
244                        }
245                    }
246
247                    return Ok(());
248                }
249
250                dispatcher.push(P2pChannelsEffectfulAction::MessageSend {
251                    peer_id,
252                    msg_id: MsgId::first(),
253                    msg: ChannelMsg::Rpc(RpcChannelMsg::Response(id, response.map(|v| *v))),
254                });
255                Ok(())
256            }
257        }
258    }
259}