p2p/network/select/
p2p_network_select_reducer.rs

1use openmina_core::{bug_condition, error, fuzz_maybe, fuzzed_maybe, Substate};
2use redux::Timestamp;
3use token::{
4    AuthKind, DiscoveryAlgorithm, IdentifyAlgorithm, MuxKind, Protocol, RpcAlgorithm, StreamKind,
5    Token,
6};
7
8use crate::{
9    fuzzer::{mutate_select_authentication, mutate_select_multiplexing, mutate_select_stream},
10    network::identify::P2pNetworkIdentifyStreamAction,
11    ConnectionAddr, Data, P2pNetworkKademliaStreamAction, P2pNetworkNoiseAction,
12    P2pNetworkPnetAction, P2pNetworkPubsubAction, P2pNetworkRpcAction, P2pNetworkSchedulerAction,
13    P2pNetworkSchedulerState, P2pNetworkYamuxAction, P2pState, YamuxFlags,
14};
15
16use self::{p2p_network_select_state::P2pNetworkSelectStateInner, token::ParseTokenError};
17
18use super::*;
19
20impl P2pNetworkSelectState {
21    pub fn reducer<State, Action>(
22        mut state_context: Substate<Action, State, P2pNetworkSchedulerState>,
23        action: redux::ActionWithMeta<P2pNetworkSelectAction>,
24    ) -> Result<(), String>
25    where
26        State: crate::P2pStateTrait,
27        Action: crate::P2pActionTrait<State>,
28    {
29        let (action, meta) = action.split();
30        let select_kind = action.select_kind();
31
32        let select_state = state_context
33            .get_substate_mut()?
34            .connection_state_mut(action.addr())
35            .and_then(|conn| conn.select_state_mut(&select_kind))
36            .ok_or_else(|| format!("Select state not found for {action:?}"))?;
37
38        if let P2pNetworkSelectStateInner::Error(_) = &select_state.inner {
39            return Ok(());
40        }
41
42        match action {
43            // hack for noise
44            P2pNetworkSelectAction::Init {
45                incoming,
46                addr,
47                kind,
48            } => {
49                match (&select_state.inner, incoming) {
50                    (P2pNetworkSelectStateInner::Initiator { .. }, true) => {
51                        select_state.inner = P2pNetworkSelectStateInner::Responder
52                    }
53                    (P2pNetworkSelectStateInner::Responder, false) => {
54                        select_state.inner = P2pNetworkSelectStateInner::Initiator {
55                            proposing: token::Protocol::Mux(token::MuxKind::YamuxNoNewLine1_0_0),
56                        }
57                    }
58                    _ => {}
59                }
60
61                let (dispatcher, state) = state_context.into_dispatcher_and_state();
62                let state: &P2pNetworkSchedulerState = state.substate()?;
63                let state = state
64                    .connection_state(&addr)
65                    .and_then(|state| state.select_state(&kind))
66                    .ok_or_else(|| format!("Select state not found for {action:?}"))?;
67
68                if state.negotiated.is_none() && !incoming {
69                    let mut tokens = vec![Token::Handshake];
70
71                    match &state.inner {
72                        P2pNetworkSelectStateInner::Uncertain { proposing } => {
73                            tokens.push(Token::SimultaneousConnect);
74                            tokens.push(Token::Protocol(*proposing));
75                        }
76                        P2pNetworkSelectStateInner::Initiator { proposing } => {
77                            tokens.push(Token::Protocol(*proposing));
78                        }
79                        _ => {}
80                    };
81                    dispatcher.push(P2pNetworkSelectAction::OutgoingTokens { addr, kind, tokens });
82                }
83
84                Ok(())
85            }
86            P2pNetworkSelectAction::IncomingData {
87                data, addr, fin, ..
88            }
89            | P2pNetworkSelectAction::IncomingDataAuth { data, addr, fin }
90            | P2pNetworkSelectAction::IncomingDataMux {
91                data, addr, fin, ..
92            } => {
93                select_state.handle_incoming_data(&data);
94
95                let (dispatcher, state) = state_context.into_dispatcher_and_state();
96
97                let state: &P2pNetworkSchedulerState = state.substate()?;
98                let select_state = state
99                    .connection_state(&addr)
100                    .and_then(|conn| conn.select_state(&select_kind))
101                    .ok_or("Select state not found for incoming data")?;
102
103                if let P2pNetworkSelectStateInner::Error(error) = &select_state.inner {
104                    dispatcher.push(P2pNetworkSchedulerAction::SelectError {
105                        addr,
106                        kind: select_kind,
107                        error: error.to_owned(),
108                    })
109                } else {
110                    for action in select_state.forward_incoming_data(select_kind, addr, data, fin) {
111                        dispatcher.push(action)
112                    }
113                }
114
115                Ok(())
116            }
117            P2pNetworkSelectAction::IncomingPayloadAuth { addr, fin, data }
118            | P2pNetworkSelectAction::IncomingPayloadMux {
119                addr, fin, data, ..
120            }
121            | P2pNetworkSelectAction::IncomingPayload {
122                addr, fin, data, ..
123            } => {
124                select_state.recv.buffer.clear();
125                select_state.recv.buffer.shrink_to(0x2000);
126
127                P2pNetworkSelectState::handle_negotiated_token(
128                    state_context,
129                    select_kind,
130                    addr,
131                    data,
132                    fin,
133                    meta.time(),
134                )
135            }
136            P2pNetworkSelectAction::IncomingToken { kind, addr } => {
137                let Some(token) = select_state.tokens.pop_front() else {
138                    bug_condition!("Invalid state for action: {action:?}");
139                    return Ok(());
140                };
141                select_state.handle_incoming_token(token, meta.time(), select_kind);
142
143                let (dispatcher, state) = state_context.into_dispatcher_and_state();
144                let scheduler: &P2pNetworkSchedulerState = state.substate()?;
145                let select_state = scheduler
146                    .connection_state(&addr)
147                    .and_then(|stream| stream.select_state(&kind))
148                    .ok_or_else(|| format!("Select state not found for {action:?}"))?;
149
150                if let P2pNetworkSelectStateInner::Error(error) = &select_state.inner {
151                    dispatcher.push(P2pNetworkSchedulerAction::SelectError {
152                        addr,
153                        kind,
154                        error: error.to_owned(),
155                    });
156                    return Ok(());
157                }
158
159                if let Some(token) = &select_state.to_send {
160                    dispatcher.push(P2pNetworkSelectAction::OutgoingTokens {
161                        addr,
162                        kind,
163                        tokens: vec![token.clone()],
164                    })
165                }
166
167                if let Some(protocol) = select_state.negotiated {
168                    let p2p_state: &P2pState = state.substate()?;
169
170                    let expected_peer_id = p2p_state
171                        .peer_with_connection(addr)
172                        .map(|(peer_id, _)| peer_id);
173
174                    let incoming = matches!(
175                        &select_state.inner,
176                        P2pNetworkSelectStateInner::Responder { .. }
177                    );
178                    dispatcher.push(P2pNetworkSchedulerAction::SelectDone {
179                        addr,
180                        kind,
181                        protocol,
182                        incoming,
183                        expected_peer_id,
184                    })
185                }
186
187                Ok(())
188            }
189            P2pNetworkSelectAction::OutgoingTokens { addr, kind, tokens } => {
190                let dispatcher = state_context.into_dispatcher();
191
192                let mut data = {
193                    let mut data = vec![];
194                    for token in &tokens {
195                        data.extend_from_slice(token.name())
196                    }
197                    data.into()
198                };
199
200                match kind {
201                    SelectKind::Authentication => {
202                        fuzz_maybe!(&mut data, mutate_select_authentication);
203                        dispatcher.push(P2pNetworkPnetAction::OutgoingData { addr, data });
204                    }
205                    SelectKind::Multiplexing(_) | SelectKind::MultiplexingNoPeerId => {
206                        fuzz_maybe!(&mut data, mutate_select_multiplexing);
207                        dispatcher
208                            .push(P2pNetworkNoiseAction::OutgoingDataSelectMux { addr, data });
209                    }
210                    SelectKind::Stream(_, stream_id) => {
211                        if let Some(na) = tokens.iter().find(|t| t.name() == Token::Na.name()) {
212                            dispatcher.push(P2pNetworkYamuxAction::OutgoingData {
213                                addr,
214                                stream_id,
215                                data: na.name().to_vec().into(),
216                                flags: YamuxFlags::FIN,
217                            });
218                        } else {
219                            for token in tokens {
220                                let data = fuzzed_maybe!(
221                                    token.name().to_vec().into(),
222                                    mutate_select_stream
223                                );
224                                dispatcher.push(P2pNetworkYamuxAction::OutgoingData {
225                                    addr,
226                                    stream_id,
227                                    data,
228                                    flags: Default::default(),
229                                });
230                            }
231                        }
232                    }
233                }
234
235                Ok(())
236            }
237            P2pNetworkSelectAction::Timeout { addr, kind } => {
238                let error = "timeout".to_owned();
239                select_state.inner = P2pNetworkSelectStateInner::Error(error.clone());
240                let dispatcher = state_context.into_dispatcher();
241                dispatcher.push(P2pNetworkSchedulerAction::SelectError { addr, kind, error });
242                Ok(())
243            }
244        }
245    }
246
247    fn handle_incoming_data(&mut self, data: &Data) {
248        if self.negotiated.is_none() {
249            self.recv.put(data);
250            loop {
251                let parse_result = self.recv.parse_token();
252
253                match parse_result {
254                    Err(ParseTokenError) => {
255                        self.inner = P2pNetworkSelectStateInner::Error("parse_token".to_owned());
256                        self.recv.buffer.clear();
257                        self.recv.buffer.shrink_to(0x2000);
258                        break;
259                    }
260                    Ok(None) => break,
261                    Ok(Some(token)) => {
262                        let done = matches!(
263                            token,
264                            token::Token::Protocol(..) | token::Token::UnknownProtocol(..)
265                        );
266                        self.tokens.push_back(token);
267
268                        if done {
269                            break;
270                        }
271                    }
272                }
273            }
274        }
275    }
276
277    fn handle_incoming_token(&mut self, token: Token, time: Timestamp, kind: SelectKind) {
278        self.to_send = None;
279        match &self.inner {
280            P2pNetworkSelectStateInner::Error(_) => {}
281            P2pNetworkSelectStateInner::Initiator { proposing } => match token {
282                token::Token::Handshake => {}
283                token::Token::Na => {
284                    // TODO: check if we can propose alternative
285                    self.inner = P2pNetworkSelectStateInner::Error("token is NA".to_owned());
286                    self.negotiated = Some(None);
287                }
288                token::Token::SimultaneousConnect => {
289                    // unexpected token
290                    self.inner =
291                        P2pNetworkSelectStateInner::Error("simultaneous connect token".to_owned());
292                }
293                token::Token::Protocol(response) => {
294                    if response == *proposing {
295                        self.negotiated = Some(Some(response));
296                    } else {
297                        self.inner = P2pNetworkSelectStateInner::Error(format!(
298                            "protocol mismatch: {response:?} != {proposing:?}"
299                        ));
300                    }
301                }
302                token::Token::UnknownProtocol(name) => {
303                    // unexpected token
304                    self.inner = P2pNetworkSelectStateInner::Error(format!(
305                        "unknown protocol `{}`",
306                        String::from_utf8_lossy(&name)
307                    ));
308                    self.negotiated = Some(None);
309                }
310            },
311            P2pNetworkSelectStateInner::Uncertain { proposing } => match token {
312                token::Token::Handshake => {}
313                token::Token::Na => {
314                    let proposing = *proposing;
315                    self.inner = P2pNetworkSelectStateInner::Initiator { proposing };
316                }
317                token::Token::SimultaneousConnect => {
318                    // TODO: decide who is initiator
319                }
320                token::Token::Protocol(_) => {
321                    self.inner = P2pNetworkSelectStateInner::Error(
322                        "protocol mismatch: uncertain".to_owned(),
323                    );
324                }
325                token::Token::UnknownProtocol(name) => {
326                    self.inner = P2pNetworkSelectStateInner::Error(format!(
327                        "protocol mismatch: uncertain with unknown protocol {}",
328                        String::from_utf8_lossy(&name)
329                    ));
330                }
331            },
332            P2pNetworkSelectStateInner::Responder => match token {
333                token::Token::Handshake => {
334                    self.to_send = Some(token::Token::Handshake);
335                }
336                token::Token::Na => {}
337                token::Token::SimultaneousConnect => {
338                    self.to_send = Some(token::Token::Na);
339                }
340                token::Token::Protocol(protocol) => {
341                    let reply = match protocol {
342                        token::Protocol::Auth(_) => {
343                            token::Token::Protocol(token::Protocol::Auth(token::AuthKind::Noise))
344                        }
345                        token::Protocol::Mux(token::MuxKind::Yamux1_0_0) => {
346                            token::Token::Protocol(token::Protocol::Mux(token::MuxKind::Yamux1_0_0))
347                        }
348                        token::Protocol::Mux(token::MuxKind::YamuxNoNewLine1_0_0) => {
349                            token::Token::Protocol(token::Protocol::Mux(
350                                token::MuxKind::YamuxNoNewLine1_0_0,
351                            ))
352                        }
353                        token::Protocol::Stream(
354                            token::StreamKind::Rpc(_)
355                            | token::StreamKind::Discovery(_)
356                            | token::StreamKind::Broadcast(_)
357                            | token::StreamKind::Identify(_)
358                            | token::StreamKind::Ping(_)
359                            | token::StreamKind::Bitswap(_)
360                            | token::StreamKind::Status(_),
361                        ) => token::Token::Protocol(protocol),
362                    };
363                    let negotiated = if let token::Token::Protocol(p) = &reply {
364                        Some(*p)
365                    } else {
366                        None
367                    };
368                    self.negotiated = Some(negotiated);
369                    self.to_send = Some(reply);
370                }
371                token::Token::UnknownProtocol(name) => {
372                    const KNOWN_UNKNOWN_PROTOCOLS: [&str; 3] =
373                        ["/ipfs/id/push/1.0.0", "/ipfs/id/1.0.0", "/mina/node-status"];
374                    if !name.is_empty() {
375                        if let Ok(str) = std::str::from_utf8(&name[1..]) {
376                            let str = str.trim_end_matches('\n');
377                            if !KNOWN_UNKNOWN_PROTOCOLS.iter().any(|s| (*s).eq(str)) {
378                                self.inner = P2pNetworkSelectStateInner::Error(format!(
379                                    "responder with unknown protocol {}",
380                                    str
381                                ));
382
383                                error!(time; "unknown protocol: {str}, {kind:?}");
384                            }
385                        } else {
386                            self.inner = P2pNetworkSelectStateInner::Error(format!(
387                                "responder with invalid protocol data {:?}",
388                                name
389                            ));
390
391                            error!(time; "invalid protocol: {name:?}, {kind:?}");
392                        }
393                    } else {
394                        self.inner = P2pNetworkSelectStateInner::Error(
395                            "responder with empty protocol".to_string(),
396                        );
397
398                        error!(time; "empty protocol: {kind:?}");
399                    }
400                    self.to_send = Some(token::Token::Na);
401                    self.negotiated = Some(None);
402                }
403            },
404        }
405    }
406
407    fn handle_negotiated_token<Action, State>(
408        state_context: Substate<Action, State, P2pNetworkSchedulerState>,
409        select_kind: SelectKind,
410        addr: ConnectionAddr,
411        data: Data,
412        fin: bool,
413        time: Timestamp,
414    ) -> Result<(), String>
415    where
416        State: crate::P2pStateTrait,
417        Action: crate::P2pActionTrait<State>,
418    {
419        let (dispatcher, state) = state_context.into_dispatcher_and_state();
420        let p2p_state: &P2pState = state.substate()?;
421        let state: &P2pNetworkSchedulerState = state.substate()?;
422        let state = state
423            .connection_state(&addr)
424            .and_then(|state| state.select_state(&select_kind))
425            .ok_or_else(|| "Select state not found incoming payload".to_owned())?;
426
427        let Some(Some(negotiated)) = &state.negotiated else {
428            bug_condition!(
429                "Invalid negotiation state {:?} for incoming payload",
430                state.negotiated,
431            );
432            return Ok(());
433        };
434
435        match negotiated {
436            Protocol::Auth(AuthKind::Noise) => {
437                dispatcher.push(P2pNetworkNoiseAction::IncomingData { addr, data });
438            }
439            Protocol::Mux(MuxKind::Yamux1_0_0 | MuxKind::YamuxNoNewLine1_0_0) => {
440                dispatcher.push(P2pNetworkYamuxAction::IncomingData { addr, data });
441            }
442            Protocol::Stream(kind) => match select_kind {
443                SelectKind::Stream(peer_id, stream_id) => match kind {
444                    StreamKind::Discovery(DiscoveryAlgorithm::Kademlia1_0_0) => {
445                        if !fin {
446                            dispatcher.push(P2pNetworkKademliaStreamAction::IncomingData {
447                                addr,
448                                peer_id,
449                                stream_id,
450                                data,
451                            });
452                        } else {
453                            dispatcher.push(P2pNetworkKademliaStreamAction::RemoteClose {
454                                addr,
455                                peer_id,
456                                stream_id,
457                            });
458                        }
459                    }
460                    StreamKind::Identify(IdentifyAlgorithm::Identify1_0_0) => {
461                        if !fin {
462                            dispatcher.push(P2pNetworkIdentifyStreamAction::IncomingData {
463                                addr,
464                                peer_id,
465                                stream_id,
466                                data,
467                            });
468                        } else {
469                            dispatcher.push(P2pNetworkIdentifyStreamAction::RemoteClose {
470                                addr,
471                                peer_id,
472                                stream_id,
473                            });
474                        }
475                    }
476                    StreamKind::Broadcast(_) => {
477                        dispatcher.push(P2pNetworkPubsubAction::IncomingData {
478                            peer_id,
479                            addr,
480                            stream_id,
481                            data,
482                            seen_limit: p2p_state.config.meshsub.mcache_len,
483                        });
484                    }
485                    StreamKind::Rpc(RpcAlgorithm::Rpc0_0_1) => {
486                        dispatcher.push(P2pNetworkRpcAction::IncomingData {
487                            addr,
488                            peer_id,
489                            stream_id,
490                            data,
491                        });
492                    }
493                    _ => error!(time;
494                        "trying to negotiate unimplemented stream kind {kind:?}"
495                    ),
496                },
497                _ => error!(time; "invalid select protocol kind: {:?}", kind),
498            },
499        }
500
501        Ok(())
502    }
503}