p2p/identify/
p2p_identify_reducer.rs

1use openmina_core::{bug_condition, Substate};
2use redux::ActionWithMeta;
3
4use crate::{
5    connection::outgoing::P2pConnectionOutgoingInitOpts,
6    disconnection::{P2pDisconnectionAction, P2pDisconnectionReason},
7    token::{BroadcastAlgorithm, DiscoveryAlgorithm, IdentifyAlgorithm, RpcAlgorithm, StreamKind},
8    P2pNetworkConnectionMuxState, P2pNetworkKadRequestAction, P2pNetworkKadState,
9    P2pNetworkKademliaAction, P2pNetworkYamuxAction, P2pState, YamuxStreamKind,
10};
11
12use super::P2pIdentifyAction;
13
14impl P2pState {
15    #[cfg(feature = "p2p-libp2p")]
16    pub fn identify_reducer<Action, State>(
17        mut state_context: Substate<Action, State, P2pState>,
18        action: ActionWithMeta<P2pIdentifyAction>,
19    ) -> Result<(), String>
20    where
21        State: crate::P2pStateTrait,
22        Action: crate::P2pActionTrait<State>,
23    {
24        let (action, _meta) = action.split();
25        let p2p_state = state_context.get_substate_mut()?;
26
27        match action {
28            P2pIdentifyAction::NewRequest { addr, .. } => {
29                let (dispatcher, state) = state_context.into_dispatcher_and_state();
30                let p2p_state: &P2pState = state.substate()?;
31                let scheduler = &p2p_state.network.scheduler;
32                let stream_id = scheduler
33                    .connections
34                    .get(&addr)
35                    .ok_or_else(|| format!("connection with {addr} not found"))
36                    .and_then(|conn| {
37                        conn.mux
38                            .as_ref()
39                            .map(|mux| (mux, conn.incoming))
40                            .ok_or_else(|| format!("multiplexing is not ready for {addr}"))
41                    })
42                    .and_then(|(P2pNetworkConnectionMuxState::Yamux(yamux), incoming)| {
43                        yamux
44                            .next_stream_id(crate::YamuxStreamKind::Identify, incoming)
45                            .ok_or_else(|| format!("cannot get next stream for {addr}"))
46                    })?;
47
48                dispatcher.push(P2pNetworkYamuxAction::OpenStream {
49                    addr,
50                    stream_id,
51                    stream_kind: StreamKind::Identify(IdentifyAlgorithm::Identify1_0_0),
52                });
53
54                Ok(())
55            }
56            P2pIdentifyAction::UpdatePeerInformation {
57                peer_id,
58                info,
59                addr,
60            } => {
61                let info = *info;
62                if let Some(peer) = p2p_state.peers.get_mut(&peer_id) {
63                    peer.identify = Some(info.clone());
64                    if let Some(P2pConnectionOutgoingInitOpts::LibP2P(opts)) = &mut peer.dial_opts {
65                        opts.update_host_if_needed(info.listen_addrs.iter());
66                    }
67                } else {
68                    bug_condition!(
69                        "Peer state not found for `P2pIdentifyAction::UpdatePeerInformation`"
70                    );
71                    return Ok(());
72                }
73
74                let (dispatcher, state) = state_context.into_dispatcher_and_state();
75
76                dispatcher.push(P2pNetworkKademliaAction::UpdateRoutingTable {
77                    peer_id,
78                    addrs: info.listen_addrs,
79                });
80
81                let stream_id = YamuxStreamKind::Rpc.stream_id(addr.incoming);
82
83                let stream_kind = StreamKind::Rpc(RpcAlgorithm::Rpc0_0_1);
84                if !info.protocols.contains(&stream_kind) {
85                    dispatcher.push(P2pDisconnectionAction::Init {
86                        peer_id,
87                        reason: P2pDisconnectionReason::Unsupported,
88                    });
89                    return Ok(());
90                }
91
92                {
93                    let state: &P2pState = state.substate()?;
94                    state.channels_init(dispatcher, peer_id);
95                }
96
97                dispatcher.push(P2pNetworkYamuxAction::OpenStream {
98                    addr,
99                    stream_id,
100                    stream_kind,
101                });
102
103                let stream_kind = StreamKind::Broadcast(BroadcastAlgorithm::Meshsub1_1_0);
104                if info.protocols.contains(&stream_kind) {
105                    dispatcher.push(P2pNetworkYamuxAction::OpenStream {
106                        addr,
107                        stream_id: stream_id + 2,
108                        stream_kind,
109                    });
110                }
111
112                let kad_state: Option<&P2pNetworkKadState> = state.substate().ok();
113                let protocol = StreamKind::Discovery(DiscoveryAlgorithm::Kademlia1_0_0);
114                if kad_state.is_some_and(|state| state.request(&peer_id).is_some())
115                    && info.protocols.contains(&protocol)
116                {
117                    dispatcher.push(P2pNetworkKadRequestAction::MuxReady { peer_id, addr });
118                }
119
120                Ok(())
121            }
122        }
123    }
124}