1use crate::{
2 channels::{
3 rpc::P2pChannelsRpcAction, signaling::discovery::P2pChannelsSignalingDiscoveryAction,
4 streaming_rpc::P2pChannelsStreamingRpcAction, P2pChannelsState,
5 },
6 connection::{
7 incoming::P2pConnectionIncomingAction, outgoing::P2pConnectionOutgoingAction,
8 P2pConnectionState,
9 },
10 disconnection::{P2pDisconnectedState, P2pDisconnectionAction},
11 P2pAction, P2pNetworkKadKey, P2pNetworkKademliaAction, P2pNetworkPnetAction,
12 P2pNetworkPubsubAction, P2pNetworkRpcAction, P2pNetworkSelectAction, P2pNetworkState,
13 P2pPeerState, P2pState, PeerId,
14};
15use openmina_core::{bug_condition, Substate};
16use redux::{ActionMeta, ActionWithMeta, Dispatcher, Timestamp};
17
18impl P2pState {
19 pub fn reducer<State, Action>(
20 mut state_context: Substate<Action, State, Self>,
21 action: ActionWithMeta<P2pAction>,
22 ) -> Result<(), String>
23 where
24 State: crate::P2pStateTrait,
25 Action: crate::P2pActionTrait<State>,
26 {
27 let Ok(state) = state_context.get_substate_mut() else {
28 bug_condition!("no P2pState");
29 return Ok(());
30 };
31 let (action, meta) = action.split();
32
33 match action {
34 P2pAction::Initialization(_) => {
35 Ok(())
37 }
38 P2pAction::Connection(action) => {
39 P2pConnectionState::reducer(state_context, meta.with_action(action))
40 }
41 P2pAction::Disconnection(action) => {
42 P2pDisconnectedState::reducer(state_context, meta.with_action(action))
43 }
44 P2pAction::Peer(action) => P2pPeerState::reducer(
45 Substate::from_compatible_substate(state_context),
46 meta.with_action(action),
47 ),
48 P2pAction::Channels(action) => {
49 P2pChannelsState::reducer(state_context, meta.with_action(action))
50 }
51 P2pAction::Identify(_action) => {
52 #[cfg(feature = "p2p-libp2p")]
53 Self::identify_reducer(state_context, meta.with_action(_action))?;
54 Ok(())
55 }
56 P2pAction::Network(_action) => {
57 #[cfg(feature = "p2p-libp2p")]
58 {
59 let limits = state.config.limits;
60 P2pNetworkState::reducer(
61 Substate::from_compatible_substate(state_context),
62 meta.with_action(_action),
63 &limits,
64 )?;
65 }
66 Ok(())
67 }
68 }
69 }
70
71 pub fn p2p_timeout_dispatch<State, Action>(
72 state_context: Substate<Action, State, Self>,
73 meta: &ActionMeta,
74 ) -> Result<(), String>
75 where
76 State: crate::P2pStateTrait,
77 Action: crate::P2pActionTrait<State>,
78 {
79 let (dispatcher, state) = state_context.into_dispatcher_and_state();
80 let state: &P2pState = state.substate()?;
81 let time = meta.time();
82
83 state.p2p_connection_timeouts_dispatch(dispatcher, time)?;
84 dispatcher.push(P2pConnectionOutgoingAction::RandomInit);
85 dispatcher.push(P2pDisconnectionAction::RandomTry);
86
87 state.p2p_connect_initial_peers(dispatcher);
88 state.p2p_try_reconnect_disconnected_peers(dispatcher, time)?;
89 state.p2p_discovery(dispatcher, time)?;
90
91 #[cfg(feature = "p2p-libp2p")]
92 {
93 state.p2p_pnet_timeouts(dispatcher, time)?;
94 state.p2p_select_timeouts(dispatcher, time)?;
95 state.p2p_rpc_heartbeats(dispatcher, time)?;
96 dispatcher.push(P2pNetworkPubsubAction::PruneMessages {});
97 }
98
99 state.rpc_timeouts(dispatcher, time)?;
100 Ok(())
101 }
102
103 fn p2p_connection_timeouts_dispatch<State, Action>(
104 &self,
105 dispatcher: &mut Dispatcher<Action, State>,
106 time: Timestamp,
107 ) -> Result<(), String>
108 where
109 State: crate::P2pStateTrait,
110 Action: crate::P2pActionTrait<State>,
111 {
112 let timeouts = &self.config.timeouts;
113
114 self.peers
115 .iter()
116 .filter_map(|(peer_id, peer)| {
117 let state = peer.status.as_connecting()?;
118 state
119 .is_timed_out(time, timeouts)
120 .then(|| (*peer_id, state.as_outgoing().is_some()))
121 })
122 .for_each(|(peer_id, is_outgoing)| match is_outgoing {
123 true => dispatcher.push(P2pConnectionOutgoingAction::Timeout { peer_id }),
124 false => dispatcher.push(P2pConnectionIncomingAction::Timeout { peer_id }),
125 });
126
127 Ok(())
128 }
129
130 fn p2p_try_reconnect_disconnected_peers<State, Action>(
131 &self,
132 dispatcher: &mut Dispatcher<Action, State>,
133 time: Timestamp,
134 ) -> Result<(), String>
135 where
136 State: crate::P2pStateTrait,
137 Action: crate::P2pActionTrait<State>,
138 {
139 if self.already_has_min_peers() {
140 return Ok(());
141 }
142
143 let timeouts = &self.config.timeouts;
144
145 self.peers
146 .iter()
147 .filter_map(|(_, peer)| {
148 if peer.can_reconnect(time, timeouts) {
149 peer.dial_opts.clone()
150 } else {
151 None
152 }
153 })
154 .map(|opts| P2pConnectionOutgoingAction::Reconnect { opts, rpc_id: None })
155 .for_each(|action| dispatcher.push(action));
156 Ok(())
157 }
158
159 fn p2p_connect_initial_peers<State, Action>(&self, dispatcher: &mut Dispatcher<Action, State>)
160 where
161 State: crate::P2pStateTrait,
162 Action: crate::P2pActionTrait<State>,
163 {
164 if self.ready_peers_iter().count() >= self.config.initial_peers.len() {
165 return;
166 }
167
168 self.config
169 .initial_peers
170 .iter()
171 .filter(|opts| !self.peers.contains_key(opts.peer_id()))
172 .cloned()
173 .for_each(|opts| {
174 dispatcher.push(P2pConnectionOutgoingAction::Init {
175 opts,
176 rpc_id: None,
177 on_success: None,
178 });
179 });
180 }
181
182 fn rpc_timeouts<State, Action>(
183 &self,
184 dispatcher: &mut Dispatcher<Action, State>,
185 time: Timestamp,
186 ) -> Result<(), String>
187 where
188 State: crate::P2pStateTrait,
189 Action: crate::P2pActionTrait<State>,
190 {
191 self.peer_rpc_timeouts(time)
192 .into_iter()
193 .for_each(|(peer_id, id, is_streaming)| {
194 if is_streaming {
195 dispatcher.push(P2pChannelsStreamingRpcAction::Timeout { peer_id, id });
196 } else {
197 dispatcher.push(P2pChannelsRpcAction::Timeout { peer_id, id });
198 }
199 });
200
201 Ok(())
202 }
203
204 fn p2p_discovery<State, Action>(
205 &self,
206 dispatcher: &mut Dispatcher<Action, State>,
207 time: Timestamp,
208 ) -> Result<(), String>
209 where
210 State: crate::P2pStateTrait,
211 Action: crate::P2pActionTrait<State>,
212 {
213 let config = &self.config;
214 let timeouts = &config.timeouts;
215
216 if !config.peer_discovery {
217 return Ok(());
218 }
219
220 for (&peer_id, _) in self
221 .ready_peers_iter()
222 .filter(|(_, peer)| peer.channels.signaling.discovery.is_ready())
223 {
224 dispatcher.push(P2pChannelsSignalingDiscoveryAction::RequestSend { peer_id });
225 dispatcher.push(P2pChannelsSignalingDiscoveryAction::DiscoveryRequestSend { peer_id });
226 }
227
228 if let Some(_d) = config.timeouts.initial_peers {
229 }
232
233 #[cfg(feature = "p2p-libp2p")]
234 {
235 if let Some(discovery_state) = self.network.scheduler.discovery_state() {
236 let my_id = self.my_id();
237
238 match P2pNetworkKadKey::try_from(&my_id) {
239 Ok(key) => {
240 if discovery_state.status.can_bootstrap(time, timeouts)
241 && discovery_state
242 .routing_table
243 .closest_peers(&key)
244 .any(|_| true)
245 {
246 dispatcher
247 .push(P2pNetworkKademliaAction::StartBootstrap { key: my_id });
248 }
249 }
250 Err(e) => bug_condition!("p2p discovery error: {:?}", e),
251 }
252 }
253 }
254
255 Ok(())
256 }
257}
258
259#[cfg(feature = "p2p-libp2p")]
260impl P2pState {
261 fn p2p_pnet_timeouts<State, Action>(
262 &self,
263 dispatcher: &mut Dispatcher<Action, State>,
264 time: Timestamp,
265 ) -> Result<(), String>
266 where
267 State: crate::P2pStateTrait,
268 Action: crate::P2pActionTrait<State>,
269 {
270 let timeouts = &self.config.timeouts;
271
272 self.network
273 .scheduler
274 .connections
275 .iter()
276 .filter(|(_, state)| state.pnet.is_timed_out(time, timeouts))
277 .map(|(addr, _)| P2pNetworkPnetAction::Timeout { addr: *addr })
278 .for_each(|action| dispatcher.push(action));
279
280 Ok(())
281 }
282
283 fn p2p_select_timeouts<State, Action>(
284 &self,
285 dispatcher: &mut Dispatcher<Action, State>,
286 time: Timestamp,
287 ) -> Result<(), String>
288 where
289 State: crate::P2pStateTrait,
290 Action: crate::P2pActionTrait<State>,
291 {
292 let timeouts = &self.config.timeouts;
293
294 self.network
295 .scheduler
296 .connections
297 .iter()
298 .filter(|(_, state)| state.select_auth.is_timed_out(time, timeouts))
299 .map(|(addr, _)| P2pNetworkSelectAction::Timeout {
300 addr: *addr,
301 kind: crate::SelectKind::Authentication,
302 })
303 .for_each(|action| dispatcher.push(action));
304
305 self.network
306 .scheduler
307 .connections
308 .iter()
309 .filter(|(_, state)| state.select_mux.is_timed_out(time, timeouts))
310 .map(|(addr, _)| P2pNetworkSelectAction::Timeout {
311 addr: *addr,
312 kind: crate::SelectKind::MultiplexingNoPeerId,
313 })
314 .for_each(|action| dispatcher.push(action));
315
316 let dummy = PeerId::from_bytes([0u8; 32]);
318 self.network
319 .scheduler
320 .connections
321 .iter()
322 .flat_map(|(sock_addr, state)| {
323 state
324 .streams
325 .iter()
326 .filter(|(_, stream)| stream.select.is_timed_out(time, timeouts))
327 .map(|(stream_id, _)| (*sock_addr, *stream_id))
328 })
329 .map(|(addr, stream_id)| P2pNetworkSelectAction::Timeout {
330 addr,
331 kind: crate::SelectKind::Stream(dummy, stream_id),
332 })
333 .for_each(|action| dispatcher.push(action));
334
335 Ok(())
336 }
337
338 fn p2p_rpc_heartbeats<State, Action>(
339 &self,
340 dispatcher: &mut Dispatcher<Action, State>,
341 time: Timestamp,
342 ) -> Result<(), String>
343 where
344 State: crate::P2pStateTrait,
345 Action: crate::P2pActionTrait<State>,
346 {
347 let scheduler = &self.network.scheduler;
348
349 scheduler
350 .rpc_incoming_streams
351 .0
352 .iter()
353 .chain(&scheduler.rpc_outgoing_streams.0)
354 .flat_map(|(peer_id, state)| {
355 state
356 .iter()
357 .filter(|(_, s)| s.should_send_heartbeat(time))
358 .map(|(stream_id, state)| P2pNetworkRpcAction::HeartbeatSend {
359 addr: state.addr,
360 peer_id: *peer_id,
361 stream_id: *stream_id,
362 })
363 })
364 .for_each(|action| dispatcher.push(action));
365
366 Ok(())
367 }
368}