p2p/network/kad/bootstrap/
p2p_network_kad_bootstrap_reducer.rs

1use std::mem;
2
3use openmina_core::{bug_condition, Substate, SubstateAccess};
4use redux::ActionWithMeta;
5
6use crate::{
7    bootstrap::{
8        P2pNetworkKadBootstrapFailedRequest, P2pNetworkKadBootstrapOngoingRequest,
9        P2pNetworkKadBootstrapRequestStat, P2pNetworkKadBootstrapSuccessfulRequest,
10    },
11    connection::outgoing::P2pConnectionOutgoingInitOpts,
12    P2pNetworkKadEffectfulAction, P2pNetworkKadRequestAction, P2pNetworkKadState,
13    P2pNetworkKademliaAction, P2pState,
14};
15
16use super::{P2pNetworkKadBootstrapAction, P2pNetworkKadBootstrapState};
17
18impl P2pNetworkKadBootstrapState {
19    pub fn reducer<Action, State>(
20        mut state_context: Substate<Action, State, P2pState>,
21        action: ActionWithMeta<P2pNetworkKadBootstrapAction>,
22        filter_addrs: bool,
23    ) -> Result<(), String>
24    where
25        State: crate::P2pStateTrait,
26        Action: crate::P2pActionTrait<State>,
27    {
28        let (action, meta) = action.split();
29
30        match action {
31            P2pNetworkKadBootstrapAction::CreateRequests => {
32                let discovery_state: &P2pNetworkKadState =
33                    state_context.get_substate()?.substate()?;
34                let routing_table = &discovery_state.routing_table;
35                let bootstrap_state: &Self = state_context.get_substate()?.substate()?;
36
37                let to_request = routing_table
38                    .closest_peers(&bootstrap_state.kademlia_key) // for the next request we take closest peer
39                    .filter(|entry| !bootstrap_state.processed_peers.contains(&entry.peer_id))
40                    .cloned()
41                    .collect::<Vec<_>>();
42
43                let bootstrap_state: &mut Self =
44                    state_context.get_substate_mut()?.substate_mut()?;
45                bootstrap_state.requests_number = to_request.len();
46                let empty = to_request.is_empty();
47
48                let dispatcher = state_context.into_dispatcher();
49                for entry in to_request {
50                    dispatcher.push(P2pNetworkKadEffectfulAction::MakeRequest {
51                        multiaddr: entry.addresses().clone(),
52                        filter_local: filter_addrs,
53                        peer_id: entry.peer_id,
54                    });
55                }
56                if empty {
57                    dispatcher.push(P2pNetworkKadBootstrapAction::FinalizeRequests);
58                }
59
60                Ok(())
61            }
62            P2pNetworkKadBootstrapAction::AppendRequest { request, peer_id } => {
63                let state: &mut Self = state_context.get_substate_mut()?.substate_mut()?;
64                state.requests_number -= 1;
65                if let Some(request) = request {
66                    state.peer_id_req_vec.push((peer_id, request));
67                }
68                if state.peer_id_req_vec.len() == 3 || state.requests_number == 0 {
69                    let dispatcher = state_context.into_dispatcher();
70                    dispatcher.push(P2pNetworkKadBootstrapAction::FinalizeRequests);
71                }
72                Ok(())
73            }
74            P2pNetworkKadBootstrapAction::FinalizeRequests => {
75                let state: &mut Self = state_context.get_substate_mut()?.substate_mut()?;
76                for (peer_id, request) in mem::take(&mut state.peer_id_req_vec) {
77                    state.processed_peers.insert(peer_id);
78                    let address =
79                        P2pConnectionOutgoingInitOpts::LibP2P((peer_id, request.addr).into());
80                    state
81                        .stats
82                        .requests
83                        .push(P2pNetworkKadBootstrapRequestStat::Ongoing(
84                            P2pNetworkKadBootstrapOngoingRequest {
85                                peer_id,
86                                address,
87                                start: meta.time(),
88                            },
89                        ));
90                    state.requests.insert(peer_id, request);
91                }
92
93                let (dispatcher, state) = state_context.into_dispatcher_and_state();
94                let bootstrap_state: &P2pNetworkKadBootstrapState = state.substate()?;
95                let discovery_state: &P2pNetworkKadState = state.substate()?;
96                let key = bootstrap_state.key;
97
98                if bootstrap_state.requests.is_empty() {
99                    dispatcher.push(P2pNetworkKademliaAction::BootstrapFinished {});
100                } else {
101                    bootstrap_state
102                        .requests
103                        .iter()
104                        .filter_map(|(peer_id, req)| {
105                            (!discovery_state.requests.contains_key(peer_id)).then_some(
106                                P2pNetworkKadRequestAction::New {
107                                    peer_id: *peer_id,
108                                    addr: req.addr,
109                                    key,
110                                },
111                            )
112                        })
113                        .for_each(|action| dispatcher.push(action));
114                }
115
116                Ok(())
117            }
118            P2pNetworkKadBootstrapAction::RequestDone {
119                peer_id,
120                closest_peers,
121            } => {
122                let state: &mut P2pNetworkKadBootstrapState =
123                    state_context.get_substate_mut()?.substate_mut()?;
124                let Some(req) = state.requests.remove(&peer_id) else {
125                    bug_condition!("cannot find request for peer {peer_id}");
126                    return Ok(());
127                };
128                state.successful_requests += 1;
129                let address = P2pConnectionOutgoingInitOpts::LibP2P((peer_id, req.addr).into());
130
131                if let Some(request_stats) =
132                    state.stats.requests.iter_mut().rev().find(|req_stat| {
133                        matches!(
134                            req_stat,
135                            P2pNetworkKadBootstrapRequestStat::Ongoing(
136                                P2pNetworkKadBootstrapOngoingRequest {
137                                    address: a,
138                                    ..
139                                },
140                            ) if a == &address
141                        )
142                    })
143                {
144                    *request_stats = P2pNetworkKadBootstrapRequestStat::Successful(
145                        P2pNetworkKadBootstrapSuccessfulRequest {
146                            peer_id,
147                            address,
148                            start: req.time,
149                            finish: meta.time(),
150                            closest_peers,
151                        },
152                    );
153                } else {
154                    bug_condition!("cannot find stats for request {req:?}");
155                };
156
157                if state.successful_requests < 20 {
158                    let dispatcher = state_context.into_dispatcher();
159                    dispatcher.push(P2pNetworkKadBootstrapAction::CreateRequests);
160                }
161
162                Ok(())
163            }
164            P2pNetworkKadBootstrapAction::RequestError { peer_id, error } => {
165                let bootstrap_state: &mut P2pNetworkKadBootstrapState =
166                    state_context.get_substate_mut()?.substate_mut()?;
167
168                let Some(req) = bootstrap_state.requests.remove(&peer_id) else {
169                    bug_condition!("cannot find request for peer {peer_id}");
170                    return Ok(());
171                };
172
173                let address = P2pConnectionOutgoingInitOpts::LibP2P((peer_id, req.addr).into());
174                if let Some(request_stats) =
175                    bootstrap_state
176                        .stats
177                        .requests
178                        .iter_mut()
179                        .rev()
180                        .find(|req_stat| {
181                            matches!(
182                                req_stat,
183                                P2pNetworkKadBootstrapRequestStat::Ongoing(
184                                    P2pNetworkKadBootstrapOngoingRequest {
185                                        address: a,
186                                        ..
187                                    },
188                                ) if a == &address
189                            )
190                        })
191                {
192                    *request_stats = P2pNetworkKadBootstrapRequestStat::Failed(
193                        P2pNetworkKadBootstrapFailedRequest {
194                            peer_id,
195                            address,
196                            start: req.time,
197                            finish: meta.time(),
198                            error,
199                        },
200                    );
201                } else {
202                    bug_condition!("cannot find stats for request {req:?}");
203                };
204
205                let dispatcher = state_context.into_dispatcher();
206                dispatcher.push(P2pNetworkKadBootstrapAction::CreateRequests);
207                Ok(())
208            }
209        }
210    }
211}