1use openmina_core::{bug_condition, requests::RpcId, Substate, SubstateAccess};
2use redux::{ActionWithMeta, Dispatcher};
3
4use crate::{
5 connection::outgoing::P2pConnectionOutgoingAction, ConnectionAddr,
6 P2pNetworkConnectionMuxState, P2pNetworkKadBootstrapAction, P2pNetworkKadEffectfulAction,
7 P2pNetworkKadState, P2pNetworkKademliaRpcRequest, P2pNetworkKademliaStreamAction,
8 P2pNetworkYamuxAction, P2pPeerState, P2pState, PeerId,
9};
10
11use super::{P2pNetworkKadRequestAction, P2pNetworkKadRequestState, P2pNetworkKadRequestStatus};
12
13impl P2pNetworkKadRequestState {
14 pub fn reducer<State, Action>(
15 mut state_context: Substate<Action, State, P2pNetworkKadState>,
16 action: ActionWithMeta<P2pNetworkKadRequestAction>,
17 ) -> Result<(), String>
18 where
19 State: SubstateAccess<P2pNetworkKadState> + SubstateAccess<P2pState>,
20 Action: crate::P2pActionTrait<State>,
21 {
22 let (action, _meta) = action.split();
23 let state = state_context.get_substate_mut()?;
24 let filter_local_addrs = state.filter_addrs;
25
26 let request_state = match action {
27 P2pNetworkKadRequestAction::New { peer_id, addr, key } => state
28 .create_request(addr, peer_id, key)
29 .map_err(|_request| format!("kademlia request to {addr} is already in progress"))?,
30 P2pNetworkKadRequestAction::Prune { peer_id } => {
31 return state
32 .requests
33 .remove(&peer_id)
34 .map(|_| ())
35 .ok_or_else(|| "kademlia request for {peer_id} is not found".to_owned());
36 }
37 _ => state
38 .requests
39 .get_mut(action.peer_id())
40 .ok_or_else(|| format!("kademlia request for {} is not found", action.peer_id()))?,
41 };
42
43 match action {
44 P2pNetworkKadRequestAction::New { peer_id, addr, .. } => {
45 let (dispatcher, state) = state_context.into_dispatcher_and_state();
46 let p2p_state: &P2pState = state.substate()?;
47 let peer_state = p2p_state.peers.get(&peer_id);
48
49 let on_initialize_connection = |dispatcher: &mut Dispatcher<Action, State>| {
50 let opts = crate::connection::outgoing::P2pConnectionOutgoingInitOpts::LibP2P(
51 (peer_id, addr).into(),
52 );
53 let callback = redux::callback!(
54 on_p2p_connection_outgoing_kad_connection_success((peer_id: PeerId, _rpc_id: Option<RpcId>)) -> crate::P2pAction {
55 P2pNetworkKadRequestAction::PeerIsConnecting { peer_id }
56 }
57 );
58 dispatcher.push(P2pConnectionOutgoingAction::Init {
59 opts,
60 rpc_id: None,
61 on_success: Some(callback),
62 });
63 Ok(())
64 };
65
66 let on_connection_in_progress = |dispatcher: &mut Dispatcher<Action, State>| {
67 dispatcher.push(P2pNetworkKadRequestAction::PeerIsConnecting { peer_id });
68 Ok(())
69 };
70
71 let on_connection_established = |dispatcher: &mut Dispatcher<Action, State>| {
72 let Some((_, conn_state)) = p2p_state.network.scheduler.find_peer(&peer_id)
73 else {
74 bug_condition!(
75 "peer {peer_id} is connected, its network connection is {:?}",
76 p2p_state
77 .network
78 .scheduler
79 .find_peer(&peer_id)
80 .map(|(_, s)| s)
81 );
82
83 return Ok(());
84 };
85 if let Some(stream_id) = conn_state.mux.as_ref().and_then(
86 |P2pNetworkConnectionMuxState::Yamux(yamux)| {
87 yamux.next_stream_id(
88 crate::YamuxStreamKind::Kademlia,
89 conn_state.incoming,
90 )
91 },
92 ) {
93 dispatcher.push(P2pNetworkYamuxAction::OpenStream {
96 addr: crate::ConnectionAddr {
97 sock_addr: addr,
98 incoming: false,
99 },
100 stream_id,
101 stream_kind: crate::token::StreamKind::Discovery(
102 crate::token::DiscoveryAlgorithm::Kademlia1_0_0,
103 ),
104 });
105 dispatcher.push(P2pNetworkKadRequestAction::StreamIsCreating {
106 peer_id,
107 stream_id,
108 });
109 } else {
110 dispatcher.push(P2pNetworkKadRequestAction::PeerIsConnecting { peer_id });
112 }
113 Ok(())
114 };
115
116 match peer_state {
117 None => on_initialize_connection(dispatcher),
118 Some(P2pPeerState { status, .. }) if !status.is_connected_or_connecting() => {
119 on_initialize_connection(dispatcher)
120 }
121 Some(P2pPeerState { status, .. }) if status.as_ready().is_none() => {
122 on_connection_in_progress(dispatcher)
123 }
124 Some(P2pPeerState { status, .. }) if status.as_ready().is_some() => {
125 on_connection_established(dispatcher)
126 }
127 _ => {
128 bug_condition!("state must be either ready or not ready, peer {peer_id}");
129 Ok(())
130 }
131 }
132 }
133 P2pNetworkKadRequestAction::PeerIsConnecting { .. } => {
134 request_state.status = P2pNetworkKadRequestStatus::WaitingForConnection;
135 Ok(())
136 }
137 P2pNetworkKadRequestAction::MuxReady { peer_id, addr } => {
138 let (dispatcher, state) = state_context.into_dispatcher_and_state();
139 let p2p_state: &P2pState = state.substate()?;
140
141 let stream_id = p2p_state
142 .network
143 .scheduler
144 .connections
145 .get(&addr)
146 .ok_or_else(|| format!("connection with {addr} not found"))
147 .and_then(|conn| {
148 conn.mux
149 .as_ref()
150 .map(|mux| (mux, conn.incoming))
151 .ok_or_else(|| format!("multiplexing is not ready for {addr}"))
152 })
153 .and_then(|(P2pNetworkConnectionMuxState::Yamux(yamux), incoming)| {
154 yamux
155 .next_stream_id(crate::YamuxStreamKind::Kademlia, incoming)
156 .ok_or_else(|| format!("cannot get next stream for {addr}"))
157 })?;
158
159 dispatcher.push(P2pNetworkYamuxAction::OpenStream {
161 addr,
162 stream_id,
163 stream_kind: crate::token::StreamKind::Discovery(
164 crate::token::DiscoveryAlgorithm::Kademlia1_0_0,
165 ),
166 });
167 dispatcher
168 .push(P2pNetworkKadRequestAction::StreamIsCreating { peer_id, stream_id });
169 Ok(())
170 }
171 P2pNetworkKadRequestAction::StreamIsCreating { stream_id, .. } => {
172 request_state.status = P2pNetworkKadRequestStatus::WaitingForKadStream(stream_id);
173
174 Ok(())
175 }
176 P2pNetworkKadRequestAction::StreamReady {
177 peer_id,
178 stream_id,
179 addr,
180 callback,
181 } => {
182 let find_node = match P2pNetworkKademliaRpcRequest::find_node(request_state.key) {
183 Ok(find_node) => find_node,
184 Err(error) => {
185 bug_condition!(
186 "P2pNetworkKadRequestAction::StreamReady invalid request key error: {error}"
187 );
188 return Ok(());
189 }
190 };
191
192 let message = super::super::Message::from(&find_node);
193 request_state.status = quick_protobuf::serialize_into_vec(&message).map_or_else(
194 |e| {
195 super::P2pNetworkKadRequestStatus::Error(format!(
196 "error serializing message: {e}"
197 ))
198 },
199 super::P2pNetworkKadRequestStatus::Request,
200 );
201
202 let dispatcher = state_context.into_dispatcher();
203 dispatcher.push_callback(callback, (addr, peer_id, stream_id, find_node));
204 Ok(())
205 }
206 P2pNetworkKadRequestAction::RequestSent { .. } => {
207 request_state.status = P2pNetworkKadRequestStatus::WaitingForReply;
208 Ok(())
209 }
210 P2pNetworkKadRequestAction::ReplyReceived {
211 peer_id,
212 stream_id,
213 data,
214 } => {
215 request_state.status = P2pNetworkKadRequestStatus::Reply(data.clone());
216 let addr = request_state.addr;
217
218 let bootstrap_request = state
219 .bootstrap_state()
220 .and_then(|bootstrap_state| bootstrap_state.request(&peer_id))
221 .is_some();
222
223 let closest_peers = bootstrap_request
224 .then(|| state.latest_request_peers.clone())
225 .unwrap_or_default();
226
227 let dispatcher = state_context.into_dispatcher();
228
229 if bootstrap_request {
230 dispatcher.push(P2pNetworkKadBootstrapAction::RequestDone {
231 peer_id,
232 closest_peers,
233 });
234 }
235
236 for entry in data {
237 let peer_id = entry.peer_id;
238
239 for multiaddr in entry.addresses().iter() {
240 let multiaddr = multiaddr.clone();
241 dispatcher.push(P2pNetworkKadEffectfulAction::Discovered {
242 multiaddr,
243 filter_local: filter_local_addrs,
244 peer_id,
245 });
246 }
247 }
248 dispatcher.push(P2pNetworkKademliaStreamAction::Close {
249 addr: ConnectionAddr {
250 sock_addr: addr,
251 incoming: false,
252 },
253 peer_id,
254 stream_id,
255 });
256 dispatcher.push(P2pNetworkKadRequestAction::Prune { peer_id });
257 Ok(())
258 }
259 P2pNetworkKadRequestAction::Prune { .. } => {
260 bug_condition!("Handled above shouldn't happen");
261 Ok(())
262 }
263 P2pNetworkKadRequestAction::Error { peer_id, error } => {
264 request_state.status = P2pNetworkKadRequestStatus::Error(error.clone());
265 let bootstrap_request = state
266 .bootstrap_state()
267 .and_then(|bootstrap_state| bootstrap_state.request(&peer_id))
268 .is_some();
269
270 let dispatcher = state_context.into_dispatcher();
271
272 if bootstrap_request {
273 dispatcher.push(P2pNetworkKadBootstrapAction::RequestError { peer_id, error });
274 }
275
276 dispatcher.push(P2pNetworkKadRequestAction::Prune { peer_id });
277 Ok(())
278 }
279 }
280 }
281}