1use openmina_core::{bug_condition, error, fuzz_maybe, fuzzed_maybe, Substate};
2use redux::Timestamp;
3use token::{
4 AuthKind, DiscoveryAlgorithm, IdentifyAlgorithm, MuxKind, Protocol, RpcAlgorithm, StreamKind,
5 Token,
6};
7
8use crate::{
9 fuzzer::{mutate_select_authentication, mutate_select_multiplexing, mutate_select_stream},
10 network::identify::P2pNetworkIdentifyStreamAction,
11 ConnectionAddr, Data, P2pNetworkKademliaStreamAction, P2pNetworkNoiseAction,
12 P2pNetworkPnetAction, P2pNetworkPubsubAction, P2pNetworkRpcAction, P2pNetworkSchedulerAction,
13 P2pNetworkSchedulerState, P2pNetworkYamuxAction, P2pState, YamuxFlags,
14};
15
16use self::{p2p_network_select_state::P2pNetworkSelectStateInner, token::ParseTokenError};
17
18use super::*;
19
20impl P2pNetworkSelectState {
21 pub fn reducer<State, Action>(
22 mut state_context: Substate<Action, State, P2pNetworkSchedulerState>,
23 action: redux::ActionWithMeta<P2pNetworkSelectAction>,
24 ) -> Result<(), String>
25 where
26 State: crate::P2pStateTrait,
27 Action: crate::P2pActionTrait<State>,
28 {
29 let (action, meta) = action.split();
30 let select_kind = action.select_kind();
31
32 let select_state = state_context
33 .get_substate_mut()?
34 .connection_state_mut(action.addr())
35 .and_then(|conn| conn.select_state_mut(&select_kind))
36 .ok_or_else(|| format!("Select state not found for {action:?}"))?;
37
38 if let P2pNetworkSelectStateInner::Error(_) = &select_state.inner {
39 return Ok(());
40 }
41
42 match action {
43 P2pNetworkSelectAction::Init {
45 incoming,
46 addr,
47 kind,
48 } => {
49 match (&select_state.inner, incoming) {
50 (P2pNetworkSelectStateInner::Initiator { .. }, true) => {
51 select_state.inner = P2pNetworkSelectStateInner::Responder
52 }
53 (P2pNetworkSelectStateInner::Responder, false) => {
54 select_state.inner = P2pNetworkSelectStateInner::Initiator {
55 proposing: token::Protocol::Mux(token::MuxKind::YamuxNoNewLine1_0_0),
56 }
57 }
58 _ => {}
59 }
60
61 let (dispatcher, state) = state_context.into_dispatcher_and_state();
62 let state: &P2pNetworkSchedulerState = state.substate()?;
63 let state = state
64 .connection_state(&addr)
65 .and_then(|state| state.select_state(&kind))
66 .ok_or_else(|| format!("Select state not found for {action:?}"))?;
67
68 if state.negotiated.is_none() && !incoming {
69 let mut tokens = vec![Token::Handshake];
70
71 match &state.inner {
72 P2pNetworkSelectStateInner::Uncertain { proposing } => {
73 tokens.push(Token::SimultaneousConnect);
74 tokens.push(Token::Protocol(*proposing));
75 }
76 P2pNetworkSelectStateInner::Initiator { proposing } => {
77 tokens.push(Token::Protocol(*proposing));
78 }
79 _ => {}
80 };
81 dispatcher.push(P2pNetworkSelectAction::OutgoingTokens { addr, kind, tokens });
82 }
83
84 Ok(())
85 }
86 P2pNetworkSelectAction::IncomingData {
87 data, addr, fin, ..
88 }
89 | P2pNetworkSelectAction::IncomingDataAuth { data, addr, fin }
90 | P2pNetworkSelectAction::IncomingDataMux {
91 data, addr, fin, ..
92 } => {
93 select_state.handle_incoming_data(&data);
94
95 let (dispatcher, state) = state_context.into_dispatcher_and_state();
96
97 let state: &P2pNetworkSchedulerState = state.substate()?;
98 let select_state = state
99 .connection_state(&addr)
100 .and_then(|conn| conn.select_state(&select_kind))
101 .ok_or("Select state not found for incoming data")?;
102
103 if let P2pNetworkSelectStateInner::Error(error) = &select_state.inner {
104 dispatcher.push(P2pNetworkSchedulerAction::SelectError {
105 addr,
106 kind: select_kind,
107 error: error.to_owned(),
108 })
109 } else {
110 for action in select_state.forward_incoming_data(select_kind, addr, data, fin) {
111 dispatcher.push(action)
112 }
113 }
114
115 Ok(())
116 }
117 P2pNetworkSelectAction::IncomingPayloadAuth { addr, fin, data }
118 | P2pNetworkSelectAction::IncomingPayloadMux {
119 addr, fin, data, ..
120 }
121 | P2pNetworkSelectAction::IncomingPayload {
122 addr, fin, data, ..
123 } => {
124 select_state.recv.buffer.clear();
125 select_state.recv.buffer.shrink_to(0x2000);
126
127 P2pNetworkSelectState::handle_negotiated_token(
128 state_context,
129 select_kind,
130 addr,
131 data,
132 fin,
133 meta.time(),
134 )
135 }
136 P2pNetworkSelectAction::IncomingToken { kind, addr } => {
137 let Some(token) = select_state.tokens.pop_front() else {
138 bug_condition!("Invalid state for action: {action:?}");
139 return Ok(());
140 };
141 select_state.handle_incoming_token(token, meta.time(), select_kind);
142
143 let (dispatcher, state) = state_context.into_dispatcher_and_state();
144 let scheduler: &P2pNetworkSchedulerState = state.substate()?;
145 let select_state = scheduler
146 .connection_state(&addr)
147 .and_then(|stream| stream.select_state(&kind))
148 .ok_or_else(|| format!("Select state not found for {action:?}"))?;
149
150 if let P2pNetworkSelectStateInner::Error(error) = &select_state.inner {
151 dispatcher.push(P2pNetworkSchedulerAction::SelectError {
152 addr,
153 kind,
154 error: error.to_owned(),
155 });
156 return Ok(());
157 }
158
159 if let Some(token) = &select_state.to_send {
160 dispatcher.push(P2pNetworkSelectAction::OutgoingTokens {
161 addr,
162 kind,
163 tokens: vec![token.clone()],
164 })
165 }
166
167 if let Some(protocol) = select_state.negotiated {
168 let p2p_state: &P2pState = state.substate()?;
169
170 let expected_peer_id = p2p_state
171 .peer_with_connection(addr)
172 .map(|(peer_id, _)| peer_id);
173
174 let incoming = matches!(
175 &select_state.inner,
176 P2pNetworkSelectStateInner::Responder { .. }
177 );
178 dispatcher.push(P2pNetworkSchedulerAction::SelectDone {
179 addr,
180 kind,
181 protocol,
182 incoming,
183 expected_peer_id,
184 })
185 }
186
187 Ok(())
188 }
189 P2pNetworkSelectAction::OutgoingTokens { addr, kind, tokens } => {
190 let dispatcher = state_context.into_dispatcher();
191
192 let mut data = {
193 let mut data = vec![];
194 for token in &tokens {
195 data.extend_from_slice(token.name())
196 }
197 data.into()
198 };
199
200 match kind {
201 SelectKind::Authentication => {
202 fuzz_maybe!(&mut data, mutate_select_authentication);
203 dispatcher.push(P2pNetworkPnetAction::OutgoingData { addr, data });
204 }
205 SelectKind::Multiplexing(_) | SelectKind::MultiplexingNoPeerId => {
206 fuzz_maybe!(&mut data, mutate_select_multiplexing);
207 dispatcher
208 .push(P2pNetworkNoiseAction::OutgoingDataSelectMux { addr, data });
209 }
210 SelectKind::Stream(_, stream_id) => {
211 if let Some(na) = tokens.iter().find(|t| t.name() == Token::Na.name()) {
212 dispatcher.push(P2pNetworkYamuxAction::OutgoingData {
213 addr,
214 stream_id,
215 data: na.name().to_vec().into(),
216 flags: YamuxFlags::FIN,
217 });
218 } else {
219 for token in tokens {
220 let data = fuzzed_maybe!(
221 token.name().to_vec().into(),
222 mutate_select_stream
223 );
224 dispatcher.push(P2pNetworkYamuxAction::OutgoingData {
225 addr,
226 stream_id,
227 data,
228 flags: Default::default(),
229 });
230 }
231 }
232 }
233 }
234
235 Ok(())
236 }
237 P2pNetworkSelectAction::Timeout { addr, kind } => {
238 let error = "timeout".to_owned();
239 select_state.inner = P2pNetworkSelectStateInner::Error(error.clone());
240 let dispatcher = state_context.into_dispatcher();
241 dispatcher.push(P2pNetworkSchedulerAction::SelectError { addr, kind, error });
242 Ok(())
243 }
244 }
245 }
246
247 fn handle_incoming_data(&mut self, data: &Data) {
248 if self.negotiated.is_none() {
249 self.recv.put(data);
250 loop {
251 let parse_result = self.recv.parse_token();
252
253 match parse_result {
254 Err(ParseTokenError) => {
255 self.inner = P2pNetworkSelectStateInner::Error("parse_token".to_owned());
256 self.recv.buffer.clear();
257 self.recv.buffer.shrink_to(0x2000);
258 break;
259 }
260 Ok(None) => break,
261 Ok(Some(token)) => {
262 let done = matches!(
263 token,
264 token::Token::Protocol(..) | token::Token::UnknownProtocol(..)
265 );
266 self.tokens.push_back(token);
267
268 if done {
269 break;
270 }
271 }
272 }
273 }
274 }
275 }
276
277 fn handle_incoming_token(&mut self, token: Token, time: Timestamp, kind: SelectKind) {
278 self.to_send = None;
279 match &self.inner {
280 P2pNetworkSelectStateInner::Error(_) => {}
281 P2pNetworkSelectStateInner::Initiator { proposing } => match token {
282 token::Token::Handshake => {}
283 token::Token::Na => {
284 self.inner = P2pNetworkSelectStateInner::Error("token is NA".to_owned());
286 self.negotiated = Some(None);
287 }
288 token::Token::SimultaneousConnect => {
289 self.inner =
291 P2pNetworkSelectStateInner::Error("simultaneous connect token".to_owned());
292 }
293 token::Token::Protocol(response) => {
294 if response == *proposing {
295 self.negotiated = Some(Some(response));
296 } else {
297 self.inner = P2pNetworkSelectStateInner::Error(format!(
298 "protocol mismatch: {response:?} != {proposing:?}"
299 ));
300 }
301 }
302 token::Token::UnknownProtocol(name) => {
303 self.inner = P2pNetworkSelectStateInner::Error(format!(
305 "unknown protocol `{}`",
306 String::from_utf8_lossy(&name)
307 ));
308 self.negotiated = Some(None);
309 }
310 },
311 P2pNetworkSelectStateInner::Uncertain { proposing } => match token {
312 token::Token::Handshake => {}
313 token::Token::Na => {
314 let proposing = *proposing;
315 self.inner = P2pNetworkSelectStateInner::Initiator { proposing };
316 }
317 token::Token::SimultaneousConnect => {
318 }
320 token::Token::Protocol(_) => {
321 self.inner = P2pNetworkSelectStateInner::Error(
322 "protocol mismatch: uncertain".to_owned(),
323 );
324 }
325 token::Token::UnknownProtocol(name) => {
326 self.inner = P2pNetworkSelectStateInner::Error(format!(
327 "protocol mismatch: uncertain with unknown protocol {}",
328 String::from_utf8_lossy(&name)
329 ));
330 }
331 },
332 P2pNetworkSelectStateInner::Responder => match token {
333 token::Token::Handshake => {
334 self.to_send = Some(token::Token::Handshake);
335 }
336 token::Token::Na => {}
337 token::Token::SimultaneousConnect => {
338 self.to_send = Some(token::Token::Na);
339 }
340 token::Token::Protocol(protocol) => {
341 let reply = match protocol {
342 token::Protocol::Auth(_) => {
343 token::Token::Protocol(token::Protocol::Auth(token::AuthKind::Noise))
344 }
345 token::Protocol::Mux(token::MuxKind::Yamux1_0_0) => {
346 token::Token::Protocol(token::Protocol::Mux(token::MuxKind::Yamux1_0_0))
347 }
348 token::Protocol::Mux(token::MuxKind::YamuxNoNewLine1_0_0) => {
349 token::Token::Protocol(token::Protocol::Mux(
350 token::MuxKind::YamuxNoNewLine1_0_0,
351 ))
352 }
353 token::Protocol::Stream(
354 token::StreamKind::Rpc(_)
355 | token::StreamKind::Discovery(_)
356 | token::StreamKind::Broadcast(_)
357 | token::StreamKind::Identify(_)
358 | token::StreamKind::Ping(_)
359 | token::StreamKind::Bitswap(_)
360 | token::StreamKind::Status(_),
361 ) => token::Token::Protocol(protocol),
362 };
363 let negotiated = if let token::Token::Protocol(p) = &reply {
364 Some(*p)
365 } else {
366 None
367 };
368 self.negotiated = Some(negotiated);
369 self.to_send = Some(reply);
370 }
371 token::Token::UnknownProtocol(name) => {
372 const KNOWN_UNKNOWN_PROTOCOLS: [&str; 3] =
373 ["/ipfs/id/push/1.0.0", "/ipfs/id/1.0.0", "/mina/node-status"];
374 if !name.is_empty() {
375 if let Ok(str) = std::str::from_utf8(&name[1..]) {
376 let str = str.trim_end_matches('\n');
377 if !KNOWN_UNKNOWN_PROTOCOLS.iter().any(|s| (*s).eq(str)) {
378 self.inner = P2pNetworkSelectStateInner::Error(format!(
379 "responder with unknown protocol {}",
380 str
381 ));
382
383 error!(time; "unknown protocol: {str}, {kind:?}");
384 }
385 } else {
386 self.inner = P2pNetworkSelectStateInner::Error(format!(
387 "responder with invalid protocol data {:?}",
388 name
389 ));
390
391 error!(time; "invalid protocol: {name:?}, {kind:?}");
392 }
393 } else {
394 self.inner = P2pNetworkSelectStateInner::Error(
395 "responder with empty protocol".to_string(),
396 );
397
398 error!(time; "empty protocol: {kind:?}");
399 }
400 self.to_send = Some(token::Token::Na);
401 self.negotiated = Some(None);
402 }
403 },
404 }
405 }
406
407 fn handle_negotiated_token<Action, State>(
408 state_context: Substate<Action, State, P2pNetworkSchedulerState>,
409 select_kind: SelectKind,
410 addr: ConnectionAddr,
411 data: Data,
412 fin: bool,
413 time: Timestamp,
414 ) -> Result<(), String>
415 where
416 State: crate::P2pStateTrait,
417 Action: crate::P2pActionTrait<State>,
418 {
419 let (dispatcher, state) = state_context.into_dispatcher_and_state();
420 let p2p_state: &P2pState = state.substate()?;
421 let state: &P2pNetworkSchedulerState = state.substate()?;
422 let state = state
423 .connection_state(&addr)
424 .and_then(|state| state.select_state(&select_kind))
425 .ok_or_else(|| "Select state not found incoming payload".to_owned())?;
426
427 let Some(Some(negotiated)) = &state.negotiated else {
428 bug_condition!(
429 "Invalid negotiation state {:?} for incoming payload",
430 state.negotiated,
431 );
432 return Ok(());
433 };
434
435 match negotiated {
436 Protocol::Auth(AuthKind::Noise) => {
437 dispatcher.push(P2pNetworkNoiseAction::IncomingData { addr, data });
438 }
439 Protocol::Mux(MuxKind::Yamux1_0_0 | MuxKind::YamuxNoNewLine1_0_0) => {
440 dispatcher.push(P2pNetworkYamuxAction::IncomingData { addr, data });
441 }
442 Protocol::Stream(kind) => match select_kind {
443 SelectKind::Stream(peer_id, stream_id) => match kind {
444 StreamKind::Discovery(DiscoveryAlgorithm::Kademlia1_0_0) => {
445 if !fin {
446 dispatcher.push(P2pNetworkKademliaStreamAction::IncomingData {
447 addr,
448 peer_id,
449 stream_id,
450 data,
451 });
452 } else {
453 dispatcher.push(P2pNetworkKademliaStreamAction::RemoteClose {
454 addr,
455 peer_id,
456 stream_id,
457 });
458 }
459 }
460 StreamKind::Identify(IdentifyAlgorithm::Identify1_0_0) => {
461 if !fin {
462 dispatcher.push(P2pNetworkIdentifyStreamAction::IncomingData {
463 addr,
464 peer_id,
465 stream_id,
466 data,
467 });
468 } else {
469 dispatcher.push(P2pNetworkIdentifyStreamAction::RemoteClose {
470 addr,
471 peer_id,
472 stream_id,
473 });
474 }
475 }
476 StreamKind::Broadcast(_) => {
477 dispatcher.push(P2pNetworkPubsubAction::IncomingData {
478 peer_id,
479 addr,
480 stream_id,
481 data,
482 seen_limit: p2p_state.config.meshsub.mcache_len,
483 });
484 }
485 StreamKind::Rpc(RpcAlgorithm::Rpc0_0_1) => {
486 dispatcher.push(P2pNetworkRpcAction::IncomingData {
487 addr,
488 peer_id,
489 stream_id,
490 data,
491 });
492 }
493 _ => error!(time;
494 "trying to negotiate unimplemented stream kind {kind:?}"
495 ),
496 },
497 _ => error!(time; "invalid select protocol kind: {:?}", kind),
498 },
499 }
500
501 Ok(())
502 }
503}