p2p/network/scheduler/
p2p_network_scheduler_reducer.rs

1use std::{collections::BTreeMap, sync::OnceLock};
2
3use identify::P2pNetworkIdentifyStreamAction;
4use openmina_core::{bug_condition, error, warn, Substate};
5use redux::Dispatcher;
6use request::{P2pNetworkKadRequestState, P2pNetworkKadRequestStatus};
7use token::{
8    AuthKind, DiscoveryAlgorithm, IdentifyAlgorithm, MuxKind, PingAlgorithm, Protocol,
9    RpcAlgorithm, StreamKind,
10};
11
12use crate::{
13    connection::{
14        incoming::P2pConnectionIncomingAction, outgoing::P2pConnectionOutgoingAction,
15        P2pConnectionState,
16    },
17    disconnection::P2pDisconnectionAction,
18    identify::P2pIdentifyAction,
19    P2pConfig, P2pPeerStatus, P2pState, PeerId,
20};
21
22use super::{super::*, p2p_network_scheduler_state::P2pNetworkConnectionState, *};
23
24impl P2pNetworkSchedulerState {
25    pub fn reducer<Action, State>(
26        mut state_context: Substate<Action, State, Self>,
27        action: redux::ActionWithMeta<P2pNetworkSchedulerAction>,
28    ) -> Result<(), String>
29    where
30        State: crate::P2pStateTrait,
31        Action: crate::P2pActionTrait<State>,
32    {
33        let (action, meta) = action.split();
34        let scheduler_state = state_context.get_substate_mut()?;
35
36        match action {
37            P2pNetworkSchedulerAction::InterfaceDetected { ip, .. } => {
38                scheduler_state.interfaces.insert(ip);
39
40                let (dispatcher, state) = state_context.into_dispatcher_and_state();
41                let p2p_config: &P2pConfig = state.substate()?;
42
43                if let Some(port) = p2p_config.libp2p_port {
44                    dispatcher
45                        .push(P2pNetworkSchedulerEffectfulAction::InterfaceDetected { ip, port });
46                }
47
48                Ok(())
49            }
50            P2pNetworkSchedulerAction::InterfaceExpired { ip, .. } => {
51                scheduler_state.interfaces.remove(&ip);
52                Ok(())
53            }
54            P2pNetworkSchedulerAction::ListenerReady { listener } => {
55                scheduler_state.listeners.insert(listener);
56                Ok(())
57            }
58            P2pNetworkSchedulerAction::ListenerError { listener, .. } => {
59                scheduler_state.listeners.remove(&listener);
60                Ok(())
61            }
62            P2pNetworkSchedulerAction::IncomingDataIsReady { addr } => {
63                let (dispatcher, state) = state_context.into_dispatcher_and_state();
64                let scheduler: &Self = state.substate()?;
65                let Some(connection_state) = scheduler.connection_state(&addr) else {
66                    bug_condition!(
67                        "Invalid state for `P2pNetworkSchedulerAction::IncomingDataIsReady`"
68                    );
69                    return Ok(());
70                };
71
72                let limit = connection_state.limit();
73                if limit > 0 {
74                    dispatcher.push(P2pNetworkSchedulerEffectfulAction::IncomingDataIsReady {
75                        addr,
76                        limit,
77                    });
78                }
79
80                Ok(())
81            }
82            P2pNetworkSchedulerAction::IncomingDidAccept { addr, result } => {
83                if let Some(addr) = addr {
84                    scheduler_state.connections.insert(
85                        addr,
86                        P2pNetworkConnectionState {
87                            incoming: true,
88                            pnet: P2pNetworkPnetState::new(scheduler_state.pnet_key, meta.time()),
89                            select_auth: P2pNetworkSelectState::default(),
90                            auth: None,
91                            select_mux: P2pNetworkSelectState::default(),
92                            mux: None,
93                            streams: BTreeMap::default(),
94                            closed: None,
95                            limit: P2pNetworkConnectionState::INITIAL_LIMIT,
96                        },
97                    );
98                };
99
100                let dispatcher = state_context.into_dispatcher();
101                if let Some(addr) = addr {
102                    dispatcher.push(P2pNetworkSchedulerEffectfulAction::IncomingDidAccept {
103                        addr,
104                        result,
105                    });
106                }
107
108                Ok(())
109            }
110            P2pNetworkSchedulerAction::OutgoingConnect { addr } => {
111                scheduler_state.connections.insert(
112                    ConnectionAddr {
113                        sock_addr: addr,
114                        incoming: false,
115                    },
116                    P2pNetworkConnectionState {
117                        incoming: false,
118                        pnet: P2pNetworkPnetState::new(scheduler_state.pnet_key, meta.time()),
119                        select_auth: P2pNetworkSelectState::initiator_auth(
120                            token::AuthKind::Noise,
121                            meta.time(),
122                        ),
123                        auth: None,
124                        select_mux: P2pNetworkSelectState::initiator_mux(
125                            token::MuxKind::Yamux1_0_0,
126                            meta.time(),
127                        ),
128                        mux: None,
129                        streams: BTreeMap::default(),
130                        closed: None,
131                        limit: P2pNetworkConnectionState::INITIAL_LIMIT,
132                    },
133                );
134
135                let dispatcher = state_context.into_dispatcher();
136                dispatcher.push(P2pNetworkSchedulerEffectfulAction::OutgoingConnect { addr });
137                Ok(())
138            }
139            P2pNetworkSchedulerAction::OutgoingDidConnect { addr, result } => {
140                // TODO: change to connected
141
142                let (dispatcher, state) = state_context.into_dispatcher_and_state();
143                let p2p_state: &P2pState = state.substate()?;
144
145                match result {
146                    Ok(()) => {
147                        dispatcher
148                            .push(P2pNetworkSchedulerEffectfulAction::OutgoingDidConnect { addr });
149                    }
150                    Err(error) => {
151                        let Some((peer_id, peer_state)) = p2p_state.peer_with_connection(addr)
152                        else {
153                            bug_condition!(
154                                "outgoing connection to {addr} failed, but there is no peer for it"
155                            );
156                            return Ok(());
157                        };
158                        if matches!(
159                            peer_state.status,
160                            P2pPeerStatus::Connecting(P2pConnectionState::Outgoing(_))
161                        ) {
162                            dispatcher.push(P2pConnectionOutgoingAction::FinalizeError {
163                                peer_id,
164                                error: error.to_string(),
165                            });
166                        } else {
167                            bug_condition!("Invalid status for `P2pNetworkSchedulerAction::OutgoingDidConnect`: {:?}", peer_state.status);
168                        }
169                    }
170                }
171                Ok(())
172            }
173            P2pNetworkSchedulerAction::IncomingDataDidReceive { result, addr } => {
174                // since both actions dispatcher later require connection state, if we can't find it we shouldn't dispatcher them
175                let Some(state) = scheduler_state.connection_state_mut(&addr) else {
176                    bug_condition!("Unable to find connection for `P2pNetworkSchedulerAction::IncomingDataDidReceive`");
177                    return Ok(());
178                };
179
180                if let Ok(data) = &result {
181                    state.consume(data.len());
182                };
183
184                let dispatcher = state_context.into_dispatcher();
185                match result {
186                    Ok(data) => {
187                        dispatcher.push(P2pNetworkPnetAction::IncomingData { addr, data });
188                    }
189                    Err(error) => dispatcher.push(P2pNetworkSchedulerAction::Error {
190                        addr,
191                        error: P2pNetworkConnectionError::MioError(error),
192                    }),
193                }
194                Ok(())
195            }
196            P2pNetworkSchedulerAction::SelectDone {
197                addr,
198                kind,
199                protocol,
200                incoming,
201                expected_peer_id,
202            } => {
203                scheduler_state.reducer_select_done(
204                    addr,
205                    kind,
206                    protocol,
207                    incoming,
208                    expected_peer_id,
209                );
210
211                let (dispatcher, state) = state_context.into_dispatcher_and_state();
212                let p2p_state: &P2pState = state.substate()?;
213                Self::forward_select_done(dispatcher, p2p_state, protocol, addr, incoming, kind);
214                Ok(())
215            }
216            P2pNetworkSchedulerAction::SelectError { addr, kind, .. } => {
217                let dispatcher = state_context.into_dispatcher();
218
219                match kind {
220                    SelectKind::Stream(peer_id, stream_id)
221                        if keep_connection_with_unknown_stream() =>
222                    {
223                        warn!(meta.time(); summary="select error for stream", addr = display(addr), peer_id = display(peer_id));
224                        // just close the stream
225                        dispatcher.push(P2pNetworkYamuxAction::OutgoingData {
226                            addr,
227                            stream_id,
228                            data: Data::default(),
229                            flags: YamuxFlags::RST,
230                        });
231                        dispatcher
232                            .push(P2pNetworkSchedulerAction::PruneStream { peer_id, stream_id });
233                    }
234                    _ => {
235                        dispatcher.push(P2pNetworkSchedulerAction::Error {
236                            addr,
237                            error: P2pNetworkConnectionError::SelectError,
238                        });
239                    }
240                }
241
242                dispatcher.push(P2pNetworkSchedulerEffectfulAction::Disconnect {
243                    addr,
244                    reason: P2pNetworkConnectionCloseReason::Error(
245                        P2pNetworkConnectionError::SelectError,
246                    ),
247                });
248
249                Ok(())
250            }
251            P2pNetworkSchedulerAction::YamuxDidInit {
252                addr,
253                peer_id,
254                message_size_limit,
255                pending_outgoing_limit,
256            } => {
257                let Some(cn) = scheduler_state.connections.get_mut(&addr) else {
258                    bug_condition!(
259                        "Missing connection state for `P2pNetworkSchedulerAction::YamuxDidInit`"
260                    );
261                    return Ok(());
262                };
263                if let Some(P2pNetworkConnectionMuxState::Yamux(yamux)) = &mut cn.mux {
264                    yamux.init = true;
265                    yamux.message_size_limit = message_size_limit;
266                    yamux.pending_outgoing_limit = pending_outgoing_limit;
267                }
268
269                let incoming = cn.incoming;
270                let dispatcher = state_context.into_dispatcher();
271
272                if incoming {
273                    dispatcher.push(P2pConnectionIncomingAction::Libp2pReceived { peer_id });
274                } else {
275                    dispatcher.push(P2pConnectionOutgoingAction::FinalizeSuccess {
276                        peer_id,
277                        remote_auth: None,
278                    });
279                }
280
281                dispatcher.push(P2pIdentifyAction::NewRequest { peer_id, addr });
282                Ok(())
283            }
284            P2pNetworkSchedulerAction::Disconnect { addr, reason } => {
285                let Some(conn_state) = scheduler_state.connections.get_mut(&addr) else {
286                    bug_condition!(
287                        "`P2pNetworkSchedulerAction::Disconnect`: connection {addr} does not exist"
288                    );
289                    return Ok(());
290                };
291                if conn_state.closed.is_some() {
292                    bug_condition!(
293                        "`P2pNetworkSchedulerAction::Disconnect`: {addr} already disconnected"
294                    );
295                    return Ok(());
296                }
297                conn_state.closed = Some(reason.clone().into());
298
299                let dispatcher = state_context.into_dispatcher();
300                dispatcher.push(P2pNetworkSchedulerEffectfulAction::Disconnect {
301                    addr,
302                    reason: P2pNetworkConnectionCloseReason::Disconnect(reason),
303                });
304
305                Ok(())
306            }
307            P2pNetworkSchedulerAction::Error { addr, error } => {
308                let Some(conn_state) = scheduler_state.connections.get_mut(&addr) else {
309                    bug_condition!(
310                        "`P2pNetworkSchedulerAction::Error`: connection {addr} does not exist"
311                    );
312                    return Ok(());
313                };
314                if conn_state.closed.is_some() {
315                    bug_condition!(
316                        "`P2pNetworkSchedulerAction::Error`: {addr} already disconnected"
317                    );
318                    return Ok(());
319                }
320                conn_state.closed = Some(error.clone().into());
321
322                let dispatcher = state_context.into_dispatcher();
323                dispatcher.push(P2pNetworkSchedulerEffectfulAction::Disconnect {
324                    addr,
325                    reason: P2pNetworkConnectionCloseReason::Error(error),
326                });
327                Ok(())
328            }
329            P2pNetworkSchedulerAction::Disconnected { addr, reason } => {
330                let Some(cn) = scheduler_state.connections.get_mut(&addr) else {
331                    bug_condition!(
332                        "P2pNetworkSchedulerAction::Disconnected: connection {addr} does not exist"
333                    );
334                    return Ok(());
335                };
336                if cn.closed.is_none() {
337                    bug_condition!(
338                        "P2pNetworkSchedulerAction::Disconnect: {addr} is not disconnected"
339                    );
340                }
341
342                let incoming = cn.incoming;
343                let (dispatcher, state) = state_context.into_dispatcher_and_state();
344                let state: &P2pState = state.substate()?;
345
346                let peer_with_state = state.peer_with_connection(addr);
347
348                if reason.is_disconnected() {
349                    // statemachine behaviour should continue with this, i.e. dispatch P2pDisconnectionAction::Finish
350                    return Ok(());
351                }
352
353                match peer_with_state {
354                    Some((peer_id, peer_state)) => {
355                        // TODO: connection state type should tell if it is finalized
356                        match &peer_state.status {
357                            P2pPeerStatus::Connecting(
358                                crate::connection::P2pConnectionState::Incoming(_),
359                            ) => {
360                                dispatcher.push(P2pConnectionIncomingAction::FinalizeError {
361                                    peer_id,
362                                    error: reason.to_string(),
363                                });
364                            }
365                            P2pPeerStatus::Connecting(
366                                crate::connection::P2pConnectionState::Outgoing(_),
367                            ) => {
368                                dispatcher.push(P2pConnectionOutgoingAction::FinalizeError {
369                                    peer_id,
370                                    error: reason.to_string(),
371                                });
372                            }
373                            P2pPeerStatus::Disconnecting { .. } => {}
374                            P2pPeerStatus::Disconnected { .. } => {
375                                // sanity check, should be incoming connection
376                                if !incoming {
377                                    error!(meta.time(); "disconnected peer connection for address {addr}");
378                                } else {
379                                    // TODO: introduce action for incoming connection finalization without peer_id
380                                }
381                            }
382                            P2pPeerStatus::Ready(_) => {
383                                dispatcher.push(P2pDisconnectionAction::Finish { peer_id });
384                            }
385                        }
386                    }
387                    None => {
388                        // sanity check, should be incoming connection
389                        if !incoming {
390                            // TODO: error!(meta.time(); "non-existing peer connection for address {addr}");
391                        } else {
392                            // TODO: introduce action for incoming connection finalization without peer_id
393                        }
394                    }
395                }
396                Ok(())
397            }
398            P2pNetworkSchedulerAction::Prune { addr } => {
399                if let Some(old) = scheduler_state.connections.remove(&addr) {
400                    if let Some(peer_id) = old.peer_id() {
401                        scheduler_state.prune_peer_state(peer_id);
402                    }
403                }
404                Ok(())
405            }
406            P2pNetworkSchedulerAction::PruneStream { peer_id, stream_id } => {
407                let Some((_, conn_state)) = scheduler_state
408                    .connections
409                    .iter_mut()
410                    .find(|(_, conn_state)| conn_state.peer_id() == Some(&peer_id))
411                else {
412                    bug_condition!("PruneStream: peer {peer_id} not found");
413                    return Ok(());
414                };
415
416                if conn_state.streams.remove(&stream_id).is_none() {
417                    bug_condition!("PruneStream: peer {peer_id} does not have stream {stream_id}");
418                }
419
420                Ok(())
421            }
422            P2pNetworkSchedulerAction::IncomingConnectionIsReady { listener } => {
423                let (dispatcher, state) = state_context.into_dispatcher_and_state();
424                let p2p_state: &P2pState = state.substate()?;
425
426                let should_accept = p2p_state.network.scheduler.connections.len()
427                    < p2p_state.config.limits.max_connections();
428
429                dispatcher.push(
430                    P2pNetworkSchedulerEffectfulAction::IncomingConnectionIsReady {
431                        listener,
432                        should_accept,
433                    },
434                );
435                Ok(())
436            }
437        }
438    }
439
440    fn reducer_select_done(
441        &mut self,
442        addr: ConnectionAddr,
443        kind: SelectKind,
444        protocol: Option<Protocol>,
445        incoming: bool,
446        expected_peer_id: Option<PeerId>,
447    ) {
448        let Some(connection) = self.connections.get_mut(&addr) else {
449            bug_condition!("Missing connection state for `P2pNetworkSchedulerAction::SelectDone`");
450            return;
451        };
452
453        match protocol {
454            Some(token::Protocol::Auth(token::AuthKind::Noise)) => {
455                connection.auth = Some(P2pNetworkAuthState::Noise(P2pNetworkNoiseState::new(
456                    self.local_pk.clone(),
457                    expected_peer_id,
458                )));
459            }
460            Some(token::Protocol::Mux(
461                token::MuxKind::Yamux1_0_0 | token::MuxKind::YamuxNoNewLine1_0_0,
462            )) => {
463                connection.mux = Some(P2pNetworkConnectionMuxState::Yamux(P2pNetworkYamuxState {
464                    init: true,
465                    ..Default::default()
466                }));
467            }
468            Some(token::Protocol::Stream(stream_kind)) => {
469                let SelectKind::Stream(peer_id, stream_id) = kind else {
470                    bug_condition!(
471                        "incorrect stream kind {kind:?} for protocol stream: {stream_kind:?}"
472                    );
473                    return;
474                };
475                match stream_kind {
476                    token::StreamKind::Rpc(_) => {
477                        if incoming {
478                            self.rpc_incoming_streams
479                                .entry(peer_id)
480                                .or_default()
481                                .insert(stream_id, P2pNetworkRpcState::new(addr, stream_id));
482                        } else {
483                            self.rpc_outgoing_streams
484                                .entry(peer_id)
485                                .or_default()
486                                .insert(stream_id, P2pNetworkRpcState::new(addr, stream_id));
487                        }
488                    }
489                    token::StreamKind::Broadcast(_) => {}
490                    token::StreamKind::Identify(_) => {}
491                    token::StreamKind::Discovery(_) => {}
492                    token::StreamKind::Ping(_) => {}
493                    token::StreamKind::Bitswap(_) => {}
494                    token::StreamKind::Status(_) => {}
495                }
496            }
497            None => {}
498        }
499    }
500
501    fn forward_select_done<Action, State>(
502        dispatcher: &mut Dispatcher<Action, State>,
503        p2p_state: &P2pState,
504        protocol: Option<Protocol>,
505        addr: ConnectionAddr,
506        incoming: bool,
507        select_kind: SelectKind,
508    ) where
509        State: crate::P2pStateTrait,
510        Action: crate::P2pActionTrait<State>,
511    {
512        match protocol {
513            Some(Protocol::Auth(AuthKind::Noise)) => {
514                dispatcher
515                    .push(P2pNetworkSchedulerEffectfulAction::NoiseSelectDone { addr, incoming });
516            }
517            Some(Protocol::Mux(MuxKind::Yamux1_0_0 | MuxKind::YamuxNoNewLine1_0_0)) => {
518                let SelectKind::Multiplexing(peer_id) = select_kind else {
519                    bug_condition!("wrong kind for multiplexing protocol action: {select_kind:?}");
520                    return;
521                };
522                let message_size_limit = p2p_state.config.limits.yamux_message_size();
523                let pending_outgoing_limit =
524                    p2p_state.config.limits.yamux_pending_outgoing_per_peer();
525                dispatcher.push(P2pNetworkSchedulerAction::YamuxDidInit {
526                    addr,
527                    peer_id,
528                    message_size_limit,
529                    pending_outgoing_limit,
530                });
531            }
532            Some(Protocol::Stream(kind)) => {
533                let SelectKind::Stream(peer_id, stream_id) = select_kind else {
534                    bug_condition!("wrong kind for stream protocol action: {kind:?}");
535                    return;
536                };
537                match kind {
538                    StreamKind::Status(_)
539                    | StreamKind::Identify(IdentifyAlgorithm::IdentifyPush1_0_0)
540                    | StreamKind::Bitswap(_)
541                    | StreamKind::Ping(PingAlgorithm::Ping1_0_0) => {
542                        //unimplemented!()
543                    }
544                    StreamKind::Identify(IdentifyAlgorithm::Identify1_0_0) => {
545                        dispatcher.push(P2pNetworkIdentifyStreamAction::New {
546                            addr,
547                            peer_id,
548                            stream_id,
549                            incoming,
550                        });
551                    }
552
553                    StreamKind::Broadcast(protocol) => {
554                        dispatcher.push(P2pNetworkPubsubAction::NewStream {
555                            incoming,
556                            peer_id,
557                            addr,
558                            stream_id,
559                            protocol,
560                        });
561                    }
562                    StreamKind::Discovery(DiscoveryAlgorithm::Kademlia1_0_0) => {
563                        if let Some(discovery_state) = p2p_state.network.scheduler.discovery_state()
564                        {
565                            let request = !incoming && discovery_state.request(&peer_id).is_some();
566                            dispatcher.push(P2pNetworkKademliaStreamAction::New {
567                                addr,
568                                peer_id,
569                                stream_id,
570                                incoming,
571                            });
572                            // if our node initiated a request to the peer, notify that the stream is ready.
573                            if request {
574                                dispatcher.push(P2pNetworkKadRequestAction::StreamReady {
575                                    peer_id,
576                                    addr,
577                                    stream_id,
578                                    callback: redux::callback!(
579                                        on_p2p_network_kad_request_stream_ready((
580                                            addr: ConnectionAddr,
581                                            peer_id: PeerId,
582                                            stream_id: StreamId,
583                                            data: P2pNetworkKademliaRpcRequest
584                                        )) -> crate::P2pAction{
585                                            P2pNetworkKademliaStreamAction::SendRequest {
586                                                addr,
587                                                peer_id,
588                                                stream_id,
589                                                data
590                                            }
591                                        }
592                                    ),
593                                });
594                            }
595                        }
596                    }
597                    StreamKind::Rpc(RpcAlgorithm::Rpc0_0_1) => {
598                        dispatcher.push(P2pNetworkRpcAction::Init {
599                            addr,
600                            peer_id,
601                            stream_id,
602                            incoming,
603                        });
604                    }
605                }
606            }
607            None => {
608                match &select_kind {
609                    SelectKind::Authentication => {
610                        // TODO: close the connection
611                    }
612                    SelectKind::MultiplexingNoPeerId => {
613                        bug_condition!("`SelectKind::MultiplexingNoPeerId` not handled");
614                        // WARNING: must not happen
615                    }
616                    SelectKind::Multiplexing(_) => {
617                        // TODO: close the connection
618                    }
619                    SelectKind::Stream(peer_id, stream_id) => {
620                        if let Some(discovery_state) = p2p_state.network.scheduler.discovery_state()
621                        {
622                            if let Some(P2pNetworkKadRequestState {
623                                status: P2pNetworkKadRequestStatus::WaitingForKadStream(id),
624                                ..
625                            }) = discovery_state.request(peer_id)
626                            {
627                                if id == stream_id {
628                                    dispatcher.push(P2pNetworkKadRequestAction::Error {
629                                        peer_id: *peer_id,
630                                        error: "stream protocol is not negotiated".into(),
631                                    });
632                                }
633                            }
634                        }
635                    }
636                }
637            }
638        }
639    }
640}
641
/// Tells whether a connection should be kept open when selecting a stream protocol fails.
///
/// The answer comes from the `KEEP_CONNECTION_WITH_UNKNOWN_STREAM` environment variable,
/// parsed as a `bool`; a missing or unparsable value yields `false`. The variable is read
/// on the first call only; later calls return the cached result.
fn keep_connection_with_unknown_stream() -> bool {
    static KEEP: OnceLock<bool> = OnceLock::new();

    let read_flag = || match std::env::var("KEEP_CONNECTION_WITH_UNKNOWN_STREAM") {
        Ok(raw) => raw.parse::<bool>().unwrap_or(false),
        Err(_) => false,
    };

    *KEEP.get_or_init(read_flag)
}