p2p/peer/
p2p_peer_reducer.rs

1use openmina_core::{bug_condition, Substate};
2use redux::{ActionWithMeta, Timestamp};
3
4use crate::{P2pPeerState, P2pPeerStatus, P2pPeerStatusReady, P2pState};
5
6use super::P2pPeerAction;
7
8impl P2pPeerState {
9    /// Substate is accessed
10    pub fn reducer<Action, State>(
11        mut state_context: Substate<Action, State, P2pState>,
12        action: ActionWithMeta<P2pPeerAction>,
13    ) -> Result<(), String>
14    where
15        State: crate::P2pStateTrait,
16        Action: crate::P2pActionTrait<State>,
17    {
18        let p2p_state = state_context.get_substate_mut()?;
19        let (action, meta) = action.split();
20
21        match action {
22            P2pPeerAction::Discovered { peer_id, dial_opts } => {
23                // TODO: add bound to peers
24                let peer_state = p2p_state
25                    .peers
26                    .entry(peer_id)
27                    .or_insert_with(|| P2pPeerState {
28                        is_libp2p: true,
29                        dial_opts: None,
30                        identify: None,
31                        status: P2pPeerStatus::Disconnected {
32                            time: Timestamp::ZERO,
33                        },
34                    });
35
36                if let Some(dial_opts) = dial_opts {
37                    peer_state.dial_opts.get_or_insert(dial_opts);
38                }
39                Ok(())
40            }
41            P2pPeerAction::Ready { peer_id, incoming } => {
42                let Some(peer) = p2p_state.peers.get_mut(&peer_id) else {
43                    return Ok(());
44                };
45                peer.status = P2pPeerStatus::Ready(P2pPeerStatusReady::new(
46                    incoming,
47                    meta.time(),
48                    &p2p_state.config.enabled_channels,
49                ));
50
51                if !peer.is_libp2p {
52                    let (dispatcher, state) = state_context.into_dispatcher_and_state();
53                    let state: &P2pState = state.substate()?;
54                    state.channels_init(dispatcher, peer_id);
55                }
56
57                Ok(())
58            }
59            P2pPeerAction::BestTipUpdate { peer_id, best_tip } => {
60                let Some(peer) = p2p_state.get_ready_peer_mut(&peer_id) else {
61                    bug_condition!("Peer state not found for `P2pPeerAction::BestTipUpdate`");
62                    return Ok(());
63                };
64                peer.best_tip = Some(best_tip.clone());
65
66                let (dispatcher, state) = state_context.into_dispatcher_and_state();
67                let p2p_state: &P2pState = state.substate()?;
68
69                if let Some(callback) = &p2p_state.callbacks.on_p2p_peer_best_tip_update {
70                    dispatcher.push_callback(callback.clone(), best_tip);
71                }
72                Ok(())
73            }
74            P2pPeerAction::Remove { peer_id } => {
75                if p2p_state.peers.remove(&peer_id).is_none() {
76                    bug_condition!(
77                        "Missing state for peer {peer_id} action: `P2pPeerAction::Remove`"
78                    );
79                }
80
81                Ok(())
82            }
83        }
84    }
85}