1use super::{
2 P2pChannelsRpcAction, P2pChannelsRpcState, P2pRpcLocalState, P2pRpcRemotePendingRequestState,
3 P2pRpcRemoteState, P2pRpcResponse, RpcChannelMsg, MAX_P2P_RPC_REMOTE_CONCURRENT_REQUESTS,
4};
5use crate::{
6 channels::{ChannelId, ChannelMsg, MsgId, P2pChannelsEffectfulAction},
7 P2pNetworkRpcAction, P2pPeerAction, P2pState,
8};
9use openmina_core::{block::BlockWithHash, bug_condition, error, Substate};
10use redux::ActionWithMeta;
11use std::collections::VecDeque;
12
13impl P2pChannelsRpcState {
14 pub fn reducer<Action, State>(
16 mut state_context: Substate<Action, State, P2pState>,
17 action: ActionWithMeta<P2pChannelsRpcAction>,
18 ) -> Result<(), String>
19 where
20 State: crate::P2pStateTrait,
21 Action: crate::P2pActionTrait<State>,
22 {
23 let (action, meta) = action.split();
24 let p2p_state = state_context.get_substate_mut()?;
25 let peer_id = *action.peer_id();
26 let is_libp2p = p2p_state.is_libp2p_peer(&peer_id);
27 let peer_state = &mut p2p_state
28 .get_ready_peer_mut(&peer_id)
29 .ok_or_else(|| format!("Peer state not found for: {action:?}"))?
30 .channels;
31
32 let next_local_rpc_id = &mut peer_state.next_local_rpc_id;
33 let rpc_state = &mut peer_state.rpc;
34
35 match action {
36 P2pChannelsRpcAction::Init { .. } => {
37 *rpc_state = Self::Init { time: meta.time() };
38
39 let dispatcher = state_context.into_dispatcher();
40
41 dispatcher.push(P2pChannelsEffectfulAction::InitChannel {
42 peer_id,
43 id: ChannelId::Rpc,
44 on_success: redux::callback!(
45 on_rpc_channel_init(peer_id: crate::PeerId) -> crate::P2pAction {
46 P2pChannelsRpcAction::Pending { peer_id }
47 }
48 ),
49 });
50 Ok(())
51 }
52 P2pChannelsRpcAction::Pending { .. } => {
53 *rpc_state = Self::Pending { time: meta.time() };
54 Ok(())
55 }
56 P2pChannelsRpcAction::Ready { .. } => {
57 *rpc_state = Self::Ready {
58 time: meta.time(),
59 local: P2pRpcLocalState::WaitingForRequest { time: meta.time() },
60 remote: P2pRpcRemoteState {
61 pending_requests: VecDeque::with_capacity(
62 MAX_P2P_RPC_REMOTE_CONCURRENT_REQUESTS,
63 ),
64 last_responded: redux::Timestamp::ZERO,
65 },
66 };
67
68 let (dispatcher, state) = state_context.into_dispatcher_and_state();
69 let p2p_state: &P2pState = state.substate()?;
70
71 if let Some(callback) = &p2p_state.callbacks.on_p2p_channels_rpc_ready {
72 dispatcher.push_callback(callback.clone(), peer_id);
73 }
74 Ok(())
75 }
76 P2pChannelsRpcAction::RequestSend {
77 id,
78 request,
79 on_init,
80 ..
81 } => {
82 let Self::Ready { local, .. } = rpc_state else {
83 bug_condition!(
84 "Invalid state for `P2pChannelsRpcAction::RequestSend`, state: {:?}",
85 rpc_state
86 );
87 return Ok(());
88 };
89 *next_local_rpc_id += 1;
90 *local = P2pRpcLocalState::Requested {
91 time: meta.time(),
92 id,
93 request: request.clone(),
94 };
95
96 let dispatcher = state_context.into_dispatcher();
97
98 #[cfg(feature = "p2p-libp2p")]
99 if is_libp2p {
100 if let Some((query, data)) =
101 super::libp2p::internal_request_into_libp2p(*request.clone(), id)
102 {
103 dispatcher.push(P2pNetworkRpcAction::OutgoingQuery {
104 peer_id,
105 query,
106 data,
107 });
108 }
109 if let Some(on_init) = on_init {
110 dispatcher.push_callback(on_init, (peer_id, id, *request));
111 }
112
113 return Ok(());
114 }
115
116 dispatcher.push(P2pChannelsEffectfulAction::MessageSend {
117 peer_id,
118 msg_id: MsgId::first(),
119 msg: ChannelMsg::Rpc(RpcChannelMsg::Request(id, *request.clone())),
120 });
121
122 if let Some(callback) = on_init {
123 dispatcher.push_callback(callback, (peer_id, id, *request));
124 }
125 Ok(())
126 }
127 P2pChannelsRpcAction::Timeout { id, .. } => {
128 let (dispatcher, state) = state_context.into_dispatcher_and_state();
129 let p2p_state: &P2pState = state.substate()?;
130
131 if let Some(callback) = &p2p_state.callbacks.on_p2p_channels_rpc_timeout {
132 dispatcher.push_callback(callback.clone(), (peer_id, id));
133 }
134
135 Ok(())
136 }
137 P2pChannelsRpcAction::ResponseReceived {
138 response,
139 id: rpc_id,
140 ..
141 } => {
142 let Self::Ready { local, .. } = rpc_state else {
143 bug_condition!(
144 "Invalid state for `P2pChannelsRpcAction::ResponseReceived`, state: {:?}",
145 rpc_state
146 );
147 return Ok(());
148 };
149 let P2pRpcLocalState::Requested { id, request, .. } = local else {
150 bug_condition!(
151 "Invalid state for `P2pChannelsRpcAction::ResponseReceived`, state: {:?}",
152 rpc_state
153 );
154 return Ok(());
155 };
156 *local = P2pRpcLocalState::Responded {
157 time: meta.time(),
158 id: *id,
159 request: std::mem::take(request),
160 };
161
162 let (dispatcher, state) = state_context.into_dispatcher_and_state();
163 let p2p_state: &P2pState = state.substate()?;
164
165 if let Some(P2pRpcResponse::BestTipWithProof(resp)) = response.as_deref() {
166 let Ok(best_tip) = BlockWithHash::try_new(resp.best_tip.clone()) else {
167 error!(meta.time(); "P2pChannelsRpcAction::ResponseReceived: Invalid bigint in block");
168 return Ok(());
169 };
170
171 dispatcher.push(P2pPeerAction::BestTipUpdate { peer_id, best_tip });
172 }
173
174 if let Some(callback) = &p2p_state.callbacks.on_p2p_channels_rpc_response_received {
175 dispatcher.push_callback(callback.clone(), (peer_id, rpc_id, response));
176 }
177 Ok(())
178 }
179 P2pChannelsRpcAction::RequestReceived { id, request, .. } => {
180 let Self::Ready { remote, .. } = rpc_state else {
181 bug_condition!(
182 "Invalid state for `P2pChannelsRpcAction::RequestReceived`, state: {:?}",
183 rpc_state
184 );
185 return Ok(());
186 };
187 remote
188 .pending_requests
189 .push_back(P2pRpcRemotePendingRequestState {
190 time: meta.time(),
191 id,
192 request: *request.clone(),
193 is_pending: false,
194 });
195
196 let (dispatcher, state) = state_context.into_dispatcher_and_state();
197 let p2p_state: &P2pState = state.substate()?;
198
199 if let Some(callback) = &p2p_state.callbacks.on_p2p_channels_rpc_request_received {
200 dispatcher.push_callback(callback.clone(), (peer_id, id, request));
201 }
202 Ok(())
203 }
204 P2pChannelsRpcAction::ResponsePending { id, .. } => {
205 let Self::Ready { remote, .. } = rpc_state else {
206 bug_condition!(
207 "Invalid state for `P2pChannelsRpcAction::ResponsePending`, state: {:?}",
208 rpc_state
209 );
210 return Ok(());
211 };
212 if let Some(req) = remote.pending_requests.iter_mut().find(|r| r.id == id) {
213 req.is_pending = true;
214 }
215 Ok(())
216 }
217 P2pChannelsRpcAction::ResponseSend { id, response, .. } => {
218 let Self::Ready { remote, .. } = rpc_state else {
219 bug_condition!(
220 "Invalid state for `P2pChannelsRpcAction::ResponseSend`, state: {:?}",
221 rpc_state
222 );
223 return Ok(());
224 };
225
226 if let Some(pos) = remote.pending_requests.iter().position(|r| r.id == id) {
227 remote.pending_requests.remove(pos);
228 remote.last_responded = meta.time();
229 }
230
231 let dispatcher = state_context.into_dispatcher();
232
233 #[cfg(feature = "p2p-libp2p")]
234 if is_libp2p {
235 if let Some(response) = response {
236 if let Some((response, data)) =
237 super::libp2p::internal_response_into_libp2p(*response, id)
238 {
239 dispatcher.push(P2pNetworkRpcAction::OutgoingResponse {
240 peer_id,
241 response,
242 data,
243 });
244 }
245 }
246
247 return Ok(());
248 }
249
250 dispatcher.push(P2pChannelsEffectfulAction::MessageSend {
251 peer_id,
252 msg_id: MsgId::first(),
253 msg: ChannelMsg::Rpc(RpcChannelMsg::Response(id, response.map(|v| *v))),
254 });
255 Ok(())
256 }
257 }
258 }
259}