p2p/disconnection/
p2p_disconnection_reducer.rs

1use std::time::Duration;
2
3use openmina_core::{bug_condition, pseudo_rng, Substate};
4use rand::prelude::*;
5use redux::ActionWithMeta;
6
7use crate::{
8    disconnection_effectful::P2pDisconnectionEffectfulAction, P2pNetworkSchedulerAction,
9    P2pPeerAction, P2pPeerStatus, P2pState,
10};
11
12use super::{P2pDisconnectedState, P2pDisconnectionAction, P2pDisconnectionReason};
13
14/// Peers connected for no longer than this are never picked for a random, space-freeing disconnect.
15const FORCE_PEER_STABLE_FOR: Duration = Duration::from_secs(90);
16
17impl P2pDisconnectedState {
18    pub fn reducer<Action, State>(
19        mut state_context: Substate<Action, State, P2pState>,
20        action: ActionWithMeta<P2pDisconnectionAction>,
21    ) -> Result<(), String>
22    where
23        State: crate::P2pStateTrait,
24        Action: crate::P2pActionTrait<State>,
25    {
26        let (action, meta) = action.split();
27        let p2p_state = state_context.get_substate_mut()?;
28
29        match action {
30            P2pDisconnectionAction::RandomTry => {
31                p2p_state.last_random_disconnection_try = meta.time();
32                if p2p_state.config.limits.max_stable_peers()
33                    >= p2p_state.ready_peers_iter().count()
34                {
35                    return Ok(());
36                }
37                let mut rng = pseudo_rng(meta.time());
38
39                let peer_id = p2p_state
40                    .ready_peers_iter()
41                    .filter(|(_, s)| s.connected_for(meta.time()) > FORCE_PEER_STABLE_FOR)
42                    .map(|(id, _)| *id)
43                    .choose(&mut rng);
44
45                if let Some(peer_id) = peer_id {
46                    let dispatcher = state_context.into_dispatcher();
47                    dispatcher.push(P2pDisconnectionAction::Init {
48                        peer_id,
49                        reason: P2pDisconnectionReason::FreeUpSpace,
50                    });
51                }
52                Ok(())
53            }
54            P2pDisconnectionAction::Init { peer_id, reason } => {
55                let Some(peer) = p2p_state.peers.get_mut(&peer_id) else {
56                    bug_condition!("Invalid state for: `P2pDisconnectionAction::Init`");
57                    return Ok(());
58                };
59                peer.status = P2pPeerStatus::Disconnecting { time: meta.time() };
60
61                #[cfg(feature = "p2p-libp2p")]
62                if peer.is_libp2p() {
63                    let connections = p2p_state
64                        .network
65                        .scheduler
66                        .connections
67                        .iter()
68                        .filter(|(_, conn_state)| conn_state.peer_id() == Some(&peer_id))
69                        .map(|(addr, _)| *addr)
70                        .collect::<Vec<_>>();
71
72                    let dispatcher = state_context.into_dispatcher();
73                    for addr in connections {
74                        dispatcher.push(P2pNetworkSchedulerAction::Disconnect {
75                            addr,
76                            reason: reason.clone(),
77                        });
78                    }
79
80                    dispatcher.push(P2pDisconnectionAction::Finish { peer_id });
81                    return Ok(());
82                }
83
84                let dispatcher = state_context.into_dispatcher();
85                dispatcher.push(P2pDisconnectionEffectfulAction::Init { peer_id });
86                Ok(())
87            }
88            P2pDisconnectionAction::PeerClosed { peer_id } => {
89                let dispatcher = state_context.into_dispatcher();
90                dispatcher.push(P2pDisconnectionEffectfulAction::Init { peer_id });
91                Ok(())
92            }
93            P2pDisconnectionAction::FailedCleanup { peer_id } => {
94                let Some(peer) = p2p_state.peers.get_mut(&peer_id) else {
95                    bug_condition!("Invalid state for: `P2pDisconnectionAction::FailedCleanup`");
96                    return Ok(());
97                };
98                peer.status = P2pPeerStatus::Disconnecting { time: meta.time() };
99
100                let dispatcher = state_context.into_dispatcher();
101                dispatcher.push(P2pDisconnectionEffectfulAction::Init { peer_id });
102                Ok(())
103            }
104            P2pDisconnectionAction::Finish { peer_id } => {
105                let Some(peer) = p2p_state.peers.get_mut(&peer_id) else {
106                    bug_condition!("Invalid state for: `P2pDisconnectionAction::Finish`");
107                    return Ok(());
108                };
109                if peer.is_libp2p()
110                    && p2p_state
111                        .network
112                        .scheduler
113                        .connections
114                        .iter()
115                        .any(|(_addr, conn_state)| {
116                            conn_state.peer_id() == Some(&peer_id) && conn_state.closed.is_none()
117                        })
118                {
119                    return Ok(());
120                }
121
122                peer.status = P2pPeerStatus::Disconnected { time: meta.time() };
123
124                let (dispatcher, state) = state_context.into_dispatcher_and_state();
125                let p2p_state: &P2pState = state.substate()?;
126
127                // remove oldest disconnected peer
128                if let Some((peer_id, _)) = p2p_state
129                    .peers
130                    .iter()
131                    .filter_map(|(id, p)| {
132                        Some((*id, p.status.disconnected_or_disconnecting_time()?))
133                    })
134                    .min_by_key(|(_, t)| *t)
135                {
136                    dispatcher.push(P2pPeerAction::Remove { peer_id });
137                }
138
139                if let Some(callback) = &p2p_state.callbacks.on_p2p_disconnection_finish {
140                    dispatcher.push_callback(callback.clone(), peer_id);
141                }
142                Ok(())
143            }
144        }
145    }
146}