p2p/identify/
p2p_identify_reducer.rs1use openmina_core::{bug_condition, Substate};
2use redux::ActionWithMeta;
3
4use crate::{
5 connection::outgoing::P2pConnectionOutgoingInitOpts,
6 disconnection::{P2pDisconnectionAction, P2pDisconnectionReason},
7 token::{BroadcastAlgorithm, DiscoveryAlgorithm, IdentifyAlgorithm, RpcAlgorithm, StreamKind},
8 P2pNetworkConnectionMuxState, P2pNetworkKadRequestAction, P2pNetworkKadState,
9 P2pNetworkKademliaAction, P2pNetworkYamuxAction, P2pState, YamuxStreamKind,
10};
11
12use super::P2pIdentifyAction;
13
14impl P2pState {
15 #[cfg(feature = "p2p-libp2p")]
16 pub fn identify_reducer<Action, State>(
17 mut state_context: Substate<Action, State, P2pState>,
18 action: ActionWithMeta<P2pIdentifyAction>,
19 ) -> Result<(), String>
20 where
21 State: crate::P2pStateTrait,
22 Action: crate::P2pActionTrait<State>,
23 {
24 let (action, _meta) = action.split();
25 let p2p_state = state_context.get_substate_mut()?;
26
27 match action {
28 P2pIdentifyAction::NewRequest { addr, .. } => {
29 let (dispatcher, state) = state_context.into_dispatcher_and_state();
30 let p2p_state: &P2pState = state.substate()?;
31 let scheduler = &p2p_state.network.scheduler;
32 let stream_id = scheduler
33 .connections
34 .get(&addr)
35 .ok_or_else(|| format!("connection with {addr} not found"))
36 .and_then(|conn| {
37 conn.mux
38 .as_ref()
39 .map(|mux| (mux, conn.incoming))
40 .ok_or_else(|| format!("multiplexing is not ready for {addr}"))
41 })
42 .and_then(|(P2pNetworkConnectionMuxState::Yamux(yamux), incoming)| {
43 yamux
44 .next_stream_id(crate::YamuxStreamKind::Identify, incoming)
45 .ok_or_else(|| format!("cannot get next stream for {addr}"))
46 })?;
47
48 dispatcher.push(P2pNetworkYamuxAction::OpenStream {
49 addr,
50 stream_id,
51 stream_kind: StreamKind::Identify(IdentifyAlgorithm::Identify1_0_0),
52 });
53
54 Ok(())
55 }
56 P2pIdentifyAction::UpdatePeerInformation {
57 peer_id,
58 info,
59 addr,
60 } => {
61 let info = *info;
62 if let Some(peer) = p2p_state.peers.get_mut(&peer_id) {
63 peer.identify = Some(info.clone());
64 if let Some(P2pConnectionOutgoingInitOpts::LibP2P(opts)) = &mut peer.dial_opts {
65 opts.update_host_if_needed(info.listen_addrs.iter());
66 }
67 } else {
68 bug_condition!(
69 "Peer state not found for `P2pIdentifyAction::UpdatePeerInformation`"
70 );
71 return Ok(());
72 }
73
74 let (dispatcher, state) = state_context.into_dispatcher_and_state();
75
76 dispatcher.push(P2pNetworkKademliaAction::UpdateRoutingTable {
77 peer_id,
78 addrs: info.listen_addrs,
79 });
80
81 let stream_id = YamuxStreamKind::Rpc.stream_id(addr.incoming);
82
83 let stream_kind = StreamKind::Rpc(RpcAlgorithm::Rpc0_0_1);
84 if !info.protocols.contains(&stream_kind) {
85 dispatcher.push(P2pDisconnectionAction::Init {
86 peer_id,
87 reason: P2pDisconnectionReason::Unsupported,
88 });
89 return Ok(());
90 }
91
92 {
93 let state: &P2pState = state.substate()?;
94 state.channels_init(dispatcher, peer_id);
95 }
96
97 dispatcher.push(P2pNetworkYamuxAction::OpenStream {
98 addr,
99 stream_id,
100 stream_kind,
101 });
102
103 let stream_kind = StreamKind::Broadcast(BroadcastAlgorithm::Meshsub1_1_0);
104 if info.protocols.contains(&stream_kind) {
105 dispatcher.push(P2pNetworkYamuxAction::OpenStream {
106 addr,
107 stream_id: stream_id + 2,
108 stream_kind,
109 });
110 }
111
112 let kad_state: Option<&P2pNetworkKadState> = state.substate().ok();
113 let protocol = StreamKind::Discovery(DiscoveryAlgorithm::Kademlia1_0_0);
114 if kad_state.is_some_and(|state| state.request(&peer_id).is_some())
115 && info.protocols.contains(&protocol)
116 {
117 dispatcher.push(P2pNetworkKadRequestAction::MuxReady { peer_id, addr });
118 }
119
120 Ok(())
121 }
122 }
123 }
124}