1use mina_core::{bug_condition, error, fuzz_maybe, fuzzed_maybe, Substate};
2use redux::Timestamp;
3use token::{
4 AuthKind, DiscoveryAlgorithm, IdentifyAlgorithm, MuxKind, Protocol, RpcAlgorithm, StreamKind,
5 Token,
6};
7
8use crate::{
9 fuzzer::{mutate_select_authentication, mutate_select_multiplexing, mutate_select_stream},
10 network::identify::P2pNetworkIdentifyStreamAction,
11 ConnectionAddr, Data, P2pNetworkKademliaStreamAction, P2pNetworkNoiseAction,
12 P2pNetworkPnetAction, P2pNetworkPubsubAction, P2pNetworkRpcAction, P2pNetworkSchedulerAction,
13 P2pNetworkSchedulerState, P2pNetworkYamuxAction, P2pState, YamuxFlags,
14};
15
16use self::{p2p_network_select_state::P2pNetworkSelectStateInner, token::ParseTokenError};
17
18use super::*;
19
20impl P2pNetworkSelectState {
21 pub fn reducer<State, Action>(
22 mut state_context: Substate<Action, State, P2pNetworkSchedulerState>,
23 action: redux::ActionWithMeta<P2pNetworkSelectAction>,
24 ) -> Result<(), String>
25 where
26 State: crate::P2pStateTrait,
27 Action: crate::P2pActionTrait<State>,
28 {
29 let (action, meta) = action.split();
30 let select_kind = action.select_kind();
31
32 let select_state = state_context
33 .get_substate_mut()?
34 .connection_state_mut(action.addr())
35 .and_then(|conn| conn.select_state_mut(&select_kind))
36 .ok_or_else(|| format!("Select state not found for {action:?}"))?;
37
38 if let P2pNetworkSelectStateInner::Error(_) = &select_state.inner {
39 return Ok(());
40 }
41
42 match action {
43 P2pNetworkSelectAction::Init {
45 incoming,
46 addr,
47 kind,
48 } => {
49 match (&select_state.inner, incoming) {
50 (P2pNetworkSelectStateInner::Initiator { .. }, true) => {
51 select_state.inner = P2pNetworkSelectStateInner::Responder
52 }
53 (P2pNetworkSelectStateInner::Responder, false) => {
54 select_state.inner = P2pNetworkSelectStateInner::Initiator {
55 proposing: token::Protocol::Mux(token::MuxKind::YamuxNoNewLine1_0_0),
56 }
57 }
58 _ => {}
59 }
60
61 let (dispatcher, state) = state_context.into_dispatcher_and_state();
62 let state: &P2pNetworkSchedulerState = state.substate()?;
63 let state = state
64 .connection_state(&addr)
65 .and_then(|state| state.select_state(&kind))
66 .ok_or_else(|| format!("Select state not found for {action:?}"))?;
67
68 if state.negotiated.is_none() && !incoming {
69 let mut tokens = vec![Token::Handshake];
70
71 match &state.inner {
72 P2pNetworkSelectStateInner::Uncertain { proposing } => {
73 tokens.push(Token::SimultaneousConnect);
74 tokens.push(Token::Protocol(*proposing));
75 }
76 P2pNetworkSelectStateInner::Initiator { proposing } => {
77 tokens.push(Token::Protocol(*proposing));
78 }
79 _ => {}
80 };
81 dispatcher.push(P2pNetworkSelectAction::OutgoingTokens { addr, kind, tokens });
82 }
83
84 Ok(())
85 }
86 P2pNetworkSelectAction::IncomingData {
87 data, addr, fin, ..
88 }
89 | P2pNetworkSelectAction::IncomingDataAuth { data, addr, fin }
90 | P2pNetworkSelectAction::IncomingDataMux {
91 data, addr, fin, ..
92 } => {
93 select_state.handle_incoming_data(&data);
94
95 let (dispatcher, state) = state_context.into_dispatcher_and_state();
96
97 let state: &P2pNetworkSchedulerState = state.substate()?;
98 let select_state = state
99 .connection_state(&addr)
100 .and_then(|conn| conn.select_state(&select_kind))
101 .ok_or("Select state not found for incoming data")?;
102
103 if let P2pNetworkSelectStateInner::Error(error) = &select_state.inner {
104 dispatcher.push(P2pNetworkSchedulerAction::SelectError {
105 addr,
106 kind: select_kind,
107 error: error.to_owned(),
108 })
109 } else {
110 for action in select_state.forward_incoming_data(select_kind, addr, data, fin) {
111 dispatcher.push(action)
112 }
113 }
114
115 Ok(())
116 }
117 P2pNetworkSelectAction::IncomingPayloadAuth { addr, fin, data }
118 | P2pNetworkSelectAction::IncomingPayloadMux {
119 addr, fin, data, ..
120 }
121 | P2pNetworkSelectAction::IncomingPayload {
122 addr, fin, data, ..
123 } => {
124 select_state.recv.buffer.clear();
125 select_state.recv.buffer.shrink_to(0x2000);
126
127 P2pNetworkSelectState::handle_negotiated_token(
128 state_context,
129 select_kind,
130 addr,
131 data,
132 fin,
133 meta.time(),
134 )
135 }
136 P2pNetworkSelectAction::IncomingToken { kind, addr } => {
137 let Some(token) = select_state.tokens.pop_front() else {
138 bug_condition!("Invalid state for action: {action:?}");
139 return Ok(());
140 };
141 select_state.handle_incoming_token(token, meta.time(), select_kind);
142
143 let (dispatcher, state) = state_context.into_dispatcher_and_state();
144 let scheduler: &P2pNetworkSchedulerState = state.substate()?;
145 let select_state = scheduler
146 .connection_state(&addr)
147 .and_then(|stream| stream.select_state(&kind))
148 .ok_or_else(|| format!("Select state not found for {action:?}"))?;
149
150 if let P2pNetworkSelectStateInner::Error(error) = &select_state.inner {
151 dispatcher.push(P2pNetworkSchedulerAction::SelectError {
152 addr,
153 kind,
154 error: error.to_owned(),
155 });
156 return Ok(());
157 }
158
159 if let Some(token) = &select_state.to_send {
160 dispatcher.push(P2pNetworkSelectAction::OutgoingTokens {
161 addr,
162 kind,
163 tokens: vec![token.clone()],
164 })
165 }
166
167 if let Some(protocol) = select_state.negotiated {
168 let p2p_state: &P2pState = state.substate()?;
169
170 let expected_peer_id = p2p_state
171 .peer_with_connection(addr)
172 .map(|(peer_id, _)| peer_id);
173
174 let incoming =
175 matches!(&select_state.inner, P2pNetworkSelectStateInner::Responder);
176 dispatcher.push(P2pNetworkSchedulerAction::SelectDone {
177 addr,
178 kind,
179 protocol,
180 incoming,
181 expected_peer_id,
182 })
183 }
184
185 Ok(())
186 }
187 P2pNetworkSelectAction::OutgoingTokens { addr, kind, tokens } => {
188 let dispatcher = state_context.into_dispatcher();
189
190 let mut data = {
191 let mut data = vec![];
192 for token in &tokens {
193 data.extend_from_slice(token.name())
194 }
195 data.into()
196 };
197
198 match kind {
199 SelectKind::Authentication => {
200 fuzz_maybe!(&mut data, mutate_select_authentication);
201 dispatcher.push(P2pNetworkPnetAction::OutgoingData { addr, data });
202 }
203 SelectKind::Multiplexing(_) | SelectKind::MultiplexingNoPeerId => {
204 fuzz_maybe!(&mut data, mutate_select_multiplexing);
205 dispatcher
206 .push(P2pNetworkNoiseAction::OutgoingDataSelectMux { addr, data });
207 }
208 SelectKind::Stream(_, stream_id) => {
209 if let Some(na) = tokens.iter().find(|t| t.name() == Token::Na.name()) {
210 dispatcher.push(P2pNetworkYamuxAction::OutgoingData {
211 addr,
212 stream_id,
213 data: na.name().to_vec().into(),
214 flags: YamuxFlags::FIN,
215 });
216 } else {
217 for token in tokens {
218 let data = fuzzed_maybe!(
219 token.name().to_vec().into(),
220 mutate_select_stream
221 );
222 dispatcher.push(P2pNetworkYamuxAction::OutgoingData {
223 addr,
224 stream_id,
225 data,
226 flags: Default::default(),
227 });
228 }
229 }
230 }
231 }
232
233 Ok(())
234 }
235 P2pNetworkSelectAction::Timeout { addr, kind } => {
236 let error = "timeout".to_owned();
237 select_state.inner = P2pNetworkSelectStateInner::Error(error.clone());
238 let dispatcher = state_context.into_dispatcher();
239 dispatcher.push(P2pNetworkSchedulerAction::SelectError { addr, kind, error });
240 Ok(())
241 }
242 }
243 }
244
245 fn handle_incoming_data(&mut self, data: &Data) {
246 if self.negotiated.is_none() {
247 self.recv.put(data);
248 loop {
249 let parse_result = self.recv.parse_token();
250
251 match parse_result {
252 Err(ParseTokenError) => {
253 self.inner = P2pNetworkSelectStateInner::Error("parse_token".to_owned());
254 self.recv.buffer.clear();
255 self.recv.buffer.shrink_to(0x2000);
256 break;
257 }
258 Ok(None) => break,
259 Ok(Some(token)) => {
260 let done = matches!(
261 token,
262 token::Token::Protocol(..) | token::Token::UnknownProtocol(..)
263 );
264 self.tokens.push_back(token);
265
266 if done {
267 break;
268 }
269 }
270 }
271 }
272 }
273 }
274
275 fn handle_incoming_token(&mut self, token: Token, time: Timestamp, kind: SelectKind) {
276 self.to_send = None;
277 match &self.inner {
278 P2pNetworkSelectStateInner::Error(_) => {}
279 P2pNetworkSelectStateInner::Initiator { proposing } => match token {
280 token::Token::Handshake => {}
281 token::Token::Na => {
282 self.inner = P2pNetworkSelectStateInner::Error("token is NA".to_owned());
284 self.negotiated = Some(None);
285 }
286 token::Token::SimultaneousConnect => {
287 self.inner =
289 P2pNetworkSelectStateInner::Error("simultaneous connect token".to_owned());
290 }
291 token::Token::Protocol(response) => {
292 if response == *proposing {
293 self.negotiated = Some(Some(response));
294 } else {
295 self.inner = P2pNetworkSelectStateInner::Error(format!(
296 "protocol mismatch: {response:?} != {proposing:?}"
297 ));
298 }
299 }
300 token::Token::UnknownProtocol(name) => {
301 self.inner = P2pNetworkSelectStateInner::Error(format!(
303 "unknown protocol `{}`",
304 String::from_utf8_lossy(&name)
305 ));
306 self.negotiated = Some(None);
307 }
308 },
309 P2pNetworkSelectStateInner::Uncertain { proposing } => match token {
310 token::Token::Handshake => {}
311 token::Token::Na => {
312 let proposing = *proposing;
313 self.inner = P2pNetworkSelectStateInner::Initiator { proposing };
314 }
315 token::Token::SimultaneousConnect => {
316 }
318 token::Token::Protocol(_) => {
319 self.inner = P2pNetworkSelectStateInner::Error(
320 "protocol mismatch: uncertain".to_owned(),
321 );
322 }
323 token::Token::UnknownProtocol(name) => {
324 self.inner = P2pNetworkSelectStateInner::Error(format!(
325 "protocol mismatch: uncertain with unknown protocol {}",
326 String::from_utf8_lossy(&name)
327 ));
328 }
329 },
330 P2pNetworkSelectStateInner::Responder => match token {
331 token::Token::Handshake => {
332 self.to_send = Some(token::Token::Handshake);
333 }
334 token::Token::Na => {}
335 token::Token::SimultaneousConnect => {
336 self.to_send = Some(token::Token::Na);
337 }
338 token::Token::Protocol(protocol) => {
339 let reply = match protocol {
340 token::Protocol::Auth(_) => {
341 token::Token::Protocol(token::Protocol::Auth(token::AuthKind::Noise))
342 }
343 token::Protocol::Mux(token::MuxKind::Yamux1_0_0) => {
344 token::Token::Protocol(token::Protocol::Mux(token::MuxKind::Yamux1_0_0))
345 }
346 token::Protocol::Mux(token::MuxKind::YamuxNoNewLine1_0_0) => {
347 token::Token::Protocol(token::Protocol::Mux(
348 token::MuxKind::YamuxNoNewLine1_0_0,
349 ))
350 }
351 token::Protocol::Stream(
352 token::StreamKind::Rpc(_)
353 | token::StreamKind::Discovery(_)
354 | token::StreamKind::Broadcast(_)
355 | token::StreamKind::Identify(_)
356 | token::StreamKind::Ping(_)
357 | token::StreamKind::Bitswap(_)
358 | token::StreamKind::Status(_),
359 ) => token::Token::Protocol(protocol),
360 };
361 let negotiated = if let token::Token::Protocol(p) = &reply {
362 Some(*p)
363 } else {
364 None
365 };
366 self.negotiated = Some(negotiated);
367 self.to_send = Some(reply);
368 }
369 token::Token::UnknownProtocol(name) => {
370 const KNOWN_UNKNOWN_PROTOCOLS: [&str; 3] =
371 ["/ipfs/id/push/1.0.0", "/ipfs/id/1.0.0", "/mina/node-status"];
372 if !name.is_empty() {
373 if let Ok(str) = std::str::from_utf8(&name[1..]) {
374 let str = str.trim_end_matches('\n');
375 if !KNOWN_UNKNOWN_PROTOCOLS.iter().any(|s| (*s).eq(str)) {
376 self.inner = P2pNetworkSelectStateInner::Error(format!(
377 "responder with unknown protocol {}",
378 str
379 ));
380
381 error!(time; "unknown protocol: {str}, {kind:?}");
382 }
383 } else {
384 self.inner = P2pNetworkSelectStateInner::Error(format!(
385 "responder with invalid protocol data {:?}",
386 name
387 ));
388
389 error!(time; "invalid protocol: {name:?}, {kind:?}");
390 }
391 } else {
392 self.inner = P2pNetworkSelectStateInner::Error(
393 "responder with empty protocol".to_string(),
394 );
395
396 error!(time; "empty protocol: {kind:?}");
397 }
398 self.to_send = Some(token::Token::Na);
399 self.negotiated = Some(None);
400 }
401 },
402 }
403 }
404
405 fn handle_negotiated_token<Action, State>(
406 state_context: Substate<Action, State, P2pNetworkSchedulerState>,
407 select_kind: SelectKind,
408 addr: ConnectionAddr,
409 data: Data,
410 fin: bool,
411 time: Timestamp,
412 ) -> Result<(), String>
413 where
414 State: crate::P2pStateTrait,
415 Action: crate::P2pActionTrait<State>,
416 {
417 let (dispatcher, state) = state_context.into_dispatcher_and_state();
418 let p2p_state: &P2pState = state.substate()?;
419 let state: &P2pNetworkSchedulerState = state.substate()?;
420 let state = state
421 .connection_state(&addr)
422 .and_then(|state| state.select_state(&select_kind))
423 .ok_or_else(|| "Select state not found incoming payload".to_owned())?;
424
425 let Some(Some(negotiated)) = &state.negotiated else {
426 bug_condition!(
427 "Invalid negotiation state {:?} for incoming payload",
428 state.negotiated,
429 );
430 return Ok(());
431 };
432
433 match negotiated {
434 Protocol::Auth(AuthKind::Noise) => {
435 dispatcher.push(P2pNetworkNoiseAction::IncomingData { addr, data });
436 }
437 Protocol::Mux(MuxKind::Yamux1_0_0 | MuxKind::YamuxNoNewLine1_0_0) => {
438 dispatcher.push(P2pNetworkYamuxAction::IncomingData { addr, data });
439 }
440 Protocol::Stream(kind) => match select_kind {
441 SelectKind::Stream(peer_id, stream_id) => match kind {
442 StreamKind::Discovery(DiscoveryAlgorithm::Kademlia1_0_0) => {
443 if !fin {
444 dispatcher.push(P2pNetworkKademliaStreamAction::IncomingData {
445 addr,
446 peer_id,
447 stream_id,
448 data,
449 });
450 } else {
451 dispatcher.push(P2pNetworkKademliaStreamAction::RemoteClose {
452 addr,
453 peer_id,
454 stream_id,
455 });
456 }
457 }
458 StreamKind::Identify(IdentifyAlgorithm::Identify1_0_0) => {
459 if !fin {
460 dispatcher.push(P2pNetworkIdentifyStreamAction::IncomingData {
461 addr,
462 peer_id,
463 stream_id,
464 data,
465 });
466 } else {
467 dispatcher.push(P2pNetworkIdentifyStreamAction::RemoteClose {
468 addr,
469 peer_id,
470 stream_id,
471 });
472 }
473 }
474 StreamKind::Broadcast(_) => {
475 dispatcher.push(P2pNetworkPubsubAction::IncomingData {
476 peer_id,
477 addr,
478 stream_id,
479 data,
480 seen_limit: p2p_state.config.meshsub.mcache_len,
481 });
482 }
483 StreamKind::Rpc(RpcAlgorithm::Rpc0_0_1) => {
484 dispatcher.push(P2pNetworkRpcAction::IncomingData {
485 addr,
486 peer_id,
487 stream_id,
488 data,
489 });
490 }
491 _ => error!(time;
492 "trying to negotiate unimplemented stream kind {kind:?}"
493 ),
494 },
495 _ => error!(time; "invalid select protocol kind: {:?}", kind),
496 },
497 }
498
499 Ok(())
500 }
501}