p2p/network/yamux/
p2p_network_yamux_reducer.rs

1use std::collections::VecDeque;
2
3use openmina_core::{bug_condition, fuzz_maybe, fuzzed_maybe, Substate, SubstateAccess};
4
5use crate::{
6    yamux::p2p_network_yamux_state::{YamuxFrame, YamuxFrameInner},
7    Data, Limit, P2pLimits, P2pNetworkAuthState, P2pNetworkConnectionError,
8    P2pNetworkConnectionMuxState, P2pNetworkNoiseAction, P2pNetworkSchedulerAction,
9    P2pNetworkSchedulerState, P2pNetworkSelectAction, P2pNetworkStreamState, SelectKind,
10};
11
12use super::{
13    p2p_network_yamux_state::{YamuxStreamState, MAX_WINDOW_SIZE},
14    P2pNetworkYamuxAction, P2pNetworkYamuxState, YamuxFlags, YamuxPing,
15};
16
17impl P2pNetworkYamuxState {
18    /// Handles the main reducer logic for Yamux protocol actions. It processes incoming and outgoing
19    /// data, selects appropriate behavior based on frame types, and manages the state of streams
20    /// within a Yamux session.
21    ///
22    /// # High-Level Overview
23    ///
24    /// - When data arrives, it is appended to an internal buffer. The buffer is then parsed for
25    ///   valid Yamux frames (using protocol-specific header fields and logic). Incomplete data
26    ///   remains in the buffer for future parsing.
27    /// - On successful parsing, frames are enqueued for further handling (e.g., dispatching
28    ///   actions to notify higher-level protocols or responding to pings).
29    /// - If protocol inconsistencies or invalid headers are encountered, it marks an error or
30    ///   terminates gracefully, preventing further processing of unexpected data.
31    /// - Outgoing data is prepared as frames that respect the window constraints and established
32    ///   flags (e.g., SYN, ACK, FIN), and they are dispatched for transmission.
33    /// - Once frames are processed, the function checks if the buffer has grown beyond a certain
34    ///   threshold relative to its initial capacity. If so, and if the remaining data is small,
35    ///   it resets the buffer capacity to a default size to avoid excessive memory usage.
36    /// - The function also manages streams and their states, ensuring that proper handshake
37    ///   flags are set (SYN, ACK) when a new stream is opened or accepted, enforcing limits on
38    ///   the number of streams, and notifying higher-level components about events like
39    ///   incoming data or connection errors.
40    pub fn reducer<State, Action>(
41        // Substate is accessed
42        mut state_context: Substate<Action, State, P2pNetworkSchedulerState>,
43        action: redux::ActionWithMeta<P2pNetworkYamuxAction>,
44    ) -> Result<(), String>
45    where
46        State: crate::P2pStateTrait,
47        Action: crate::P2pActionTrait<State>,
48    {
49        let (action, meta) = action.split();
50        let connection_state = state_context
51            .get_substate_mut()?
52            .connection_state_mut(action.addr())
53            .ok_or_else(|| format!("Connection not found for action: {action:?}"))
54            .inspect_err(|e| bug_condition!("{}", e))?;
55
56        let P2pNetworkConnectionMuxState::Yamux(yamux_state) = connection_state
57            .mux
58            .as_mut()
59            .ok_or_else(|| format!("Invalid yamux state for action: {action:?}"))?;
60
61        if yamux_state.terminated.is_some() {
62            return Ok(());
63        }
64
65        match action {
66            P2pNetworkYamuxAction::IncomingData { data, addr } => {
67                yamux_state.extend_buffer(&data);
68                yamux_state.parse_frames();
69
70                let frame_count = yamux_state.incoming_frame_count();
71                let dispatcher = state_context.into_dispatcher();
72
73                for _ in 0..frame_count {
74                    dispatcher.push(P2pNetworkYamuxAction::IncomingFrame { addr })
75                }
76
77                Ok(())
78            }
79            P2pNetworkYamuxAction::OutgoingData {
80                addr,
81                stream_id,
82                data,
83                mut flags,
84            } => {
85                let stream_state = yamux_state
86                    .streams
87                    .get(&stream_id)
88                    .ok_or_else(|| format!("Stream with id {stream_id} not found for `P2pNetworkYamuxAction::OutgoingData`"))?;
89
90                if !stream_state.incoming && !stream_state.established && !stream_state.syn_sent {
91                    flags.insert(YamuxFlags::SYN);
92                } else if stream_state.incoming && !stream_state.established {
93                    flags.insert(YamuxFlags::ACK);
94                }
95
96                fuzz_maybe!(&mut flags, crate::fuzzer::mutate_yamux_flags);
97
98                let frame = YamuxFrame {
99                    flags,
100                    stream_id,
101                    inner: YamuxFrameInner::Data(data),
102                };
103
104                let dispatcher = state_context.into_dispatcher();
105                dispatcher.push(P2pNetworkYamuxAction::OutgoingFrame { addr, frame });
106
107                Ok(())
108            }
109            P2pNetworkYamuxAction::IncomingFrame { addr } => {
110                let mut pending_outgoing = VecDeque::default();
111                let Some(frame) = yamux_state.incoming.pop_front() else {
112                    bug_condition!(
113                        "Frame not found for action `P2pNetworkYamuxAction::IncomingFrame`"
114                    );
115                    return Ok(());
116                };
117
118                if frame.flags.contains(YamuxFlags::SYN) {
119                    yamux_state
120                        .streams
121                        .insert(frame.stream_id, YamuxStreamState::incoming());
122
123                    if frame.stream_id != 0 {
124                        connection_state.streams.insert(
125                            frame.stream_id,
126                            P2pNetworkStreamState::new_incoming(meta.time()),
127                        );
128                    }
129                }
130                if frame.flags.contains(YamuxFlags::ACK) {
131                    yamux_state
132                        .streams
133                        .entry(frame.stream_id)
134                        .or_default()
135                        .established = true;
136                }
137
138                match &frame.inner {
139                    YamuxFrameInner::Data(_) => {
140                        if let Some(stream) = yamux_state.streams.get_mut(&frame.stream_id) {
141                            // must not underflow
142                            // TODO: check it and disconnect peer that violates flow rules
143                            stream.window_ours =
144                                stream.window_ours.saturating_sub(frame.len_as_u32());
145                        }
146                    }
147                    YamuxFrameInner::WindowUpdate { difference } => {
148                        let stream = yamux_state
149                            .streams
150                            .entry(frame.stream_id)
151                            .or_insert_with(YamuxStreamState::incoming);
152
153                        stream.window_theirs = stream.window_theirs.saturating_add(*difference);
154
155                        if *difference > 0 {
156                            // have some fresh space in the window
157                            // try send as many frames as can
158                            let mut window = stream.window_theirs;
159                            while let Some(frame) = stream.pending.pop_front() {
160                                let len = frame.len_as_u32();
161                                pending_outgoing.push_back(frame);
162                                if let Some(new_window) = window.checked_sub(len) {
163                                    window = new_window;
164                                } else {
165                                    break;
166                                }
167                            }
168                        }
169                    }
170                    YamuxFrameInner::Ping { .. } => {}
171                    YamuxFrameInner::GoAway(res) => yamux_state.set_res(*res),
172                }
173
174                let (dispatcher, state) = state_context.into_dispatcher_and_state();
175                let limits: &P2pLimits = state.substate()?;
176                let max_streams = limits.max_streams();
177                let connection_state =
178                    <State as SubstateAccess<P2pNetworkSchedulerState>>::substate(state)?
179                        .connection_state(&addr)
180                        .ok_or_else(|| format!("Connection not found {}", addr))?;
181
182                let stream = connection_state
183                    .yamux_state()
184                    .and_then(|yamux_state| yamux_state.streams.get(&frame.stream_id))
185                    .ok_or_else(|| format!("Stream with id {} not found for `P2pNetworkYamuxAction::IncomingFrame`", frame.stream_id))?;
186
187                let peer_id = match connection_state
188                    .auth
189                    .as_ref()
190                    .and_then(|P2pNetworkAuthState::Noise(noise)| noise.peer_id())
191                {
192                    Some(peer_id) => *peer_id,
193                    None => return Ok(()),
194                };
195
196                if frame.flags.contains(YamuxFlags::RST) {
197                    dispatcher.push(P2pNetworkSchedulerAction::Error {
198                        addr,
199                        error: P2pNetworkConnectionError::StreamReset(frame.stream_id),
200                    });
201                    return Ok(());
202                }
203
204                if frame.flags.contains(YamuxFlags::SYN) && frame.stream_id != 0 {
205                    // count incoming streams
206                    let incoming_streams_number = connection_state
207                        .streams
208                        .values()
209                        .filter(|s| s.select.is_incoming())
210                        .count();
211
212                    match (max_streams, incoming_streams_number) {
213                        (Limit::Some(limit), actual) if actual > limit => {
214                            dispatcher.push(P2pNetworkYamuxAction::OutgoingFrame {
215                                addr,
216                                frame: YamuxFrame {
217                                    flags: YamuxFlags::FIN,
218                                    stream_id: frame.stream_id,
219                                    inner: YamuxFrameInner::Data(vec![].into()),
220                                },
221                            });
222                        }
223                        _ => {
224                            dispatcher.push(P2pNetworkSelectAction::Init {
225                                addr,
226                                kind: SelectKind::Stream(peer_id, frame.stream_id),
227                                incoming: true,
228                            });
229                        }
230                    }
231                }
232                match &frame.inner {
233                    YamuxFrameInner::Data(data) => {
234                        // when our window size is less than half of the max window size send window update
235                        if stream.window_ours < stream.max_window_size / 2 {
236                            let difference =
237                                stream.max_window_size.saturating_mul(2).min(1024 * 1024);
238
239                            dispatcher.push(P2pNetworkYamuxAction::OutgoingFrame {
240                                addr,
241                                frame: YamuxFrame {
242                                    stream_id: frame.stream_id,
243                                    flags: YamuxFlags::empty(),
244                                    inner: YamuxFrameInner::WindowUpdate { difference },
245                                },
246                            });
247                        }
248
249                        dispatcher.push(P2pNetworkSelectAction::IncomingData {
250                            addr,
251                            peer_id,
252                            stream_id: frame.stream_id,
253                            data: data.clone(),
254                            fin: frame.flags.contains(YamuxFlags::FIN),
255                        });
256                    }
257                    YamuxFrameInner::Ping { opaque } => {
258                        let response = frame.flags.contains(YamuxFlags::ACK);
259                        // if this ping is not response create our response
260                        if !response {
261                            let ping = YamuxPing {
262                                stream_id: frame.stream_id,
263                                opaque: *opaque,
264                                response: true,
265                            };
266                            dispatcher.push(P2pNetworkYamuxAction::OutgoingFrame {
267                                addr,
268                                frame: ping.into_frame(),
269                            });
270                        }
271                    }
272                    YamuxFrameInner::WindowUpdate { .. } => {
273                        while let Some(frame) = pending_outgoing.pop_front() {
274                            dispatcher.push(P2pNetworkYamuxAction::OutgoingFrame { addr, frame });
275                        }
276                    }
277                    _ => {}
278                }
279
280                Ok(())
281            }
282            P2pNetworkYamuxAction::OutgoingFrame { mut frame, addr } => {
283                let stream_id = frame.stream_id;
284                let Some(stream) = yamux_state.streams.get_mut(&stream_id) else {
285                    return Ok(());
286                };
287                match &mut frame.inner {
288                    YamuxFrameInner::Data(_) => {
289                        if let Some(new_window) =
290                            stream.window_theirs.checked_sub(frame.len_as_u32())
291                        {
292                            // their window is big enough, decrease the size
293                            // and send the whole frame
294                            stream.window_theirs = new_window;
295                        } else {
296                            // their window is not big enough
297                            // split the frame to send as much as you can and put the rest in the queue
298                            if let Some(remaining) = frame.split_at(stream.window_theirs as usize) {
299                                stream.pending.push_front(remaining);
300                            }
301
302                            // the window will be zero after sending
303                            stream.window_theirs = 0;
304
305                            // if size of pending that is above the limit, ignore the peer
306                            if stream.pending.iter().map(YamuxFrame::len).sum::<usize>()
307                                > yamux_state.pending_outgoing_limit
308                            {
309                                let dispatcher = state_context.into_dispatcher();
310                                let error = P2pNetworkConnectionError::YamuxOverflow(stream_id);
311                                dispatcher.push(P2pNetworkSchedulerAction::Error { addr, error });
312                                return Ok(());
313                            }
314                        }
315                    }
316                    YamuxFrameInner::WindowUpdate { difference } => {
317                        stream.window_ours = stream.window_ours.saturating_add(*difference);
318                        if stream.window_ours > stream.max_window_size {
319                            stream.max_window_size = stream.window_ours.min(MAX_WINDOW_SIZE);
320                        }
321                    }
322                    _ => {}
323                }
324
325                if frame.flags.contains(YamuxFlags::FIN) {
326                    connection_state.streams.remove(&frame.stream_id);
327                    stream.writable = false;
328                } else {
329                    if frame.flags.contains(YamuxFlags::ACK) {
330                        stream.established |= true;
331                    }
332                    if frame.flags.contains(YamuxFlags::SYN) {
333                        stream.syn_sent = true;
334                    }
335                }
336
337                let dispatcher = state_context.into_dispatcher();
338                let data = fuzzed_maybe!(
339                    Data::from(frame.into_bytes()),
340                    crate::fuzzer::mutate_yamux_frame
341                );
342                dispatcher.push(P2pNetworkNoiseAction::OutgoingData { addr, data });
343                Ok(())
344            }
345            P2pNetworkYamuxAction::PingStream { addr, ping } => {
346                let dispatcher = state_context.into_dispatcher();
347                dispatcher.push(P2pNetworkYamuxAction::OutgoingFrame {
348                    addr,
349                    frame: ping.into_frame(),
350                });
351
352                Ok(())
353            }
354            P2pNetworkYamuxAction::OpenStream {
355                stream_id,
356                stream_kind,
357                addr,
358            } => {
359                yamux_state
360                    .streams
361                    .insert(stream_id, YamuxStreamState::default());
362                connection_state.streams.insert(
363                    stream_id,
364                    P2pNetworkStreamState::new(stream_kind, meta.time()),
365                );
366
367                let peer_id = match connection_state
368                    .auth
369                    .as_ref()
370                    .and_then(|P2pNetworkAuthState::Noise(noise)| noise.peer_id())
371                {
372                    Some(peer_id) => *peer_id,
373                    None => return Ok(()),
374                };
375
376                let dispatcher = state_context.into_dispatcher();
377                dispatcher.push(P2pNetworkSelectAction::Init {
378                    addr,
379                    kind: SelectKind::Stream(peer_id, stream_id),
380                    incoming: false,
381                });
382                Ok(())
383            }
384        }
385    }
386}