1use openmina_core::{bug_condition, Substate};
2use redux::ActionWithMeta;
3
4use crate::{
5 channels::{ChannelId, ChannelMsg, MsgId, P2pChannelsEffectfulAction},
6 P2pState,
7};
8
9use super::{
10 staged_ledger_parts::{StagedLedgerPartsReceiveProgress, StagedLedgerPartsSendProgress},
11 P2pChannelsStreamingRpcAction, P2pChannelsStreamingRpcState, P2pStreamingRpcLocalState,
12 P2pStreamingRpcRemoteState, P2pStreamingRpcRequest, P2pStreamingRpcResponseFull,
13 P2pStreamingRpcSendProgress, StreamingRpcChannelMsg,
14};
15
16impl P2pChannelsStreamingRpcState {
17 pub fn reducer<Action, State>(
18 mut state_context: Substate<Action, State, P2pState>,
19 action: ActionWithMeta<P2pChannelsStreamingRpcAction>,
20 ) -> Result<(), String>
21 where
22 State: crate::P2pStateTrait,
23 Action: crate::P2pActionTrait<State>,
24 {
25 let (action, meta) = action.split();
26 let peer_id = *action.peer_id();
27 let p2p_state = state_context.get_substate_mut()?;
28
29 let channels_state = &mut p2p_state
30 .get_ready_peer_mut(&peer_id)
31 .ok_or_else(|| format!("Invalid state for: {action:?}"))?
32 .channels;
33
34 let next_local_rpc_id = &mut channels_state.next_local_rpc_id;
35 let streaming_rpc_state = &mut channels_state.streaming_rpc;
36
37 match action {
38 P2pChannelsStreamingRpcAction::Init { .. } => {
39 *streaming_rpc_state = Self::Init { time: meta.time() };
40
41 let dispatcher = state_context.into_dispatcher();
42 dispatcher.push(P2pChannelsEffectfulAction::InitChannel {
43 peer_id,
44 id: ChannelId::StreamingRpc,
45 on_success: redux::callback!(
46 on_streaming_rpc_channel_init(peer_id: crate::PeerId) -> crate::P2pAction {
47 P2pChannelsStreamingRpcAction::Pending { peer_id }
48 }
49 ),
50 });
51 Ok(())
52 }
53 P2pChannelsStreamingRpcAction::Pending { .. } => {
54 *streaming_rpc_state = Self::Pending { time: meta.time() };
55 Ok(())
56 }
57 P2pChannelsStreamingRpcAction::Ready { .. } => {
58 *streaming_rpc_state = Self::Ready {
59 time: meta.time(),
60 local: P2pStreamingRpcLocalState::WaitingForRequest { time: meta.time() },
61 remote: P2pStreamingRpcRemoteState::WaitingForRequest { time: meta.time() },
62 remote_last_responded: redux::Timestamp::ZERO,
63 };
64
65 let (dispatcher, state) = state_context.into_dispatcher_and_state();
66 let p2p_state: &P2pState = state.substate()?;
67
68 if let Some(callback) = &p2p_state.callbacks.on_p2p_channels_streaming_rpc_ready {
69 dispatcher.push_callback(callback.clone(), ());
70 }
71 Ok(())
72 }
73 P2pChannelsStreamingRpcAction::RequestSend {
74 id,
75 request,
76 on_init,
77 ..
78 } => {
79 let Self::Ready { local, .. } = streaming_rpc_state else {
80 bug_condition!(
81 "`P2pChannelsStreamingRpcAction::RequestSend` with state {:?}",
82 streaming_rpc_state
83 );
84 return Ok(());
85 };
86
87 *next_local_rpc_id += 1;
88 *local = P2pStreamingRpcLocalState::Requested {
89 time: meta.time(),
90 id,
91 request: request.clone(),
92 progress: match &*request {
93 P2pStreamingRpcRequest::StagedLedgerParts(_) => {
94 Into::into(StagedLedgerPartsReceiveProgress::BasePending {
95 time: meta.time(),
96 })
97 }
98 },
99 };
100
101 let dispatcher = state_context.into_dispatcher();
102 dispatcher.push(P2pChannelsEffectfulAction::MessageSend {
103 peer_id,
104 msg_id: MsgId::first(),
105 msg: StreamingRpcChannelMsg::Request(id, *request.clone()).into(),
106 });
107 if let Some(callback) = on_init {
108 dispatcher.push_callback(callback, (peer_id, id, *request));
109 }
110 Ok(())
111 }
112 P2pChannelsStreamingRpcAction::Timeout { id, .. } => {
113 let (dispatcher, state) = state_context.into_dispatcher_and_state();
114 let p2p_state: &P2pState = state.substate()?;
115
116 if let Some(callback) = &p2p_state.callbacks.on_p2p_channels_streaming_rpc_timeout {
117 dispatcher.push_callback(callback.clone(), (peer_id, id));
118 }
119
120 Ok(())
121 }
122 P2pChannelsStreamingRpcAction::ResponseNextPartGet { id, .. } => {
123 let Self::Ready {
124 local: P2pStreamingRpcLocalState::Requested { progress, .. },
125 ..
126 } = streaming_rpc_state
127 else {
128 bug_condition!("{:?} with state {:?}", action, streaming_rpc_state);
129 return Ok(());
130 };
131
132 if !progress.set_next_pending(meta.time()) {
133 bug_condition!("progress state already pending: {progress:?}");
134 }
135
136 if !progress.is_part_pending() {
137 bug_condition!("progress state is not pending {:?}", progress);
138 }
139
140 let dispatcher = state_context.into_dispatcher();
141 dispatcher.push(P2pChannelsEffectfulAction::MessageSend {
142 peer_id,
143 msg_id: MsgId::first(),
144 msg: ChannelMsg::StreamingRpc(StreamingRpcChannelMsg::Next(id)),
145 });
146 Ok(())
147 }
148 P2pChannelsStreamingRpcAction::ResponsePartReceived { response, id, .. } => {
149 let Self::Ready {
150 local: P2pStreamingRpcLocalState::Requested { progress, .. },
151 ..
152 } = streaming_rpc_state
153 else {
154 bug_condition!(
155 "`P2pChannelsStreamingRpcAction::ResponsePartReceived` with state {:?}",
156 streaming_rpc_state
157 );
158 return Ok(());
159 };
160 if !progress.update(meta.time(), response) {
161 bug_condition!("progress response mismatch! {progress:?}");
162 }
163
164 let (dispatcher, state) = state_context.into_dispatcher_and_state();
165 let state: &P2pState = state.substate()?;
166 let Some(peer) = state.get_ready_peer(&peer_id) else {
167 return Ok(());
168 };
169
170 if let Some(response) = peer.channels.streaming_rpc.local_done_response() {
171 dispatcher.push(P2pChannelsStreamingRpcAction::ResponseReceived {
172 peer_id,
173 id,
174 response: Some(response),
175 });
176 return Ok(());
177 }
178 dispatcher.push(P2pChannelsStreamingRpcAction::ResponseNextPartGet { peer_id, id });
179 Ok(())
180 }
181 P2pChannelsStreamingRpcAction::ResponseReceived {
182 id: rpc_id,
183 response,
184 ..
185 } => {
186 let Self::Ready { local, .. } = streaming_rpc_state else {
187 bug_condition!(
188 "`P2pChannelsStreamingRpcAction::ResponseReceived` with state {:?}",
189 streaming_rpc_state
190 );
191 return Ok(());
192 };
193 let P2pStreamingRpcLocalState::Requested { id, request, .. } = local else {
194 bug_condition!(
195 "`P2pChannelsStreamingRpcAction::ResponseReceived` with state {:?}",
196 streaming_rpc_state
197 );
198 return Ok(());
199 };
200 *local = P2pStreamingRpcLocalState::Responded {
201 time: meta.time(),
202 id: *id,
203 request: std::mem::take(request),
204 };
205
206 let (dispatcher, state) = state_context.into_dispatcher_and_state();
207 let p2p_state: &P2pState = state.substate()?;
208
209 if let Some(callback) = &p2p_state
210 .callbacks
211 .on_p2p_channels_streaming_rpc_response_received
212 {
213 dispatcher.push_callback(callback.clone(), (peer_id, rpc_id, response))
214 }
215
216 Ok(())
217 }
218 P2pChannelsStreamingRpcAction::RequestReceived { id, request, .. } => {
219 let Self::Ready { remote, .. } = streaming_rpc_state else {
220 bug_condition!(
221 "`P2pChannelsStreamingRpcAction::RequestReceived` with state {:?}",
222 streaming_rpc_state
223 );
224 return Ok(());
225 };
226 *remote = P2pStreamingRpcRemoteState::Requested {
227 time: meta.time(),
228 id,
229 request,
230 progress: StagedLedgerPartsSendProgress::LedgerGetIdle { time: meta.time() }
231 .into(),
232 };
233 Ok(())
235 }
236 P2pChannelsStreamingRpcAction::ResponsePending { .. } => {
237 let Self::Ready {
238 remote:
239 P2pStreamingRpcRemoteState::Requested {
240 request, progress, ..
241 },
242 ..
243 } = streaming_rpc_state
244 else {
245 bug_condition!("{:?} with state {:?}", action, streaming_rpc_state);
246 return Ok(());
247 };
248 match &**request {
249 P2pStreamingRpcRequest::StagedLedgerParts(_) => {
250 *progress =
251 StagedLedgerPartsSendProgress::LedgerGetPending { time: meta.time() }
252 .into();
253 }
254 }
255 Ok(())
256 }
257 P2pChannelsStreamingRpcAction::ResponseSendInit { response, id, .. } => {
258 let Self::Ready {
259 remote:
260 P2pStreamingRpcRemoteState::Requested {
261 request, progress, ..
262 },
263 ..
264 } = streaming_rpc_state
265 else {
266 bug_condition!(
267 "`P2pChannelsStreamingRpcAction::ResponseSendInit` with state {:?}",
268 streaming_rpc_state
269 );
270 return Ok(());
271 };
272 match (&**request, &response) {
273 (_, Some(P2pStreamingRpcResponseFull::StagedLedgerParts(data))) => {
274 *progress = StagedLedgerPartsSendProgress::LedgerGetSuccess {
275 time: meta.time(),
276 data: Some(data.clone()),
277 }
278 .into();
279 }
280 (P2pStreamingRpcRequest::StagedLedgerParts(_), None) => {
281 *progress =
282 StagedLedgerPartsSendProgress::Success { time: meta.time() }.into();
283 } }
285
286 let dispatcher = state_context.into_dispatcher();
287 if response.is_none() {
288 let msg = StreamingRpcChannelMsg::Response(id, None).into();
289 dispatcher.push(P2pChannelsEffectfulAction::MessageSend {
290 peer_id,
291 msg_id: MsgId::first(),
292 msg,
293 });
294 dispatcher.push(P2pChannelsStreamingRpcAction::ResponseSent { peer_id, id });
295 return Ok(());
296 }
297
298 dispatcher
299 .push(P2pChannelsStreamingRpcAction::ResponsePartNextSend { peer_id, id });
300 Ok(())
301 }
302 P2pChannelsStreamingRpcAction::ResponsePartNextSend { id, .. } => {
303 let (dispatcher, state) = state_context.into_dispatcher_and_state();
304 let state: &P2pState = state.substate()?;
305
306 let Some(response) = state
307 .get_ready_peer(&peer_id)
308 .and_then(|peer| peer.channels.streaming_rpc.remote_next_msg().map(Box::new))
309 else {
310 return Ok(());
311 };
312
313 dispatcher.push(P2pChannelsStreamingRpcAction::ResponsePartSend {
314 peer_id,
315 id,
316 response,
317 });
318
319 Ok(())
320 }
321 P2pChannelsStreamingRpcAction::ResponsePartSend { id, response, .. } => {
322 let Self::Ready {
323 remote: P2pStreamingRpcRemoteState::Requested { progress, .. },
324 ..
325 } = streaming_rpc_state
326 else {
327 bug_condition!(
328 "`P2pChannelsStreamingRpcAction::ResponsePartSend` with state {:?}",
329 streaming_rpc_state
330 );
331 return Ok(());
332 };
333 match progress {
334 P2pStreamingRpcSendProgress::StagedLedgerParts(progress) => {
335 *progress = match progress {
336 StagedLedgerPartsSendProgress::LedgerGetSuccess {
337 data: Some(data),
338 ..
339 } => StagedLedgerPartsSendProgress::BaseSent {
340 time: meta.time(),
341 data: data.clone(),
342 },
343 StagedLedgerPartsSendProgress::BaseSent { data, .. } => {
344 StagedLedgerPartsSendProgress::ScanStateBaseSent {
345 time: meta.time(),
346 data: data.clone(),
347 }
348 }
349 StagedLedgerPartsSendProgress::ScanStateBaseSent { data, .. } => {
350 StagedLedgerPartsSendProgress::PreviousIncompleteZkappUpdatesSent {
351 time: meta.time(),
352 data: data.clone(),
353 }
354 }
355 StagedLedgerPartsSendProgress::PreviousIncompleteZkappUpdatesSent {
356 data,
357 ..
358 } => StagedLedgerPartsSendProgress::ScanStateTreesSending {
359 time: meta.time(),
360 data: data.clone(),
361 tree_index: 0,
362 },
363 StagedLedgerPartsSendProgress::ScanStateTreesSending {
364 data,
365 tree_index,
366 ..
367 } => StagedLedgerPartsSendProgress::ScanStateTreesSending {
368 time: meta.time(),
369 data: data.clone(),
370 tree_index: *tree_index + 1,
371 },
372 progress => {
373 bug_condition!("unexpected state during `P2pStreamingRpcSendProgress::StagedLedgerParts`: {progress:?}");
374 return Ok(());
375 }
376 };
377
378 if let StagedLedgerPartsSendProgress::ScanStateTreesSending {
379 data,
380 tree_index,
381 ..
382 } = progress
383 {
384 let target_index = data.scan_state.scan_state.trees.1.len();
385 if *tree_index >= target_index {
386 *progress =
387 StagedLedgerPartsSendProgress::Success { time: meta.time() };
388 }
389 }
390 }
391 }
392
393 let dispatcher = state_context.into_dispatcher();
394
395 let msg = StreamingRpcChannelMsg::Response(id, Some(*response)).into();
396 dispatcher.push(P2pChannelsEffectfulAction::MessageSend {
397 peer_id,
398 msg_id: MsgId::first(),
399 msg,
400 });
401 dispatcher.push(P2pChannelsStreamingRpcAction::ResponseSent { peer_id, id });
402 Ok(())
403 }
404 P2pChannelsStreamingRpcAction::ResponseSent { id, .. } => {
405 let (remote, request) = match streaming_rpc_state {
406 Self::Ready { remote, .. } => match remote {
407 P2pStreamingRpcRemoteState::Requested { request, .. } => {
408 let request = std::mem::take(request);
409 (remote, request)
410 }
411 _ => {
412 bug_condition!(
413 "`P2pChannelsStreamingRpcAction::ResponseSent` with state {:?}",
414 streaming_rpc_state
415 );
416 return Ok(());
417 }
418 },
419 _ => {
420 bug_condition!(
421 "`P2pChannelsStreamingRpcAction::ResponseSent` with state {:?}",
422 streaming_rpc_state
423 );
424 return Ok(());
425 }
426 };
427 *remote = P2pStreamingRpcRemoteState::Responded {
428 time: meta.time(),
429 id,
430 request,
431 };
432
433 Ok(())
434 }
435 }
436 }
437}