p2p/network/kad/
p2p_network_kad_reducer.rs1use crate::{P2pLimits, P2pNetworkKadEntry};
2use openmina_core::{debug, Substate, SubstateAccess};
3use redux::ActionWithMeta;
4
5use super::{
6 bootstrap::P2pNetworkKadBootstrapState,
7 request::P2pNetworkKadRequestState,
8 stream::{P2pNetworkKadStreamState, P2pNetworkKademliaStreamAction},
9 P2pNetworkKadAction, P2pNetworkKadBootstrapAction, P2pNetworkKadKey,
10 P2pNetworkKadLatestRequestPeerKind, P2pNetworkKadRequestAction, P2pNetworkKadState,
11 P2pNetworkKadStatus, P2pNetworkKademliaAction, P2pNetworkKademliaRpcReply,
12};
13
14impl super::P2pNetworkKadState {
15 pub fn reducer<State, Action>(
16 mut state_context: Substate<Action, State, Self>,
17 action: ActionWithMeta<P2pNetworkKadAction>,
18 limits: &P2pLimits,
19 ) -> Result<(), String>
20 where
21 State: crate::P2pStateTrait,
22 Action: crate::P2pActionTrait<State>,
23 {
24 let state = state_context.get_substate_mut()?;
25 let (action, meta) = action.split();
26 match action {
27 P2pNetworkKadAction::System(action) => P2pNetworkKadState::system_reducer(
28 Substate::from_compatible_substate(state_context),
29 meta.with_action(action),
30 ),
31 P2pNetworkKadAction::Bootstrap(action) => {
32 let filter_addrs = state.filter_addrs;
33 P2pNetworkKadBootstrapState::reducer(
34 Substate::from_compatible_substate(state_context),
35 meta.with_action(action),
36 filter_addrs,
37 )
38 }
39 P2pNetworkKadAction::Request(request) => {
40 P2pNetworkKadRequestState::reducer(state_context, meta.with_action(request))
41 }
42 P2pNetworkKadAction::Stream(action) => {
43 P2pNetworkKadStreamState::reducer(state_context, meta.with_action(action), limits)
44 }
45 }
46 }
47
48 pub fn system_reducer<State, Action>(
49 mut state_context: Substate<Action, State, Self>,
50 action: ActionWithMeta<P2pNetworkKademliaAction>,
51 ) -> Result<(), String>
52 where
53 State: SubstateAccess<Self>,
54 Action: crate::P2pActionTrait<State>,
55 {
56 let state = state_context.get_substate_mut()?;
57
58 let (action, meta) = action.split();
59 match (&mut state.status, action) {
60 (
61 _,
62 P2pNetworkKademliaAction::AnswerFindNodeRequest {
63 addr,
64 peer_id,
65 stream_id,
66 key,
67 },
68 ) => {
69 let kad_key = P2pNetworkKadKey::from(key);
70 let closer_peers: Vec<_> =
71 state.routing_table.find_node(&kad_key).cloned().collect();
72 debug!(meta.time(); "found {} peers", closer_peers.len());
73 let message = P2pNetworkKademliaRpcReply::FindNode { closer_peers };
74
75 let dispatcher = state_context.into_dispatcher();
76 dispatcher.push(P2pNetworkKademliaStreamAction::SendResponse {
77 addr,
78 peer_id,
79 stream_id,
80 data: message,
81 });
82 Ok(())
83 }
84 (
85 _,
86 P2pNetworkKademliaAction::UpdateFindNodeRequest {
87 closest_peers,
88 peer_id,
89 stream_id,
90 ..
91 },
92 ) => {
93 let mut latest_request_peers = Vec::new();
94 for entry in &closest_peers {
95 let kind = match state.routing_table.insert(entry.clone()) {
96 Ok(true) => P2pNetworkKadLatestRequestPeerKind::New,
97 Ok(false) => P2pNetworkKadLatestRequestPeerKind::Existing,
98 Err(_) => P2pNetworkKadLatestRequestPeerKind::Discarded,
99 };
100 latest_request_peers.push((entry.peer_id, kind));
101 }
102 state.latest_request_peers = latest_request_peers.into();
103
104 let dispatcher = state_context.into_dispatcher();
105 dispatcher.push(P2pNetworkKadRequestAction::ReplyReceived {
106 peer_id,
107 stream_id,
108 data: closest_peers,
109 });
110
111 Ok(())
112 }
113 (_, P2pNetworkKademliaAction::StartBootstrap { key }) => {
114 state.status = P2pNetworkKadStatus::Bootstrapping(
115 P2pNetworkKadBootstrapState::new(key).map_err(|k| k.to_string())?,
116 );
117
118 if state
119 .bootstrap_state()
120 .is_some_and(|bootstrap_state| bootstrap_state.requests.len() < super::ALPHA)
121 {
122 let dispatcher = state_context.into_dispatcher();
123 dispatcher.push(P2pNetworkKadBootstrapAction::CreateRequests);
124 }
125
126 Ok(())
127 }
128 (
129 P2pNetworkKadStatus::Bootstrapping(bootstrap_state),
130 P2pNetworkKademliaAction::BootstrapFinished {},
131 ) => {
132 state.status = P2pNetworkKadStatus::Bootstrapped {
133 time: meta.time(),
134 stats: bootstrap_state.stats.clone(),
135 };
136 Ok(())
137 }
138 (_, P2pNetworkKademliaAction::UpdateRoutingTable { peer_id, addrs }) => {
139 let _ = state.routing_table.insert(
140 P2pNetworkKadEntry::new(peer_id, addrs.clone()).map_err(|e| e.to_string())?,
141 );
142 Ok(())
143 }
144 (state, action) => Err(format!("invalid action {action:?} for state {state:?}")),
145 }
146 }
147}