p2p/network/kad/bootstrap/
p2p_network_kad_bootstrap_reducer.rs1use std::mem;
2
3use openmina_core::{bug_condition, Substate, SubstateAccess};
4use redux::ActionWithMeta;
5
6use crate::{
7 bootstrap::{
8 P2pNetworkKadBootstrapFailedRequest, P2pNetworkKadBootstrapOngoingRequest,
9 P2pNetworkKadBootstrapRequestStat, P2pNetworkKadBootstrapSuccessfulRequest,
10 },
11 connection::outgoing::P2pConnectionOutgoingInitOpts,
12 P2pNetworkKadEffectfulAction, P2pNetworkKadRequestAction, P2pNetworkKadState,
13 P2pNetworkKademliaAction, P2pState,
14};
15
16use super::{P2pNetworkKadBootstrapAction, P2pNetworkKadBootstrapState};
17
18impl P2pNetworkKadBootstrapState {
19 pub fn reducer<Action, State>(
20 mut state_context: Substate<Action, State, P2pState>,
21 action: ActionWithMeta<P2pNetworkKadBootstrapAction>,
22 filter_addrs: bool,
23 ) -> Result<(), String>
24 where
25 State: crate::P2pStateTrait,
26 Action: crate::P2pActionTrait<State>,
27 {
28 let (action, meta) = action.split();
29
30 match action {
31 P2pNetworkKadBootstrapAction::CreateRequests => {
32 let discovery_state: &P2pNetworkKadState =
33 state_context.get_substate()?.substate()?;
34 let routing_table = &discovery_state.routing_table;
35 let bootstrap_state: &Self = state_context.get_substate()?.substate()?;
36
37 let to_request = routing_table
38 .closest_peers(&bootstrap_state.kademlia_key) .filter(|entry| !bootstrap_state.processed_peers.contains(&entry.peer_id))
40 .cloned()
41 .collect::<Vec<_>>();
42
43 let bootstrap_state: &mut Self =
44 state_context.get_substate_mut()?.substate_mut()?;
45 bootstrap_state.requests_number = to_request.len();
46 let empty = to_request.is_empty();
47
48 let dispatcher = state_context.into_dispatcher();
49 for entry in to_request {
50 dispatcher.push(P2pNetworkKadEffectfulAction::MakeRequest {
51 multiaddr: entry.addresses().clone(),
52 filter_local: filter_addrs,
53 peer_id: entry.peer_id,
54 });
55 }
56 if empty {
57 dispatcher.push(P2pNetworkKadBootstrapAction::FinalizeRequests);
58 }
59
60 Ok(())
61 }
62 P2pNetworkKadBootstrapAction::AppendRequest { request, peer_id } => {
63 let state: &mut Self = state_context.get_substate_mut()?.substate_mut()?;
64 state.requests_number -= 1;
65 if let Some(request) = request {
66 state.peer_id_req_vec.push((peer_id, request));
67 }
68 if state.peer_id_req_vec.len() == 3 || state.requests_number == 0 {
69 let dispatcher = state_context.into_dispatcher();
70 dispatcher.push(P2pNetworkKadBootstrapAction::FinalizeRequests);
71 }
72 Ok(())
73 }
74 P2pNetworkKadBootstrapAction::FinalizeRequests => {
75 let state: &mut Self = state_context.get_substate_mut()?.substate_mut()?;
76 for (peer_id, request) in mem::take(&mut state.peer_id_req_vec) {
77 state.processed_peers.insert(peer_id);
78 let address =
79 P2pConnectionOutgoingInitOpts::LibP2P((peer_id, request.addr).into());
80 state
81 .stats
82 .requests
83 .push(P2pNetworkKadBootstrapRequestStat::Ongoing(
84 P2pNetworkKadBootstrapOngoingRequest {
85 peer_id,
86 address,
87 start: meta.time(),
88 },
89 ));
90 state.requests.insert(peer_id, request);
91 }
92
93 let (dispatcher, state) = state_context.into_dispatcher_and_state();
94 let bootstrap_state: &P2pNetworkKadBootstrapState = state.substate()?;
95 let discovery_state: &P2pNetworkKadState = state.substate()?;
96 let key = bootstrap_state.key;
97
98 if bootstrap_state.requests.is_empty() {
99 dispatcher.push(P2pNetworkKademliaAction::BootstrapFinished {});
100 } else {
101 bootstrap_state
102 .requests
103 .iter()
104 .filter_map(|(peer_id, req)| {
105 (!discovery_state.requests.contains_key(peer_id)).then_some(
106 P2pNetworkKadRequestAction::New {
107 peer_id: *peer_id,
108 addr: req.addr,
109 key,
110 },
111 )
112 })
113 .for_each(|action| dispatcher.push(action));
114 }
115
116 Ok(())
117 }
118 P2pNetworkKadBootstrapAction::RequestDone {
119 peer_id,
120 closest_peers,
121 } => {
122 let state: &mut P2pNetworkKadBootstrapState =
123 state_context.get_substate_mut()?.substate_mut()?;
124 let Some(req) = state.requests.remove(&peer_id) else {
125 bug_condition!("cannot find request for peer {peer_id}");
126 return Ok(());
127 };
128 state.successful_requests += 1;
129 let address = P2pConnectionOutgoingInitOpts::LibP2P((peer_id, req.addr).into());
130
131 if let Some(request_stats) =
132 state.stats.requests.iter_mut().rev().find(|req_stat| {
133 matches!(
134 req_stat,
135 P2pNetworkKadBootstrapRequestStat::Ongoing(
136 P2pNetworkKadBootstrapOngoingRequest {
137 address: a,
138 ..
139 },
140 ) if a == &address
141 )
142 })
143 {
144 *request_stats = P2pNetworkKadBootstrapRequestStat::Successful(
145 P2pNetworkKadBootstrapSuccessfulRequest {
146 peer_id,
147 address,
148 start: req.time,
149 finish: meta.time(),
150 closest_peers,
151 },
152 );
153 } else {
154 bug_condition!("cannot find stats for request {req:?}");
155 };
156
157 if state.successful_requests < 20 {
158 let dispatcher = state_context.into_dispatcher();
159 dispatcher.push(P2pNetworkKadBootstrapAction::CreateRequests);
160 }
161
162 Ok(())
163 }
164 P2pNetworkKadBootstrapAction::RequestError { peer_id, error } => {
165 let bootstrap_state: &mut P2pNetworkKadBootstrapState =
166 state_context.get_substate_mut()?.substate_mut()?;
167
168 let Some(req) = bootstrap_state.requests.remove(&peer_id) else {
169 bug_condition!("cannot find request for peer {peer_id}");
170 return Ok(());
171 };
172
173 let address = P2pConnectionOutgoingInitOpts::LibP2P((peer_id, req.addr).into());
174 if let Some(request_stats) =
175 bootstrap_state
176 .stats
177 .requests
178 .iter_mut()
179 .rev()
180 .find(|req_stat| {
181 matches!(
182 req_stat,
183 P2pNetworkKadBootstrapRequestStat::Ongoing(
184 P2pNetworkKadBootstrapOngoingRequest {
185 address: a,
186 ..
187 },
188 ) if a == &address
189 )
190 })
191 {
192 *request_stats = P2pNetworkKadBootstrapRequestStat::Failed(
193 P2pNetworkKadBootstrapFailedRequest {
194 peer_id,
195 address,
196 start: req.time,
197 finish: meta.time(),
198 error,
199 },
200 );
201 } else {
202 bug_condition!("cannot find stats for request {req:?}");
203 };
204
205 let dispatcher = state_context.into_dispatcher();
206 dispatcher.push(P2pNetworkKadBootstrapAction::CreateRequests);
207 Ok(())
208 }
209 }
210 }
211}