p2p/network/scheduler/
p2p_network_scheduler_actions.rs

1use std::net::{IpAddr, SocketAddr};
2
3use openmina_core::ActionEvent;
4use serde::{Deserialize, Serialize};
5
6use super::{
7    super::{
8        select::{token, SelectKind},
9        Data, Limit,
10    },
11    p2p_network_scheduler_state::{P2pNetworkConnectionCloseReason, P2pNetworkConnectionError},
12};
13
14use crate::{disconnection::P2pDisconnectionReason, ConnectionAddr, P2pState, PeerId, StreamId};
15
16#[derive(Serialize, Deserialize, Debug, Clone, ActionEvent)]
17#[action_event(fields(display(ip), display(listener), display(addr), debug(result), select_kind = debug(kind), display(error)))]
18pub enum P2pNetworkSchedulerAction {
19    InterfaceDetected {
20        ip: IpAddr,
21    },
22    InterfaceExpired {
23        ip: IpAddr,
24    },
25    ListenerReady {
26        listener: SocketAddr,
27    },
28    ListenerError {
29        listener: SocketAddr,
30        error: String,
31    },
32    IncomingConnectionIsReady {
33        listener: SocketAddr,
34    },
35    #[action_event(fields(debug(addr), debug(result)))]
36    IncomingDidAccept {
37        addr: Option<ConnectionAddr>,
38        result: Result<(), String>,
39    },
40    IncomingDataIsReady {
41        addr: ConnectionAddr,
42    },
43    /// Initialize outgoing connection.
44    OutgoingConnect {
45        addr: SocketAddr,
46    },
47    /// Outgoint TCP stream is established.
48    OutgoingDidConnect {
49        addr: ConnectionAddr,
50        result: Result<(), String>,
51    },
52    IncomingDataDidReceive {
53        addr: ConnectionAddr,
54        result: Result<Data, String>,
55    },
56    SelectDone {
57        addr: ConnectionAddr,
58        kind: SelectKind,
59        protocol: Option<token::Protocol>,
60        incoming: bool,
61        expected_peer_id: Option<PeerId>,
62    },
63    SelectError {
64        addr: ConnectionAddr,
65        kind: SelectKind,
66        error: String,
67    },
68    YamuxDidInit {
69        addr: ConnectionAddr,
70        peer_id: PeerId,
71        message_size_limit: Limit<usize>,
72        pending_outgoing_limit: Limit<usize>,
73    },
74
75    /// Action that initiate the specified peer disconnection.
76    Disconnect {
77        /// Connection address.
78        addr: ConnectionAddr,
79        /// Reason why disconneciton is triggered.
80        reason: P2pDisconnectionReason,
81    },
82
83    /// Fatal connection error.
84    Error {
85        /// Connection address.
86        addr: ConnectionAddr,
87        /// Reason why disconneciton is triggered.
88        error: P2pNetworkConnectionError,
89    },
90
91    /// Remote address is disconnected.
92    ///
93    /// Action that signals that the peer is disconnected.
94    Disconnected {
95        /// Connection address.
96        addr: ConnectionAddr,
97        /// Reason why the peer disconnected.
98        reason: P2pNetworkConnectionCloseReason,
99    },
100
101    /// Prune connection.
102    Prune {
103        /// Connection address.
104        addr: ConnectionAddr,
105    },
106    /// Prune streams.
107    PruneStream {
108        peer_id: PeerId,
109        stream_id: StreamId,
110    },
111}
112
113impl From<P2pNetworkSchedulerAction> for crate::P2pAction {
114    fn from(value: P2pNetworkSchedulerAction) -> Self {
115        crate::P2pAction::Network(value.into())
116    }
117}
118
119impl redux::EnablingCondition<P2pState> for P2pNetworkSchedulerAction {
120    fn is_enabled(&self, state: &P2pState, _time: redux::Timestamp) -> bool {
121        match self {
122            P2pNetworkSchedulerAction::InterfaceDetected { .. }
123            | P2pNetworkSchedulerAction::InterfaceExpired { .. }
124            | P2pNetworkSchedulerAction::ListenerReady { .. }
125            | P2pNetworkSchedulerAction::ListenerError { .. }
126            | P2pNetworkSchedulerAction::IncomingConnectionIsReady { .. }
127            | P2pNetworkSchedulerAction::SelectDone { .. }
128            | P2pNetworkSchedulerAction::SelectError { .. } => true,
129            P2pNetworkSchedulerAction::IncomingDidAccept { addr, .. } => addr
130                .as_ref()
131                .is_some_and(|addr| !state.network.scheduler.connections.contains_key(addr)),
132            P2pNetworkSchedulerAction::OutgoingConnect { addr } => state
133                .network
134                .scheduler
135                .connections
136                .get(&ConnectionAddr {
137                    sock_addr: *addr,
138                    incoming: false,
139                })
140                .is_none_or(|v| v.closed.is_some()),
141            P2pNetworkSchedulerAction::OutgoingDidConnect { addr, .. } => state
142                .network
143                .scheduler
144                .connections
145                .get(addr)
146                .is_some_and(|conn_state| !conn_state.incoming),
147            P2pNetworkSchedulerAction::IncomingDataDidReceive { addr, .. }
148            | P2pNetworkSchedulerAction::IncomingDataIsReady { addr }
149            | P2pNetworkSchedulerAction::YamuxDidInit { addr, .. } => {
150                state.network.scheduler.connections.contains_key(addr)
151            }
152            P2pNetworkSchedulerAction::Disconnect { addr, .. }
153            | P2pNetworkSchedulerAction::Error { addr, .. } => state
154                .network
155                .scheduler
156                .connections
157                .get(addr)
158                .is_some_and(|conn_state| conn_state.closed.is_none()),
159            P2pNetworkSchedulerAction::Disconnected { addr, reason } => state
160                .network
161                .scheduler
162                .connections
163                .get(addr)
164                .is_some_and(|conn_state| conn_state.closed.as_ref() == Some(reason)),
165            // TODO: introduce state for closed connection
166            P2pNetworkSchedulerAction::Prune { addr } => state
167                .network
168                .scheduler
169                .connections
170                .get(addr)
171                .is_some_and(|conn_state| conn_state.closed.is_some()),
172            P2pNetworkSchedulerAction::PruneStream { peer_id, stream_id } => state
173                .network
174                .scheduler
175                .find_peer(peer_id)
176                .and_then(|(_, conn_state)| conn_state.streams.get(stream_id))
177                .is_some(),
178        }
179    }
180}