1use std::net::{IpAddr, SocketAddr};
2
3use openmina_core::ActionEvent;
4use serde::{Deserialize, Serialize};
5
6use super::{
7 super::{
8 select::{token, SelectKind},
9 Data, Limit,
10 },
11 p2p_network_scheduler_state::{P2pNetworkConnectionCloseReason, P2pNetworkConnectionError},
12};
13
14use crate::{disconnection::P2pDisconnectionReason, ConnectionAddr, P2pState, PeerId, StreamId};
15
16#[derive(Serialize, Deserialize, Debug, Clone, ActionEvent)]
17#[action_event(fields(display(ip), display(listener), display(addr), debug(result), select_kind = debug(kind), display(error)))]
18pub enum P2pNetworkSchedulerAction {
19 InterfaceDetected {
20 ip: IpAddr,
21 },
22 InterfaceExpired {
23 ip: IpAddr,
24 },
25 ListenerReady {
26 listener: SocketAddr,
27 },
28 ListenerError {
29 listener: SocketAddr,
30 error: String,
31 },
32 IncomingConnectionIsReady {
33 listener: SocketAddr,
34 },
35 #[action_event(fields(debug(addr), debug(result)))]
36 IncomingDidAccept {
37 addr: Option<ConnectionAddr>,
38 result: Result<(), String>,
39 },
40 IncomingDataIsReady {
41 addr: ConnectionAddr,
42 },
43 OutgoingConnect {
45 addr: SocketAddr,
46 },
47 OutgoingDidConnect {
49 addr: ConnectionAddr,
50 result: Result<(), String>,
51 },
52 IncomingDataDidReceive {
53 addr: ConnectionAddr,
54 result: Result<Data, String>,
55 },
56 SelectDone {
57 addr: ConnectionAddr,
58 kind: SelectKind,
59 protocol: Option<token::Protocol>,
60 incoming: bool,
61 expected_peer_id: Option<PeerId>,
62 },
63 SelectError {
64 addr: ConnectionAddr,
65 kind: SelectKind,
66 error: String,
67 },
68 YamuxDidInit {
69 addr: ConnectionAddr,
70 peer_id: PeerId,
71 message_size_limit: Limit<usize>,
72 pending_outgoing_limit: Limit<usize>,
73 },
74
75 Disconnect {
77 addr: ConnectionAddr,
79 reason: P2pDisconnectionReason,
81 },
82
83 Error {
85 addr: ConnectionAddr,
87 error: P2pNetworkConnectionError,
89 },
90
91 Disconnected {
95 addr: ConnectionAddr,
97 reason: P2pNetworkConnectionCloseReason,
99 },
100
101 Prune {
103 addr: ConnectionAddr,
105 },
106 PruneStream {
108 peer_id: PeerId,
109 stream_id: StreamId,
110 },
111}
112
113impl From<P2pNetworkSchedulerAction> for crate::P2pAction {
114 fn from(value: P2pNetworkSchedulerAction) -> Self {
115 crate::P2pAction::Network(value.into())
116 }
117}
118
119impl redux::EnablingCondition<P2pState> for P2pNetworkSchedulerAction {
120 fn is_enabled(&self, state: &P2pState, _time: redux::Timestamp) -> bool {
121 match self {
122 P2pNetworkSchedulerAction::InterfaceDetected { .. }
123 | P2pNetworkSchedulerAction::InterfaceExpired { .. }
124 | P2pNetworkSchedulerAction::ListenerReady { .. }
125 | P2pNetworkSchedulerAction::ListenerError { .. }
126 | P2pNetworkSchedulerAction::IncomingConnectionIsReady { .. }
127 | P2pNetworkSchedulerAction::SelectDone { .. }
128 | P2pNetworkSchedulerAction::SelectError { .. } => true,
129 P2pNetworkSchedulerAction::IncomingDidAccept { addr, .. } => addr
130 .as_ref()
131 .is_some_and(|addr| !state.network.scheduler.connections.contains_key(addr)),
132 P2pNetworkSchedulerAction::OutgoingConnect { addr } => state
133 .network
134 .scheduler
135 .connections
136 .get(&ConnectionAddr {
137 sock_addr: *addr,
138 incoming: false,
139 })
140 .is_none_or(|v| v.closed.is_some()),
141 P2pNetworkSchedulerAction::OutgoingDidConnect { addr, .. } => state
142 .network
143 .scheduler
144 .connections
145 .get(addr)
146 .is_some_and(|conn_state| !conn_state.incoming),
147 P2pNetworkSchedulerAction::IncomingDataDidReceive { addr, .. }
148 | P2pNetworkSchedulerAction::IncomingDataIsReady { addr }
149 | P2pNetworkSchedulerAction::YamuxDidInit { addr, .. } => {
150 state.network.scheduler.connections.contains_key(addr)
151 }
152 P2pNetworkSchedulerAction::Disconnect { addr, .. }
153 | P2pNetworkSchedulerAction::Error { addr, .. } => state
154 .network
155 .scheduler
156 .connections
157 .get(addr)
158 .is_some_and(|conn_state| conn_state.closed.is_none()),
159 P2pNetworkSchedulerAction::Disconnected { addr, reason } => state
160 .network
161 .scheduler
162 .connections
163 .get(addr)
164 .is_some_and(|conn_state| conn_state.closed.as_ref() == Some(reason)),
165 P2pNetworkSchedulerAction::Prune { addr } => state
167 .network
168 .scheduler
169 .connections
170 .get(addr)
171 .is_some_and(|conn_state| conn_state.closed.is_some()),
172 P2pNetworkSchedulerAction::PruneStream { peer_id, stream_id } => state
173 .network
174 .scheduler
175 .find_peer(peer_id)
176 .and_then(|(_, conn_state)| conn_state.streams.get(stream_id))
177 .is_some(),
178 }
179 }
180}