p2p/disconnection/
p2p_disconnection_reducer.rs1use std::time::Duration;
2
3use openmina_core::{bug_condition, pseudo_rng, Substate};
4use rand::prelude::*;
5use redux::ActionWithMeta;
6
7use crate::{
8 disconnection_effectful::P2pDisconnectionEffectfulAction, P2pNetworkSchedulerAction,
9 P2pPeerAction, P2pPeerStatus, P2pState,
10};
11
12use super::{P2pDisconnectedState, P2pDisconnectionAction, P2pDisconnectionReason};
13
14const FORCE_PEER_STABLE_FOR: Duration = Duration::from_secs(90);
16
17impl P2pDisconnectedState {
18 pub fn reducer<Action, State>(
19 mut state_context: Substate<Action, State, P2pState>,
20 action: ActionWithMeta<P2pDisconnectionAction>,
21 ) -> Result<(), String>
22 where
23 State: crate::P2pStateTrait,
24 Action: crate::P2pActionTrait<State>,
25 {
26 let (action, meta) = action.split();
27 let p2p_state = state_context.get_substate_mut()?;
28
29 match action {
30 P2pDisconnectionAction::RandomTry => {
31 p2p_state.last_random_disconnection_try = meta.time();
32 if p2p_state.config.limits.max_stable_peers()
33 >= p2p_state.ready_peers_iter().count()
34 {
35 return Ok(());
36 }
37 let mut rng = pseudo_rng(meta.time());
38
39 let peer_id = p2p_state
40 .ready_peers_iter()
41 .filter(|(_, s)| s.connected_for(meta.time()) > FORCE_PEER_STABLE_FOR)
42 .map(|(id, _)| *id)
43 .choose(&mut rng);
44
45 if let Some(peer_id) = peer_id {
46 let dispatcher = state_context.into_dispatcher();
47 dispatcher.push(P2pDisconnectionAction::Init {
48 peer_id,
49 reason: P2pDisconnectionReason::FreeUpSpace,
50 });
51 }
52 Ok(())
53 }
54 P2pDisconnectionAction::Init { peer_id, reason } => {
55 let Some(peer) = p2p_state.peers.get_mut(&peer_id) else {
56 bug_condition!("Invalid state for: `P2pDisconnectionAction::Init`");
57 return Ok(());
58 };
59 peer.status = P2pPeerStatus::Disconnecting { time: meta.time() };
60
61 #[cfg(feature = "p2p-libp2p")]
62 if peer.is_libp2p() {
63 let connections = p2p_state
64 .network
65 .scheduler
66 .connections
67 .iter()
68 .filter(|(_, conn_state)| conn_state.peer_id() == Some(&peer_id))
69 .map(|(addr, _)| *addr)
70 .collect::<Vec<_>>();
71
72 let dispatcher = state_context.into_dispatcher();
73 for addr in connections {
74 dispatcher.push(P2pNetworkSchedulerAction::Disconnect {
75 addr,
76 reason: reason.clone(),
77 });
78 }
79
80 dispatcher.push(P2pDisconnectionAction::Finish { peer_id });
81 return Ok(());
82 }
83
84 let dispatcher = state_context.into_dispatcher();
85 dispatcher.push(P2pDisconnectionEffectfulAction::Init { peer_id });
86 Ok(())
87 }
88 P2pDisconnectionAction::PeerClosed { peer_id } => {
89 let dispatcher = state_context.into_dispatcher();
90 dispatcher.push(P2pDisconnectionEffectfulAction::Init { peer_id });
91 Ok(())
92 }
93 P2pDisconnectionAction::FailedCleanup { peer_id } => {
94 let Some(peer) = p2p_state.peers.get_mut(&peer_id) else {
95 bug_condition!("Invalid state for: `P2pDisconnectionAction::FailedCleanup`");
96 return Ok(());
97 };
98 peer.status = P2pPeerStatus::Disconnecting { time: meta.time() };
99
100 let dispatcher = state_context.into_dispatcher();
101 dispatcher.push(P2pDisconnectionEffectfulAction::Init { peer_id });
102 Ok(())
103 }
104 P2pDisconnectionAction::Finish { peer_id } => {
105 let Some(peer) = p2p_state.peers.get_mut(&peer_id) else {
106 bug_condition!("Invalid state for: `P2pDisconnectionAction::Finish`");
107 return Ok(());
108 };
109 if peer.is_libp2p()
110 && p2p_state
111 .network
112 .scheduler
113 .connections
114 .iter()
115 .any(|(_addr, conn_state)| {
116 conn_state.peer_id() == Some(&peer_id) && conn_state.closed.is_none()
117 })
118 {
119 return Ok(());
120 }
121
122 peer.status = P2pPeerStatus::Disconnected { time: meta.time() };
123
124 let (dispatcher, state) = state_context.into_dispatcher_and_state();
125 let p2p_state: &P2pState = state.substate()?;
126
127 if let Some((peer_id, _)) = p2p_state
129 .peers
130 .iter()
131 .filter_map(|(id, p)| {
132 Some((*id, p.status.disconnected_or_disconnecting_time()?))
133 })
134 .min_by_key(|(_, t)| *t)
135 {
136 dispatcher.push(P2pPeerAction::Remove { peer_id });
137 }
138
139 if let Some(callback) = &p2p_state.callbacks.on_p2p_disconnection_finish {
140 dispatcher.push_callback(callback.clone(), peer_id);
141 }
142 Ok(())
143 }
144 }
145 }
146}