p2p/network/kad/request/
p2p_network_kad_request_reducer.rs

1use openmina_core::{bug_condition, requests::RpcId, Substate, SubstateAccess};
2use redux::{ActionWithMeta, Dispatcher};
3
4use crate::{
5    connection::outgoing::P2pConnectionOutgoingAction, ConnectionAddr,
6    P2pNetworkConnectionMuxState, P2pNetworkKadBootstrapAction, P2pNetworkKadEffectfulAction,
7    P2pNetworkKadState, P2pNetworkKademliaRpcRequest, P2pNetworkKademliaStreamAction,
8    P2pNetworkYamuxAction, P2pPeerState, P2pState, PeerId,
9};
10
11use super::{P2pNetworkKadRequestAction, P2pNetworkKadRequestState, P2pNetworkKadRequestStatus};
12
13impl P2pNetworkKadRequestState {
14    pub fn reducer<State, Action>(
15        mut state_context: Substate<Action, State, P2pNetworkKadState>,
16        action: ActionWithMeta<P2pNetworkKadRequestAction>,
17    ) -> Result<(), String>
18    where
19        State: SubstateAccess<P2pNetworkKadState> + SubstateAccess<P2pState>,
20        Action: crate::P2pActionTrait<State>,
21    {
22        let (action, _meta) = action.split();
23        let state = state_context.get_substate_mut()?;
24        let filter_local_addrs = state.filter_addrs;
25
26        let request_state = match action {
27            P2pNetworkKadRequestAction::New { peer_id, addr, key } => state
28                .create_request(addr, peer_id, key)
29                .map_err(|_request| format!("kademlia request to {addr} is already in progress"))?,
30            P2pNetworkKadRequestAction::Prune { peer_id } => {
31                return state
32                    .requests
33                    .remove(&peer_id)
34                    .map(|_| ())
35                    .ok_or_else(|| "kademlia request for {peer_id} is not found".to_owned());
36            }
37            _ => state
38                .requests
39                .get_mut(action.peer_id())
40                .ok_or_else(|| format!("kademlia request for {} is not found", action.peer_id()))?,
41        };
42
43        match action {
44            P2pNetworkKadRequestAction::New { peer_id, addr, .. } => {
45                let (dispatcher, state) = state_context.into_dispatcher_and_state();
46                let p2p_state: &P2pState = state.substate()?;
47                let peer_state = p2p_state.peers.get(&peer_id);
48
49                let on_initialize_connection = |dispatcher: &mut Dispatcher<Action, State>| {
50                    let opts = crate::connection::outgoing::P2pConnectionOutgoingInitOpts::LibP2P(
51                        (peer_id, addr).into(),
52                    );
53                    let callback = redux::callback!(
54                        on_p2p_connection_outgoing_kad_connection_success((peer_id: PeerId, _rpc_id: Option<RpcId>)) -> crate::P2pAction {
55                            P2pNetworkKadRequestAction::PeerIsConnecting { peer_id }
56                        }
57                    );
58                    dispatcher.push(P2pConnectionOutgoingAction::Init {
59                        opts,
60                        rpc_id: None,
61                        on_success: Some(callback),
62                    });
63                    Ok(())
64                };
65
66                let on_connection_in_progress = |dispatcher: &mut Dispatcher<Action, State>| {
67                    dispatcher.push(P2pNetworkKadRequestAction::PeerIsConnecting { peer_id });
68                    Ok(())
69                };
70
71                let on_connection_established = |dispatcher: &mut Dispatcher<Action, State>| {
72                    let Some((_, conn_state)) = p2p_state.network.scheduler.find_peer(&peer_id)
73                    else {
74                        bug_condition!(
75                            "peer {peer_id} is connected, its network connection is {:?}",
76                            p2p_state
77                                .network
78                                .scheduler
79                                .find_peer(&peer_id)
80                                .map(|(_, s)| s)
81                        );
82
83                        return Ok(());
84                    };
85                    if let Some(stream_id) = conn_state.mux.as_ref().and_then(
86                        |P2pNetworkConnectionMuxState::Yamux(yamux)| {
87                            yamux.next_stream_id(
88                                crate::YamuxStreamKind::Kademlia,
89                                conn_state.incoming,
90                            )
91                        },
92                    ) {
93                        // multiplexing is ready, open a stream
94                        // TODO: add callbacks
95                        dispatcher.push(P2pNetworkYamuxAction::OpenStream {
96                            addr: crate::ConnectionAddr {
97                                sock_addr: addr,
98                                incoming: false,
99                            },
100                            stream_id,
101                            stream_kind: crate::token::StreamKind::Discovery(
102                                crate::token::DiscoveryAlgorithm::Kademlia1_0_0,
103                            ),
104                        });
105                        dispatcher.push(P2pNetworkKadRequestAction::StreamIsCreating {
106                            peer_id,
107                            stream_id,
108                        });
109                    } else {
110                        // connection is in progress, so wait for multiplexing to be ready
111                        dispatcher.push(P2pNetworkKadRequestAction::PeerIsConnecting { peer_id });
112                    }
113                    Ok(())
114                };
115
116                match peer_state {
117                    None => on_initialize_connection(dispatcher),
118                    Some(P2pPeerState { status, .. }) if !status.is_connected_or_connecting() => {
119                        on_initialize_connection(dispatcher)
120                    }
121                    Some(P2pPeerState { status, .. }) if status.as_ready().is_none() => {
122                        on_connection_in_progress(dispatcher)
123                    }
124                    Some(P2pPeerState { status, .. }) if status.as_ready().is_some() => {
125                        on_connection_established(dispatcher)
126                    }
127                    _ => {
128                        bug_condition!("state must be either ready or not ready, peer {peer_id}");
129                        Ok(())
130                    }
131                }
132            }
133            P2pNetworkKadRequestAction::PeerIsConnecting { .. } => {
134                request_state.status = P2pNetworkKadRequestStatus::WaitingForConnection;
135                Ok(())
136            }
137            P2pNetworkKadRequestAction::MuxReady { peer_id, addr } => {
138                let (dispatcher, state) = state_context.into_dispatcher_and_state();
139                let p2p_state: &P2pState = state.substate()?;
140
141                let stream_id = p2p_state
142                    .network
143                    .scheduler
144                    .connections
145                    .get(&addr)
146                    .ok_or_else(|| format!("connection with {addr} not found"))
147                    .and_then(|conn| {
148                        conn.mux
149                            .as_ref()
150                            .map(|mux| (mux, conn.incoming))
151                            .ok_or_else(|| format!("multiplexing is not ready for {addr}"))
152                    })
153                    .and_then(|(P2pNetworkConnectionMuxState::Yamux(yamux), incoming)| {
154                        yamux
155                            .next_stream_id(crate::YamuxStreamKind::Kademlia, incoming)
156                            .ok_or_else(|| format!("cannot get next stream for {addr}"))
157                    })?;
158
159                // TODO: add callbacks
160                dispatcher.push(P2pNetworkYamuxAction::OpenStream {
161                    addr,
162                    stream_id,
163                    stream_kind: crate::token::StreamKind::Discovery(
164                        crate::token::DiscoveryAlgorithm::Kademlia1_0_0,
165                    ),
166                });
167                dispatcher
168                    .push(P2pNetworkKadRequestAction::StreamIsCreating { peer_id, stream_id });
169                Ok(())
170            }
171            P2pNetworkKadRequestAction::StreamIsCreating { stream_id, .. } => {
172                request_state.status = P2pNetworkKadRequestStatus::WaitingForKadStream(stream_id);
173
174                Ok(())
175            }
176            P2pNetworkKadRequestAction::StreamReady {
177                peer_id,
178                stream_id,
179                addr,
180                callback,
181            } => {
182                let find_node = match P2pNetworkKademliaRpcRequest::find_node(request_state.key) {
183                    Ok(find_node) => find_node,
184                    Err(error) => {
185                        bug_condition!(
186                            "P2pNetworkKadRequestAction::StreamReady invalid request key error: {error}"
187                        );
188                        return Ok(());
189                    }
190                };
191
192                let message = super::super::Message::from(&find_node);
193                request_state.status = quick_protobuf::serialize_into_vec(&message).map_or_else(
194                    |e| {
195                        super::P2pNetworkKadRequestStatus::Error(format!(
196                            "error serializing message: {e}"
197                        ))
198                    },
199                    super::P2pNetworkKadRequestStatus::Request,
200                );
201
202                let dispatcher = state_context.into_dispatcher();
203                dispatcher.push_callback(callback, (addr, peer_id, stream_id, find_node));
204                Ok(())
205            }
206            P2pNetworkKadRequestAction::RequestSent { .. } => {
207                request_state.status = P2pNetworkKadRequestStatus::WaitingForReply;
208                Ok(())
209            }
210            P2pNetworkKadRequestAction::ReplyReceived {
211                peer_id,
212                stream_id,
213                data,
214            } => {
215                request_state.status = P2pNetworkKadRequestStatus::Reply(data.clone());
216                let addr = request_state.addr;
217
218                let bootstrap_request = state
219                    .bootstrap_state()
220                    .and_then(|bootstrap_state| bootstrap_state.request(&peer_id))
221                    .is_some();
222
223                let closest_peers = bootstrap_request
224                    .then(|| state.latest_request_peers.clone())
225                    .unwrap_or_default();
226
227                let dispatcher = state_context.into_dispatcher();
228
229                if bootstrap_request {
230                    dispatcher.push(P2pNetworkKadBootstrapAction::RequestDone {
231                        peer_id,
232                        closest_peers,
233                    });
234                }
235
236                for entry in data {
237                    let peer_id = entry.peer_id;
238
239                    for multiaddr in entry.addresses().iter() {
240                        let multiaddr = multiaddr.clone();
241                        dispatcher.push(P2pNetworkKadEffectfulAction::Discovered {
242                            multiaddr,
243                            filter_local: filter_local_addrs,
244                            peer_id,
245                        });
246                    }
247                }
248                dispatcher.push(P2pNetworkKademliaStreamAction::Close {
249                    addr: ConnectionAddr {
250                        sock_addr: addr,
251                        incoming: false,
252                    },
253                    peer_id,
254                    stream_id,
255                });
256                dispatcher.push(P2pNetworkKadRequestAction::Prune { peer_id });
257                Ok(())
258            }
259            P2pNetworkKadRequestAction::Prune { .. } => {
260                bug_condition!("Handled above shouldn't happen");
261                Ok(())
262            }
263            P2pNetworkKadRequestAction::Error { peer_id, error } => {
264                request_state.status = P2pNetworkKadRequestStatus::Error(error.clone());
265                let bootstrap_request = state
266                    .bootstrap_state()
267                    .and_then(|bootstrap_state| bootstrap_state.request(&peer_id))
268                    .is_some();
269
270                let dispatcher = state_context.into_dispatcher();
271
272                if bootstrap_request {
273                    dispatcher.push(P2pNetworkKadBootstrapAction::RequestError { peer_id, error });
274                }
275
276                dispatcher.push(P2pNetworkKadRequestAction::Prune { peer_id });
277                Ok(())
278            }
279        }
280    }
281}