mina_p2p/network/select/
p2p_network_select_reducer.rs

1use mina_core::{bug_condition, error, fuzz_maybe, fuzzed_maybe, Substate};
2use redux::Timestamp;
3use token::{
4    AuthKind, DiscoveryAlgorithm, IdentifyAlgorithm, MuxKind, Protocol, RpcAlgorithm, StreamKind,
5    Token,
6};
7
8use crate::{
9    fuzzer::{mutate_select_authentication, mutate_select_multiplexing, mutate_select_stream},
10    network::identify::P2pNetworkIdentifyStreamAction,
11    ConnectionAddr, Data, P2pNetworkKademliaStreamAction, P2pNetworkNoiseAction,
12    P2pNetworkPnetAction, P2pNetworkPubsubAction, P2pNetworkRpcAction, P2pNetworkSchedulerAction,
13    P2pNetworkSchedulerState, P2pNetworkYamuxAction, P2pState, YamuxFlags,
14};
15
16use self::{p2p_network_select_state::P2pNetworkSelectStateInner, token::ParseTokenError};
17
18use super::*;
19
20impl P2pNetworkSelectState {
21    pub fn reducer<State, Action>(
22        mut state_context: Substate<Action, State, P2pNetworkSchedulerState>,
23        action: redux::ActionWithMeta<P2pNetworkSelectAction>,
24    ) -> Result<(), String>
25    where
26        State: crate::P2pStateTrait,
27        Action: crate::P2pActionTrait<State>,
28    {
29        let (action, meta) = action.split();
30        let select_kind = action.select_kind();
31
32        let select_state = state_context
33            .get_substate_mut()?
34            .connection_state_mut(action.addr())
35            .and_then(|conn| conn.select_state_mut(&select_kind))
36            .ok_or_else(|| format!("Select state not found for {action:?}"))?;
37
38        if let P2pNetworkSelectStateInner::Error(_) = &select_state.inner {
39            return Ok(());
40        }
41
42        match action {
43            // hack for noise
44            P2pNetworkSelectAction::Init {
45                incoming,
46                addr,
47                kind,
48            } => {
49                match (&select_state.inner, incoming) {
50                    (P2pNetworkSelectStateInner::Initiator { .. }, true) => {
51                        select_state.inner = P2pNetworkSelectStateInner::Responder
52                    }
53                    (P2pNetworkSelectStateInner::Responder, false) => {
54                        select_state.inner = P2pNetworkSelectStateInner::Initiator {
55                            proposing: token::Protocol::Mux(token::MuxKind::YamuxNoNewLine1_0_0),
56                        }
57                    }
58                    _ => {}
59                }
60
61                let (dispatcher, state) = state_context.into_dispatcher_and_state();
62                let state: &P2pNetworkSchedulerState = state.substate()?;
63                let state = state
64                    .connection_state(&addr)
65                    .and_then(|state| state.select_state(&kind))
66                    .ok_or_else(|| format!("Select state not found for {action:?}"))?;
67
68                if state.negotiated.is_none() && !incoming {
69                    let mut tokens = vec![Token::Handshake];
70
71                    match &state.inner {
72                        P2pNetworkSelectStateInner::Uncertain { proposing } => {
73                            tokens.push(Token::SimultaneousConnect);
74                            tokens.push(Token::Protocol(*proposing));
75                        }
76                        P2pNetworkSelectStateInner::Initiator { proposing } => {
77                            tokens.push(Token::Protocol(*proposing));
78                        }
79                        _ => {}
80                    };
81                    dispatcher.push(P2pNetworkSelectAction::OutgoingTokens { addr, kind, tokens });
82                }
83
84                Ok(())
85            }
86            P2pNetworkSelectAction::IncomingData {
87                data, addr, fin, ..
88            }
89            | P2pNetworkSelectAction::IncomingDataAuth { data, addr, fin }
90            | P2pNetworkSelectAction::IncomingDataMux {
91                data, addr, fin, ..
92            } => {
93                select_state.handle_incoming_data(&data);
94
95                let (dispatcher, state) = state_context.into_dispatcher_and_state();
96
97                let state: &P2pNetworkSchedulerState = state.substate()?;
98                let select_state = state
99                    .connection_state(&addr)
100                    .and_then(|conn| conn.select_state(&select_kind))
101                    .ok_or("Select state not found for incoming data")?;
102
103                if let P2pNetworkSelectStateInner::Error(error) = &select_state.inner {
104                    dispatcher.push(P2pNetworkSchedulerAction::SelectError {
105                        addr,
106                        kind: select_kind,
107                        error: error.to_owned(),
108                    })
109                } else {
110                    for action in select_state.forward_incoming_data(select_kind, addr, data, fin) {
111                        dispatcher.push(action)
112                    }
113                }
114
115                Ok(())
116            }
117            P2pNetworkSelectAction::IncomingPayloadAuth { addr, fin, data }
118            | P2pNetworkSelectAction::IncomingPayloadMux {
119                addr, fin, data, ..
120            }
121            | P2pNetworkSelectAction::IncomingPayload {
122                addr, fin, data, ..
123            } => {
124                select_state.recv.buffer.clear();
125                select_state.recv.buffer.shrink_to(0x2000);
126
127                P2pNetworkSelectState::handle_negotiated_token(
128                    state_context,
129                    select_kind,
130                    addr,
131                    data,
132                    fin,
133                    meta.time(),
134                )
135            }
136            P2pNetworkSelectAction::IncomingToken { kind, addr } => {
137                let Some(token) = select_state.tokens.pop_front() else {
138                    bug_condition!("Invalid state for action: {action:?}");
139                    return Ok(());
140                };
141                select_state.handle_incoming_token(token, meta.time(), select_kind);
142
143                let (dispatcher, state) = state_context.into_dispatcher_and_state();
144                let scheduler: &P2pNetworkSchedulerState = state.substate()?;
145                let select_state = scheduler
146                    .connection_state(&addr)
147                    .and_then(|stream| stream.select_state(&kind))
148                    .ok_or_else(|| format!("Select state not found for {action:?}"))?;
149
150                if let P2pNetworkSelectStateInner::Error(error) = &select_state.inner {
151                    dispatcher.push(P2pNetworkSchedulerAction::SelectError {
152                        addr,
153                        kind,
154                        error: error.to_owned(),
155                    });
156                    return Ok(());
157                }
158
159                if let Some(token) = &select_state.to_send {
160                    dispatcher.push(P2pNetworkSelectAction::OutgoingTokens {
161                        addr,
162                        kind,
163                        tokens: vec![token.clone()],
164                    })
165                }
166
167                if let Some(protocol) = select_state.negotiated {
168                    let p2p_state: &P2pState = state.substate()?;
169
170                    let expected_peer_id = p2p_state
171                        .peer_with_connection(addr)
172                        .map(|(peer_id, _)| peer_id);
173
174                    let incoming =
175                        matches!(&select_state.inner, P2pNetworkSelectStateInner::Responder);
176                    dispatcher.push(P2pNetworkSchedulerAction::SelectDone {
177                        addr,
178                        kind,
179                        protocol,
180                        incoming,
181                        expected_peer_id,
182                    })
183                }
184
185                Ok(())
186            }
187            P2pNetworkSelectAction::OutgoingTokens { addr, kind, tokens } => {
188                let dispatcher = state_context.into_dispatcher();
189
190                let mut data = {
191                    let mut data = vec![];
192                    for token in &tokens {
193                        data.extend_from_slice(token.name())
194                    }
195                    data.into()
196                };
197
198                match kind {
199                    SelectKind::Authentication => {
200                        fuzz_maybe!(&mut data, mutate_select_authentication);
201                        dispatcher.push(P2pNetworkPnetAction::OutgoingData { addr, data });
202                    }
203                    SelectKind::Multiplexing(_) | SelectKind::MultiplexingNoPeerId => {
204                        fuzz_maybe!(&mut data, mutate_select_multiplexing);
205                        dispatcher
206                            .push(P2pNetworkNoiseAction::OutgoingDataSelectMux { addr, data });
207                    }
208                    SelectKind::Stream(_, stream_id) => {
209                        if let Some(na) = tokens.iter().find(|t| t.name() == Token::Na.name()) {
210                            dispatcher.push(P2pNetworkYamuxAction::OutgoingData {
211                                addr,
212                                stream_id,
213                                data: na.name().to_vec().into(),
214                                flags: YamuxFlags::FIN,
215                            });
216                        } else {
217                            for token in tokens {
218                                let data = fuzzed_maybe!(
219                                    token.name().to_vec().into(),
220                                    mutate_select_stream
221                                );
222                                dispatcher.push(P2pNetworkYamuxAction::OutgoingData {
223                                    addr,
224                                    stream_id,
225                                    data,
226                                    flags: Default::default(),
227                                });
228                            }
229                        }
230                    }
231                }
232
233                Ok(())
234            }
235            P2pNetworkSelectAction::Timeout { addr, kind } => {
236                let error = "timeout".to_owned();
237                select_state.inner = P2pNetworkSelectStateInner::Error(error.clone());
238                let dispatcher = state_context.into_dispatcher();
239                dispatcher.push(P2pNetworkSchedulerAction::SelectError { addr, kind, error });
240                Ok(())
241            }
242        }
243    }
244
245    fn handle_incoming_data(&mut self, data: &Data) {
246        if self.negotiated.is_none() {
247            self.recv.put(data);
248            loop {
249                let parse_result = self.recv.parse_token();
250
251                match parse_result {
252                    Err(ParseTokenError) => {
253                        self.inner = P2pNetworkSelectStateInner::Error("parse_token".to_owned());
254                        self.recv.buffer.clear();
255                        self.recv.buffer.shrink_to(0x2000);
256                        break;
257                    }
258                    Ok(None) => break,
259                    Ok(Some(token)) => {
260                        let done = matches!(
261                            token,
262                            token::Token::Protocol(..) | token::Token::UnknownProtocol(..)
263                        );
264                        self.tokens.push_back(token);
265
266                        if done {
267                            break;
268                        }
269                    }
270                }
271            }
272        }
273    }
274
275    fn handle_incoming_token(&mut self, token: Token, time: Timestamp, kind: SelectKind) {
276        self.to_send = None;
277        match &self.inner {
278            P2pNetworkSelectStateInner::Error(_) => {}
279            P2pNetworkSelectStateInner::Initiator { proposing } => match token {
280                token::Token::Handshake => {}
281                token::Token::Na => {
282                    // TODO: check if we can propose alternative
283                    self.inner = P2pNetworkSelectStateInner::Error("token is NA".to_owned());
284                    self.negotiated = Some(None);
285                }
286                token::Token::SimultaneousConnect => {
287                    // unexpected token
288                    self.inner =
289                        P2pNetworkSelectStateInner::Error("simultaneous connect token".to_owned());
290                }
291                token::Token::Protocol(response) => {
292                    if response == *proposing {
293                        self.negotiated = Some(Some(response));
294                    } else {
295                        self.inner = P2pNetworkSelectStateInner::Error(format!(
296                            "protocol mismatch: {response:?} != {proposing:?}"
297                        ));
298                    }
299                }
300                token::Token::UnknownProtocol(name) => {
301                    // unexpected token
302                    self.inner = P2pNetworkSelectStateInner::Error(format!(
303                        "unknown protocol `{}`",
304                        String::from_utf8_lossy(&name)
305                    ));
306                    self.negotiated = Some(None);
307                }
308            },
309            P2pNetworkSelectStateInner::Uncertain { proposing } => match token {
310                token::Token::Handshake => {}
311                token::Token::Na => {
312                    let proposing = *proposing;
313                    self.inner = P2pNetworkSelectStateInner::Initiator { proposing };
314                }
315                token::Token::SimultaneousConnect => {
316                    // TODO: decide who is initiator
317                }
318                token::Token::Protocol(_) => {
319                    self.inner = P2pNetworkSelectStateInner::Error(
320                        "protocol mismatch: uncertain".to_owned(),
321                    );
322                }
323                token::Token::UnknownProtocol(name) => {
324                    self.inner = P2pNetworkSelectStateInner::Error(format!(
325                        "protocol mismatch: uncertain with unknown protocol {}",
326                        String::from_utf8_lossy(&name)
327                    ));
328                }
329            },
330            P2pNetworkSelectStateInner::Responder => match token {
331                token::Token::Handshake => {
332                    self.to_send = Some(token::Token::Handshake);
333                }
334                token::Token::Na => {}
335                token::Token::SimultaneousConnect => {
336                    self.to_send = Some(token::Token::Na);
337                }
338                token::Token::Protocol(protocol) => {
339                    let reply = match protocol {
340                        token::Protocol::Auth(_) => {
341                            token::Token::Protocol(token::Protocol::Auth(token::AuthKind::Noise))
342                        }
343                        token::Protocol::Mux(token::MuxKind::Yamux1_0_0) => {
344                            token::Token::Protocol(token::Protocol::Mux(token::MuxKind::Yamux1_0_0))
345                        }
346                        token::Protocol::Mux(token::MuxKind::YamuxNoNewLine1_0_0) => {
347                            token::Token::Protocol(token::Protocol::Mux(
348                                token::MuxKind::YamuxNoNewLine1_0_0,
349                            ))
350                        }
351                        token::Protocol::Stream(
352                            token::StreamKind::Rpc(_)
353                            | token::StreamKind::Discovery(_)
354                            | token::StreamKind::Broadcast(_)
355                            | token::StreamKind::Identify(_)
356                            | token::StreamKind::Ping(_)
357                            | token::StreamKind::Bitswap(_)
358                            | token::StreamKind::Status(_),
359                        ) => token::Token::Protocol(protocol),
360                    };
361                    let negotiated = if let token::Token::Protocol(p) = &reply {
362                        Some(*p)
363                    } else {
364                        None
365                    };
366                    self.negotiated = Some(negotiated);
367                    self.to_send = Some(reply);
368                }
369                token::Token::UnknownProtocol(name) => {
370                    const KNOWN_UNKNOWN_PROTOCOLS: [&str; 3] =
371                        ["/ipfs/id/push/1.0.0", "/ipfs/id/1.0.0", "/mina/node-status"];
372                    if !name.is_empty() {
373                        if let Ok(str) = std::str::from_utf8(&name[1..]) {
374                            let str = str.trim_end_matches('\n');
375                            if !KNOWN_UNKNOWN_PROTOCOLS.iter().any(|s| (*s).eq(str)) {
376                                self.inner = P2pNetworkSelectStateInner::Error(format!(
377                                    "responder with unknown protocol {}",
378                                    str
379                                ));
380
381                                error!(time; "unknown protocol: {str}, {kind:?}");
382                            }
383                        } else {
384                            self.inner = P2pNetworkSelectStateInner::Error(format!(
385                                "responder with invalid protocol data {:?}",
386                                name
387                            ));
388
389                            error!(time; "invalid protocol: {name:?}, {kind:?}");
390                        }
391                    } else {
392                        self.inner = P2pNetworkSelectStateInner::Error(
393                            "responder with empty protocol".to_string(),
394                        );
395
396                        error!(time; "empty protocol: {kind:?}");
397                    }
398                    self.to_send = Some(token::Token::Na);
399                    self.negotiated = Some(None);
400                }
401            },
402        }
403    }
404
405    fn handle_negotiated_token<Action, State>(
406        state_context: Substate<Action, State, P2pNetworkSchedulerState>,
407        select_kind: SelectKind,
408        addr: ConnectionAddr,
409        data: Data,
410        fin: bool,
411        time: Timestamp,
412    ) -> Result<(), String>
413    where
414        State: crate::P2pStateTrait,
415        Action: crate::P2pActionTrait<State>,
416    {
417        let (dispatcher, state) = state_context.into_dispatcher_and_state();
418        let p2p_state: &P2pState = state.substate()?;
419        let state: &P2pNetworkSchedulerState = state.substate()?;
420        let state = state
421            .connection_state(&addr)
422            .and_then(|state| state.select_state(&select_kind))
423            .ok_or_else(|| "Select state not found incoming payload".to_owned())?;
424
425        let Some(Some(negotiated)) = &state.negotiated else {
426            bug_condition!(
427                "Invalid negotiation state {:?} for incoming payload",
428                state.negotiated,
429            );
430            return Ok(());
431        };
432
433        match negotiated {
434            Protocol::Auth(AuthKind::Noise) => {
435                dispatcher.push(P2pNetworkNoiseAction::IncomingData { addr, data });
436            }
437            Protocol::Mux(MuxKind::Yamux1_0_0 | MuxKind::YamuxNoNewLine1_0_0) => {
438                dispatcher.push(P2pNetworkYamuxAction::IncomingData { addr, data });
439            }
440            Protocol::Stream(kind) => match select_kind {
441                SelectKind::Stream(peer_id, stream_id) => match kind {
442                    StreamKind::Discovery(DiscoveryAlgorithm::Kademlia1_0_0) => {
443                        if !fin {
444                            dispatcher.push(P2pNetworkKademliaStreamAction::IncomingData {
445                                addr,
446                                peer_id,
447                                stream_id,
448                                data,
449                            });
450                        } else {
451                            dispatcher.push(P2pNetworkKademliaStreamAction::RemoteClose {
452                                addr,
453                                peer_id,
454                                stream_id,
455                            });
456                        }
457                    }
458                    StreamKind::Identify(IdentifyAlgorithm::Identify1_0_0) => {
459                        if !fin {
460                            dispatcher.push(P2pNetworkIdentifyStreamAction::IncomingData {
461                                addr,
462                                peer_id,
463                                stream_id,
464                                data,
465                            });
466                        } else {
467                            dispatcher.push(P2pNetworkIdentifyStreamAction::RemoteClose {
468                                addr,
469                                peer_id,
470                                stream_id,
471                            });
472                        }
473                    }
474                    StreamKind::Broadcast(_) => {
475                        dispatcher.push(P2pNetworkPubsubAction::IncomingData {
476                            peer_id,
477                            addr,
478                            stream_id,
479                            data,
480                            seen_limit: p2p_state.config.meshsub.mcache_len,
481                        });
482                    }
483                    StreamKind::Rpc(RpcAlgorithm::Rpc0_0_1) => {
484                        dispatcher.push(P2pNetworkRpcAction::IncomingData {
485                            addr,
486                            peer_id,
487                            stream_id,
488                            data,
489                        });
490                    }
491                    _ => error!(time;
492                        "trying to negotiate unimplemented stream kind {kind:?}"
493                    ),
494                },
495                _ => error!(time; "invalid select protocol kind: {:?}", kind),
496            },
497        }
498
499        Ok(())
500    }
501}