p2p/network/kad/stream/
p2p_network_kad_stream_reducer.rs

1use openmina_core::{bug_condition, fuzzed_maybe, warn, Substate, SubstateAccess};
2use quick_protobuf::{serialize_into_vec, BytesReader};
3use redux::ActionWithMeta;
4
5use crate::{
6    stream::P2pNetworkKadOutgoingStreamError, Data, P2pLimits, P2pNetworkConnectionError,
7    P2pNetworkKadRequestAction, P2pNetworkKadState, P2pNetworkKademliaRpcReply,
8    P2pNetworkKademliaRpcRequest, P2pNetworkSchedulerAction, P2pNetworkStreamProtobufError,
9    P2pNetworkYamuxAction, P2pState, YamuxFlags,
10};
11
12use super::{
13    super::Message, P2pNetworkKadIncomingStreamError, P2pNetworkKadIncomingStreamState,
14    P2pNetworkKadOutgoingStreamState, P2pNetworkKadStreamState, P2pNetworkKademliaStreamAction,
15    P2pNetworkKademliaStreamWaitOutgoingCallback,
16};
17
18impl P2pNetworkKadIncomingStreamState {
19    pub fn reducer<State, Action>(
20        mut state_context: Substate<Action, State, P2pNetworkKadState>,
21        action: ActionWithMeta<P2pNetworkKademliaStreamAction>,
22        limits: &P2pLimits,
23    ) -> Result<(), String>
24    where
25        State: SubstateAccess<P2pNetworkKadState> + SubstateAccess<P2pState>,
26        Action: crate::P2pActionTrait<State>,
27    {
28        let (action, meta) = action.split();
29        let Some(P2pNetworkKadStreamState::Incoming(state)) = state_context
30            .get_substate_mut()?
31            .find_kad_stream_state_mut(action.peer_id(), action.stream_id())
32        else {
33            return Err("invalid stream".to_owned());
34        };
35
36        match (&state, action) {
37            (
38                P2pNetworkKadIncomingStreamState::Default,
39                P2pNetworkKademliaStreamAction::New { incoming, .. },
40            ) if incoming => {
41                *state = P2pNetworkKadIncomingStreamState::WaitingForRequest {
42                    expect_close: false,
43                };
44                Ok(())
45            }
46            (
47                P2pNetworkKadIncomingStreamState::WaitingForRequest { .. },
48                P2pNetworkKademliaStreamAction::IncomingData {
49                    data,
50                    addr,
51                    peer_id,
52                    stream_id,
53                },
54            ) => {
55                let data = &data.0;
56                let mut reader = BytesReader::from_bytes(data);
57
58                match reader.read_varint32(data).map(|v| v as usize) {
59                    Ok(encoded_len) if encoded_len > limits.kademlia_request() => {
60                        *state = P2pNetworkKadIncomingStreamState::Error(
61                            P2pNetworkStreamProtobufError::Limit(
62                                encoded_len,
63                                limits.kademlia_request(),
64                            ),
65                        );
66                    }
67                    Ok(encoded_len) => {
68                        let remaining_len = reader.len();
69                        if let Some(remaining_data) = data.get(data.len() - remaining_len..) {
70                            if encoded_len > remaining_len {
71                                *state = P2pNetworkKadIncomingStreamState::PartialRequestReceived {
72                                    len: encoded_len,
73                                    data: remaining_data.to_vec(),
74                                };
75                                return Ok(());
76                            }
77
78                            state.handle_incoming_request(encoded_len, remaining_data)?;
79                        } else {
80                            *state = P2pNetworkKadIncomingStreamState::Error(
81                                P2pNetworkStreamProtobufError::Message("out of bounds".to_owned()),
82                            );
83                        }
84                    }
85                    Err(_) => {
86                        *state = P2pNetworkKadIncomingStreamState::Error(
87                            P2pNetworkStreamProtobufError::MessageLength,
88                        );
89                    }
90                };
91
92                let state = state.clone();
93                let dispatcher = state_context.into_dispatcher();
94
95                match state {
96                    P2pNetworkKadIncomingStreamState::RequestIsReady {
97                        data: P2pNetworkKademliaRpcRequest::FindNode { key },
98                    } => {
99                        dispatcher.push(P2pNetworkKademliaStreamAction::WaitOutgoing {
100                            addr,
101                            peer_id,
102                            stream_id,
103                            callback: P2pNetworkKademliaStreamWaitOutgoingCallback::answer_find_node_request(key)
104                        });
105                    }
106                    P2pNetworkKadIncomingStreamState::Error(error) => {
107                        dispatcher.push(P2pNetworkSchedulerAction::Error {
108                            addr,
109                            error: P2pNetworkConnectionError::from(
110                                P2pNetworkKadIncomingStreamError::from(error),
111                            ),
112                        });
113                    }
114                    _ => bug_condition!("Invalid state"),
115                }
116
117                Ok(())
118            }
119            (
120                P2pNetworkKadIncomingStreamState::PartialRequestReceived { len, data },
121                P2pNetworkKademliaStreamAction::IncomingData {
122                    data: new_data,
123                    addr,
124                    peer_id,
125                    stream_id,
126                },
127            ) => {
128                let mut data = data.clone();
129                data.extend_from_slice(&new_data.0);
130
131                if *len > data.len() {
132                    *state = P2pNetworkKadIncomingStreamState::PartialRequestReceived {
133                        len: *len,
134                        data,
135                    };
136                    return Ok(());
137                }
138
139                state.handle_incoming_request(*len, &data)?;
140                let state = state.clone();
141                let dispatcher = state_context.into_dispatcher();
142
143                match state {
144                    P2pNetworkKadIncomingStreamState::RequestIsReady {
145                        data: P2pNetworkKademliaRpcRequest::FindNode { key },
146                    } => {
147                        dispatcher.push(P2pNetworkKademliaStreamAction::WaitOutgoing {
148                            addr,
149                            peer_id,
150                            stream_id,
151                            callback: P2pNetworkKademliaStreamWaitOutgoingCallback::answer_find_node_request(key)
152                        });
153                    }
154                    P2pNetworkKadIncomingStreamState::Error(error) => {
155                        warn!(meta.time(); summary = "error handling kademlia action", error = display(&error));
156                        dispatcher.push(P2pNetworkSchedulerAction::Error {
157                            addr,
158                            error: P2pNetworkConnectionError::from(
159                                P2pNetworkKadIncomingStreamError::from(error),
160                            ),
161                        });
162                    }
163                    _ => bug_condition!("Invalid state"),
164                }
165
166                Ok(())
167            }
168            (
169                P2pNetworkKadIncomingStreamState::RequestIsReady { .. },
170                P2pNetworkKademliaStreamAction::WaitOutgoing {
171                    addr,
172                    peer_id,
173                    stream_id,
174                    callback,
175                },
176            ) => {
177                *state = P2pNetworkKadIncomingStreamState::WaitingForReply;
178                let dispatcher = state_context.into_dispatcher();
179                match callback {
180                    P2pNetworkKademliaStreamWaitOutgoingCallback::AnswerFindNodeRequest {
181                        callback,
182                        args,
183                    } => dispatcher.push_callback(callback, (addr, peer_id, stream_id, args)),
184                    P2pNetworkKademliaStreamWaitOutgoingCallback::UpdateFindNodeRequest {
185                        callback,
186                        args,
187                    } => dispatcher.push_callback(callback, (addr, peer_id, stream_id, args)),
188                };
189                Ok(())
190            }
191            (
192                P2pNetworkKadIncomingStreamState::WaitingForReply,
193                P2pNetworkKademliaStreamAction::SendResponse {
194                    data,
195                    peer_id,
196                    addr,
197                    stream_id,
198                },
199            ) => {
200                let message = Message::try_from(&data).map_err(|e| e.to_string())?;
201                let bytes = serialize_into_vec(&message).map_err(|e| format!("{e}"))?;
202                *state = P2pNetworkKadIncomingStreamState::ResponseBytesAreReady {
203                    bytes: bytes.clone(),
204                };
205
206                let dispatcher = state_context.into_dispatcher();
207                let data = fuzzed_maybe!(Data::from(bytes), crate::fuzzer::mutate_kad_data);
208                let flags = fuzzed_maybe!(Default::default(), crate::fuzzer::mutate_yamux_flags);
209
210                dispatcher.push(P2pNetworkYamuxAction::OutgoingData {
211                    addr,
212                    stream_id,
213                    data,
214                    flags,
215                });
216                dispatcher.push(P2pNetworkKademliaStreamAction::WaitIncoming {
217                    addr,
218                    peer_id,
219                    stream_id,
220                });
221                Ok(())
222            }
223            (
224                P2pNetworkKadIncomingStreamState::ResponseBytesAreReady { .. },
225                P2pNetworkKademliaStreamAction::WaitIncoming { .. },
226            ) => {
227                *state = P2pNetworkKadIncomingStreamState::WaitingForRequest { expect_close: true };
228                Ok(())
229            }
230            (
231                P2pNetworkKadIncomingStreamState::WaitingForRequest { expect_close, .. },
232                P2pNetworkKademliaStreamAction::RemoteClose {
233                    addr,
234                    peer_id,
235                    stream_id,
236                },
237            ) if *expect_close => {
238                *state = P2pNetworkKadIncomingStreamState::Closing;
239
240                let dispatcher = state_context.into_dispatcher();
241                dispatcher.push(P2pNetworkYamuxAction::OutgoingData {
242                    addr,
243                    stream_id,
244                    data: Data::empty(),
245                    flags: YamuxFlags::FIN,
246                });
247                dispatcher.push(P2pNetworkKademliaStreamAction::Prune {
248                    addr,
249                    peer_id,
250                    stream_id,
251                });
252                Ok(())
253            }
254            action => Err(format!(
255                "kademlia incoming stream state {state:?} is incorrect for action {action:?}",
256            )),
257        }
258    }
259
260    fn handle_incoming_request(&mut self, len: usize, data: &[u8]) -> Result<(), String> {
261        let mut reader = BytesReader::from_bytes(data);
262
263        let message = match reader.read_message_by_len::<Message>(data, len) {
264            Ok(v) => v,
265            Err(e) => {
266                *self = P2pNetworkKadIncomingStreamState::Error(
267                    P2pNetworkStreamProtobufError::Message(e.to_string()),
268                );
269                return Ok(());
270            }
271        };
272
273        let data = match P2pNetworkKademliaRpcRequest::try_from(message) {
274            Ok(v) => v,
275            Err(e) => {
276                *self = P2pNetworkKadIncomingStreamState::Error(e.into());
277                return Ok(());
278            }
279        };
280
281        *self = P2pNetworkKadIncomingStreamState::RequestIsReady { data };
282        Ok(())
283    }
284}
285
286impl P2pNetworkKadOutgoingStreamState {
287    pub fn reducer<State, Action>(
288        mut state_context: Substate<Action, State, P2pNetworkKadState>,
289        action: ActionWithMeta<P2pNetworkKademliaStreamAction>,
290        limits: &P2pLimits,
291    ) -> Result<(), String>
292    where
293        State: SubstateAccess<P2pNetworkKadState> + SubstateAccess<P2pState>,
294        Action: crate::P2pActionTrait<State>,
295    {
296        let (action, meta) = action.split();
297        let Some(P2pNetworkKadStreamState::Outgoing(state)) = state_context
298            .get_substate_mut()?
299            .find_kad_stream_state_mut(action.peer_id(), action.stream_id())
300        else {
301            bug_condition!("Stream not found");
302            return Ok(());
303        };
304
305        match (&state, action) {
306            (
307                P2pNetworkKadOutgoingStreamState::Default,
308                P2pNetworkKademliaStreamAction::New { incoming, .. },
309            ) if !incoming => {
310                *state = P2pNetworkKadOutgoingStreamState::WaitingForRequest {
311                    expect_close: false,
312                };
313                Ok(())
314            }
315
316            (
317                P2pNetworkKadOutgoingStreamState::WaitingForRequest { .. },
318                P2pNetworkKademliaStreamAction::SendRequest {
319                    data,
320                    addr,
321                    stream_id,
322                    peer_id,
323                },
324            ) => {
325                let message = Message::from(&data);
326                let bytes = serialize_into_vec(&message).map_err(|e| format!("{e}"))?;
327                *state = P2pNetworkKadOutgoingStreamState::RequestBytesAreReady {
328                    bytes: bytes.clone(),
329                };
330
331                let dispatcher = state_context.into_dispatcher();
332                let data = fuzzed_maybe!(Data::from(bytes), crate::fuzzer::mutate_kad_data);
333                let flags = fuzzed_maybe!(Default::default(), crate::fuzzer::mutate_yamux_flags);
334
335                dispatcher.push(P2pNetworkYamuxAction::OutgoingData {
336                    addr,
337                    stream_id,
338                    data,
339                    flags,
340                });
341                dispatcher.push(P2pNetworkKademliaStreamAction::WaitIncoming {
342                    addr,
343                    peer_id,
344                    stream_id,
345                });
346                dispatcher.push(P2pNetworkKadRequestAction::RequestSent { peer_id });
347                Ok(())
348            }
349            (
350                P2pNetworkKadOutgoingStreamState::RequestBytesAreReady { .. },
351                P2pNetworkKademliaStreamAction::WaitIncoming { .. },
352            ) => {
353                *state = P2pNetworkKadOutgoingStreamState::WaitingForReply;
354                Ok(())
355            }
356
357            (
358                P2pNetworkKadOutgoingStreamState::WaitingForReply { .. },
359                P2pNetworkKademliaStreamAction::IncomingData {
360                    data,
361                    peer_id,
362                    stream_id,
363                    addr,
364                },
365            ) => {
366                let data = &data.0;
367
368                let mut reader = BytesReader::from_bytes(data);
369
370                match reader.read_varint32(data).map(|v| v as usize) {
371                    Ok(encoded_len) if encoded_len > limits.kademlia_response() => {
372                        *state = P2pNetworkKadOutgoingStreamState::Error(
373                            P2pNetworkStreamProtobufError::Limit(
374                                encoded_len,
375                                limits.kademlia_response(),
376                            ),
377                        );
378                    }
379                    Ok(encoded_len) => {
380                        let remaining_len = reader.len();
381                        if let Some(remaining_data) = data.get(data.len() - remaining_len..) {
382                            if encoded_len > remaining_len {
383                                *state = P2pNetworkKadOutgoingStreamState::PartialReplyReceived {
384                                    len: encoded_len,
385                                    data: remaining_data.to_vec(),
386                                };
387                                return Ok(());
388                            }
389
390                            state.handle_incoming_response(encoded_len, remaining_data)?;
391                        } else {
392                            *state = P2pNetworkKadOutgoingStreamState::Error(
393                                P2pNetworkStreamProtobufError::Message("out of bounds".to_owned()),
394                            );
395                        }
396                    }
397                    Err(_) => {
398                        *state = P2pNetworkKadOutgoingStreamState::Error(
399                            P2pNetworkStreamProtobufError::MessageLength,
400                        );
401                    }
402                };
403
404                let state = state.clone();
405                let dispatcher = state_context.into_dispatcher();
406
407                match state {
408                    P2pNetworkKadOutgoingStreamState::ResponseIsReady {
409                        data:
410                            P2pNetworkKademliaRpcReply::FindNode {
411                                closer_peers: closest_peers,
412                            },
413                    } => {
414                        dispatcher.push(P2pNetworkKademliaStreamAction::WaitOutgoing {
415                            addr,
416                            peer_id,
417                            stream_id,
418                            callback: P2pNetworkKademliaStreamWaitOutgoingCallback::update_find_node_request(closest_peers)
419                        });
420                    }
421                    P2pNetworkKadOutgoingStreamState::Error(error) => {
422                        warn!(meta.time(); summary = "error handling kademlia action", error = display(&error));
423                        dispatcher.push(P2pNetworkSchedulerAction::Error {
424                            addr,
425                            error: P2pNetworkConnectionError::from(
426                                P2pNetworkKadOutgoingStreamError::from(error),
427                            ),
428                        });
429                    }
430                    _ => {
431                        bug_condition!("Invalid state");
432                    }
433                }
434
435                Ok(())
436            }
437            (
438                P2pNetworkKadOutgoingStreamState::PartialReplyReceived { len, data },
439                P2pNetworkKademliaStreamAction::IncomingData {
440                    data: new_data,
441                    stream_id,
442                    peer_id,
443                    addr,
444                },
445            ) => {
446                let mut data = data.clone();
447                data.extend_from_slice(&new_data);
448
449                if *len > data.len() {
450                    *state =
451                        P2pNetworkKadOutgoingStreamState::PartialReplyReceived { len: *len, data };
452                    return Ok(());
453                }
454
455                state.handle_incoming_response(*len, &data)?;
456                let state = state.clone();
457                let dispatcher = state_context.into_dispatcher();
458
459                match state {
460                    P2pNetworkKadOutgoingStreamState::ResponseIsReady {
461                        data:
462                            P2pNetworkKademliaRpcReply::FindNode {
463                                closer_peers: closest_peers,
464                            },
465                    } => {
466                        dispatcher.push(P2pNetworkKademliaStreamAction::WaitOutgoing {
467                            addr,
468                            peer_id,
469                            stream_id,
470                            callback: P2pNetworkKademliaStreamWaitOutgoingCallback::update_find_node_request(closest_peers)
471                        });
472                    }
473                    P2pNetworkKadOutgoingStreamState::Error(error) => {
474                        warn!(meta.time(); summary = "error handling kademlia action", error = display(&error));
475                        dispatcher.push(P2pNetworkSchedulerAction::Error {
476                            addr,
477                            error: P2pNetworkConnectionError::from(
478                                P2pNetworkKadOutgoingStreamError::from(error),
479                            ),
480                        });
481                    }
482                    _ => bug_condition!("Invalid state"),
483                }
484
485                Ok(())
486            }
487            (
488                P2pNetworkKadOutgoingStreamState::ResponseIsReady { .. },
489                P2pNetworkKademliaStreamAction::WaitOutgoing {
490                    addr,
491                    peer_id,
492                    stream_id,
493                    callback,
494                },
495            ) => {
496                *state = P2pNetworkKadOutgoingStreamState::WaitingForRequest { expect_close: true };
497                let dispatcher = state_context.into_dispatcher();
498                match callback {
499                    P2pNetworkKademliaStreamWaitOutgoingCallback::AnswerFindNodeRequest {
500                        callback,
501                        args,
502                    } => dispatcher.push_callback(callback, (addr, peer_id, stream_id, args)),
503                    P2pNetworkKademliaStreamWaitOutgoingCallback::UpdateFindNodeRequest {
504                        callback,
505                        args,
506                    } => dispatcher.push_callback(callback, (addr, peer_id, stream_id, args)),
507                };
508                Ok(())
509            }
510            (
511                P2pNetworkKadOutgoingStreamState::WaitingForRequest { expect_close },
512                P2pNetworkKademliaStreamAction::Close {
513                    addr,
514                    peer_id,
515                    stream_id,
516                },
517            ) if *expect_close => {
518                *state =
519                    P2pNetworkKadOutgoingStreamState::RequestBytesAreReady { bytes: Vec::new() };
520
521                let dispatcher = state_context.into_dispatcher();
522                dispatcher.push(P2pNetworkYamuxAction::OutgoingData {
523                    addr,
524                    stream_id,
525                    data: Data::empty(),
526                    flags: YamuxFlags::FIN,
527                });
528                dispatcher.push(P2pNetworkKademliaStreamAction::Prune {
529                    addr,
530                    peer_id,
531                    stream_id,
532                });
533                Ok(())
534            }
535            (
536                P2pNetworkKadOutgoingStreamState::Closing,
537                P2pNetworkKademliaStreamAction::RemoteClose {
538                    addr,
539                    peer_id,
540                    stream_id,
541                },
542            ) => {
543                *state = P2pNetworkKadOutgoingStreamState::Closed;
544                let dispatcher = state_context.into_dispatcher();
545                dispatcher.push(P2pNetworkKademliaStreamAction::Prune {
546                    addr,
547                    peer_id,
548                    stream_id,
549                });
550                Ok(())
551            }
552            (state, action) => Err(format!(
553                "kademlia outgoing stream state {state:?} is incorrect for action {action:?}",
554            )),
555        }
556    }
557
558    fn handle_incoming_response(&mut self, len: usize, data: &[u8]) -> Result<(), String> {
559        let mut reader = BytesReader::from_bytes(data);
560
561        let message = match reader.read_message_by_len::<Message>(data, len) {
562            Ok(v) => v,
563            Err(e) => {
564                *self = P2pNetworkKadOutgoingStreamState::Error(
565                    P2pNetworkStreamProtobufError::Message(e.to_string()),
566                );
567                return Ok(());
568            }
569        };
570
571        let data = match P2pNetworkKademliaRpcReply::try_from(message) {
572            Ok(v) => v,
573            Err(e) => {
574                *self = P2pNetworkKadOutgoingStreamState::Error(e.into());
575                return Ok(());
576            }
577        };
578
579        *self = P2pNetworkKadOutgoingStreamState::ResponseIsReady { data };
580        Ok(())
581    }
582}
583
584impl P2pNetworkKadStreamState {
585    pub fn reducer<State, Action>(
586        mut state_context: Substate<Action, State, P2pNetworkKadState>,
587        action: ActionWithMeta<P2pNetworkKademliaStreamAction>,
588        limits: &P2pLimits,
589    ) -> Result<(), String>
590    where
591        State: SubstateAccess<P2pNetworkKadState> + SubstateAccess<P2pState>,
592        Action: crate::P2pActionTrait<State>,
593    {
594        let state = state_context.get_substate_mut()?;
595        let (action, meta) = action.split();
596        let stream_state = match &action {
597            P2pNetworkKademliaStreamAction::New {
598                peer_id,
599                stream_id,
600                incoming,
601                ..
602            } => state
603                .create_kad_stream_state(*incoming, peer_id, stream_id)
604                .map_err(|stream| {
605                    format!("kademlia stream already exists for action {action:?}: {stream:?}")
606                })?,
607            P2pNetworkKademliaStreamAction::Prune {
608                peer_id, stream_id, ..
609            } => {
610                return state
611                    .remove_kad_stream_state(peer_id, stream_id)
612                    .then_some(())
613                    .ok_or_else(|| format!("kademlia stream not found for action {action:?}"))
614            }
615            _ => state
616                .find_kad_stream_state(action.peer_id(), action.stream_id())
617                .ok_or_else(|| format!("kademlia stream not found for action {action:?}"))?,
618        };
619
620        match stream_state {
621            P2pNetworkKadStreamState::Incoming(_) => P2pNetworkKadIncomingStreamState::reducer(
622                state_context,
623                meta.with_action(action),
624                limits,
625            ),
626            P2pNetworkKadStreamState::Outgoing(_) => P2pNetworkKadOutgoingStreamState::reducer(
627                state_context,
628                meta.with_action(action),
629                limits,
630            ),
631        }
632    }
633}