1use std::collections::VecDeque;
2
3use openmina_core::{bug_condition, fuzz_maybe, fuzzed_maybe, Substate, SubstateAccess};
4
5use crate::{
6 yamux::p2p_network_yamux_state::{YamuxFrame, YamuxFrameInner},
7 Data, Limit, P2pLimits, P2pNetworkAuthState, P2pNetworkConnectionError,
8 P2pNetworkConnectionMuxState, P2pNetworkNoiseAction, P2pNetworkSchedulerAction,
9 P2pNetworkSchedulerState, P2pNetworkSelectAction, P2pNetworkStreamState, SelectKind,
10};
11
12use super::{
13 p2p_network_yamux_state::{YamuxStreamState, MAX_WINDOW_SIZE},
14 P2pNetworkYamuxAction, P2pNetworkYamuxState, YamuxFlags, YamuxPing,
15};
16
17impl P2pNetworkYamuxState {
18 pub fn reducer<State, Action>(
41 mut state_context: Substate<Action, State, P2pNetworkSchedulerState>,
43 action: redux::ActionWithMeta<P2pNetworkYamuxAction>,
44 ) -> Result<(), String>
45 where
46 State: crate::P2pStateTrait,
47 Action: crate::P2pActionTrait<State>,
48 {
49 let (action, meta) = action.split();
50 let connection_state = state_context
51 .get_substate_mut()?
52 .connection_state_mut(action.addr())
53 .ok_or_else(|| format!("Connection not found for action: {action:?}"))
54 .inspect_err(|e| bug_condition!("{}", e))?;
55
56 let P2pNetworkConnectionMuxState::Yamux(yamux_state) = connection_state
57 .mux
58 .as_mut()
59 .ok_or_else(|| format!("Invalid yamux state for action: {action:?}"))?;
60
61 if yamux_state.terminated.is_some() {
62 return Ok(());
63 }
64
65 match action {
66 P2pNetworkYamuxAction::IncomingData { data, addr } => {
67 yamux_state.extend_buffer(&data);
68 yamux_state.parse_frames();
69
70 let frame_count = yamux_state.incoming_frame_count();
71 let dispatcher = state_context.into_dispatcher();
72
73 for _ in 0..frame_count {
74 dispatcher.push(P2pNetworkYamuxAction::IncomingFrame { addr })
75 }
76
77 Ok(())
78 }
79 P2pNetworkYamuxAction::OutgoingData {
80 addr,
81 stream_id,
82 data,
83 mut flags,
84 } => {
85 let stream_state = yamux_state
86 .streams
87 .get(&stream_id)
88 .ok_or_else(|| format!("Stream with id {stream_id} not found for `P2pNetworkYamuxAction::OutgoingData`"))?;
89
90 if !stream_state.incoming && !stream_state.established && !stream_state.syn_sent {
91 flags.insert(YamuxFlags::SYN);
92 } else if stream_state.incoming && !stream_state.established {
93 flags.insert(YamuxFlags::ACK);
94 }
95
96 fuzz_maybe!(&mut flags, crate::fuzzer::mutate_yamux_flags);
97
98 let frame = YamuxFrame {
99 flags,
100 stream_id,
101 inner: YamuxFrameInner::Data(data),
102 };
103
104 let dispatcher = state_context.into_dispatcher();
105 dispatcher.push(P2pNetworkYamuxAction::OutgoingFrame { addr, frame });
106
107 Ok(())
108 }
109 P2pNetworkYamuxAction::IncomingFrame { addr } => {
110 let mut pending_outgoing = VecDeque::default();
111 let Some(frame) = yamux_state.incoming.pop_front() else {
112 bug_condition!(
113 "Frame not found for action `P2pNetworkYamuxAction::IncomingFrame`"
114 );
115 return Ok(());
116 };
117
118 if frame.flags.contains(YamuxFlags::SYN) {
119 yamux_state
120 .streams
121 .insert(frame.stream_id, YamuxStreamState::incoming());
122
123 if frame.stream_id != 0 {
124 connection_state.streams.insert(
125 frame.stream_id,
126 P2pNetworkStreamState::new_incoming(meta.time()),
127 );
128 }
129 }
130 if frame.flags.contains(YamuxFlags::ACK) {
131 yamux_state
132 .streams
133 .entry(frame.stream_id)
134 .or_default()
135 .established = true;
136 }
137
138 match &frame.inner {
139 YamuxFrameInner::Data(_) => {
140 if let Some(stream) = yamux_state.streams.get_mut(&frame.stream_id) {
141 stream.window_ours =
144 stream.window_ours.saturating_sub(frame.len_as_u32());
145 }
146 }
147 YamuxFrameInner::WindowUpdate { difference } => {
148 let stream = yamux_state
149 .streams
150 .entry(frame.stream_id)
151 .or_insert_with(YamuxStreamState::incoming);
152
153 stream.window_theirs = stream.window_theirs.saturating_add(*difference);
154
155 if *difference > 0 {
156 let mut window = stream.window_theirs;
159 while let Some(frame) = stream.pending.pop_front() {
160 let len = frame.len_as_u32();
161 pending_outgoing.push_back(frame);
162 if let Some(new_window) = window.checked_sub(len) {
163 window = new_window;
164 } else {
165 break;
166 }
167 }
168 }
169 }
170 YamuxFrameInner::Ping { .. } => {}
171 YamuxFrameInner::GoAway(res) => yamux_state.set_res(*res),
172 }
173
174 let (dispatcher, state) = state_context.into_dispatcher_and_state();
175 let limits: &P2pLimits = state.substate()?;
176 let max_streams = limits.max_streams();
177 let connection_state =
178 <State as SubstateAccess<P2pNetworkSchedulerState>>::substate(state)?
179 .connection_state(&addr)
180 .ok_or_else(|| format!("Connection not found {}", addr))?;
181
182 let stream = connection_state
183 .yamux_state()
184 .and_then(|yamux_state| yamux_state.streams.get(&frame.stream_id))
185 .ok_or_else(|| format!("Stream with id {} not found for `P2pNetworkYamuxAction::IncomingFrame`", frame.stream_id))?;
186
187 let peer_id = match connection_state
188 .auth
189 .as_ref()
190 .and_then(|P2pNetworkAuthState::Noise(noise)| noise.peer_id())
191 {
192 Some(peer_id) => *peer_id,
193 None => return Ok(()),
194 };
195
196 if frame.flags.contains(YamuxFlags::RST) {
197 dispatcher.push(P2pNetworkSchedulerAction::Error {
198 addr,
199 error: P2pNetworkConnectionError::StreamReset(frame.stream_id),
200 });
201 return Ok(());
202 }
203
204 if frame.flags.contains(YamuxFlags::SYN) && frame.stream_id != 0 {
205 let incoming_streams_number = connection_state
207 .streams
208 .values()
209 .filter(|s| s.select.is_incoming())
210 .count();
211
212 match (max_streams, incoming_streams_number) {
213 (Limit::Some(limit), actual) if actual > limit => {
214 dispatcher.push(P2pNetworkYamuxAction::OutgoingFrame {
215 addr,
216 frame: YamuxFrame {
217 flags: YamuxFlags::FIN,
218 stream_id: frame.stream_id,
219 inner: YamuxFrameInner::Data(vec![].into()),
220 },
221 });
222 }
223 _ => {
224 dispatcher.push(P2pNetworkSelectAction::Init {
225 addr,
226 kind: SelectKind::Stream(peer_id, frame.stream_id),
227 incoming: true,
228 });
229 }
230 }
231 }
232 match &frame.inner {
233 YamuxFrameInner::Data(data) => {
234 if stream.window_ours < stream.max_window_size / 2 {
236 let difference =
237 stream.max_window_size.saturating_mul(2).min(1024 * 1024);
238
239 dispatcher.push(P2pNetworkYamuxAction::OutgoingFrame {
240 addr,
241 frame: YamuxFrame {
242 stream_id: frame.stream_id,
243 flags: YamuxFlags::empty(),
244 inner: YamuxFrameInner::WindowUpdate { difference },
245 },
246 });
247 }
248
249 dispatcher.push(P2pNetworkSelectAction::IncomingData {
250 addr,
251 peer_id,
252 stream_id: frame.stream_id,
253 data: data.clone(),
254 fin: frame.flags.contains(YamuxFlags::FIN),
255 });
256 }
257 YamuxFrameInner::Ping { opaque } => {
258 let response = frame.flags.contains(YamuxFlags::ACK);
259 if !response {
261 let ping = YamuxPing {
262 stream_id: frame.stream_id,
263 opaque: *opaque,
264 response: true,
265 };
266 dispatcher.push(P2pNetworkYamuxAction::OutgoingFrame {
267 addr,
268 frame: ping.into_frame(),
269 });
270 }
271 }
272 YamuxFrameInner::WindowUpdate { .. } => {
273 while let Some(frame) = pending_outgoing.pop_front() {
274 dispatcher.push(P2pNetworkYamuxAction::OutgoingFrame { addr, frame });
275 }
276 }
277 _ => {}
278 }
279
280 Ok(())
281 }
282 P2pNetworkYamuxAction::OutgoingFrame { mut frame, addr } => {
283 let stream_id = frame.stream_id;
284 let Some(stream) = yamux_state.streams.get_mut(&stream_id) else {
285 return Ok(());
286 };
287 match &mut frame.inner {
288 YamuxFrameInner::Data(_) => {
289 if let Some(new_window) =
290 stream.window_theirs.checked_sub(frame.len_as_u32())
291 {
292 stream.window_theirs = new_window;
295 } else {
296 if let Some(remaining) = frame.split_at(stream.window_theirs as usize) {
299 stream.pending.push_front(remaining);
300 }
301
302 stream.window_theirs = 0;
304
305 if stream.pending.iter().map(YamuxFrame::len).sum::<usize>()
307 > yamux_state.pending_outgoing_limit
308 {
309 let dispatcher = state_context.into_dispatcher();
310 let error = P2pNetworkConnectionError::YamuxOverflow(stream_id);
311 dispatcher.push(P2pNetworkSchedulerAction::Error { addr, error });
312 return Ok(());
313 }
314 }
315 }
316 YamuxFrameInner::WindowUpdate { difference } => {
317 stream.window_ours = stream.window_ours.saturating_add(*difference);
318 if stream.window_ours > stream.max_window_size {
319 stream.max_window_size = stream.window_ours.min(MAX_WINDOW_SIZE);
320 }
321 }
322 _ => {}
323 }
324
325 if frame.flags.contains(YamuxFlags::FIN) {
326 connection_state.streams.remove(&frame.stream_id);
327 stream.writable = false;
328 } else {
329 if frame.flags.contains(YamuxFlags::ACK) {
330 stream.established |= true;
331 }
332 if frame.flags.contains(YamuxFlags::SYN) {
333 stream.syn_sent = true;
334 }
335 }
336
337 let dispatcher = state_context.into_dispatcher();
338 let data = fuzzed_maybe!(
339 Data::from(frame.into_bytes()),
340 crate::fuzzer::mutate_yamux_frame
341 );
342 dispatcher.push(P2pNetworkNoiseAction::OutgoingData { addr, data });
343 Ok(())
344 }
345 P2pNetworkYamuxAction::PingStream { addr, ping } => {
346 let dispatcher = state_context.into_dispatcher();
347 dispatcher.push(P2pNetworkYamuxAction::OutgoingFrame {
348 addr,
349 frame: ping.into_frame(),
350 });
351
352 Ok(())
353 }
354 P2pNetworkYamuxAction::OpenStream {
355 stream_id,
356 stream_kind,
357 addr,
358 } => {
359 yamux_state
360 .streams
361 .insert(stream_id, YamuxStreamState::default());
362 connection_state.streams.insert(
363 stream_id,
364 P2pNetworkStreamState::new(stream_kind, meta.time()),
365 );
366
367 let peer_id = match connection_state
368 .auth
369 .as_ref()
370 .and_then(|P2pNetworkAuthState::Noise(noise)| noise.peer_id())
371 {
372 Some(peer_id) => *peer_id,
373 None => return Ok(()),
374 };
375
376 let dispatcher = state_context.into_dispatcher();
377 dispatcher.push(P2pNetworkSelectAction::Init {
378 addr,
379 kind: SelectKind::Stream(peer_id, stream_id),
380 incoming: false,
381 });
382 Ok(())
383 }
384 }
385 }
386}