p2p/network/kad/
p2p_network_kad_reducer.rs

1use crate::{P2pLimits, P2pNetworkKadEntry};
2use openmina_core::{debug, Substate, SubstateAccess};
3use redux::ActionWithMeta;
4
5use super::{
6    bootstrap::P2pNetworkKadBootstrapState,
7    request::P2pNetworkKadRequestState,
8    stream::{P2pNetworkKadStreamState, P2pNetworkKademliaStreamAction},
9    P2pNetworkKadAction, P2pNetworkKadBootstrapAction, P2pNetworkKadKey,
10    P2pNetworkKadLatestRequestPeerKind, P2pNetworkKadRequestAction, P2pNetworkKadState,
11    P2pNetworkKadStatus, P2pNetworkKademliaAction, P2pNetworkKademliaRpcReply,
12};
13
14impl super::P2pNetworkKadState {
15    pub fn reducer<State, Action>(
16        mut state_context: Substate<Action, State, Self>,
17        action: ActionWithMeta<P2pNetworkKadAction>,
18        limits: &P2pLimits,
19    ) -> Result<(), String>
20    where
21        State: crate::P2pStateTrait,
22        Action: crate::P2pActionTrait<State>,
23    {
24        let state = state_context.get_substate_mut()?;
25        let (action, meta) = action.split();
26        match action {
27            P2pNetworkKadAction::System(action) => P2pNetworkKadState::system_reducer(
28                Substate::from_compatible_substate(state_context),
29                meta.with_action(action),
30            ),
31            P2pNetworkKadAction::Bootstrap(action) => {
32                let filter_addrs = state.filter_addrs;
33                P2pNetworkKadBootstrapState::reducer(
34                    Substate::from_compatible_substate(state_context),
35                    meta.with_action(action),
36                    filter_addrs,
37                )
38            }
39            P2pNetworkKadAction::Request(request) => {
40                P2pNetworkKadRequestState::reducer(state_context, meta.with_action(request))
41            }
42            P2pNetworkKadAction::Stream(action) => {
43                P2pNetworkKadStreamState::reducer(state_context, meta.with_action(action), limits)
44            }
45        }
46    }
47
48    pub fn system_reducer<State, Action>(
49        mut state_context: Substate<Action, State, Self>,
50        action: ActionWithMeta<P2pNetworkKademliaAction>,
51    ) -> Result<(), String>
52    where
53        State: SubstateAccess<Self>,
54        Action: crate::P2pActionTrait<State>,
55    {
56        let state = state_context.get_substate_mut()?;
57
58        let (action, meta) = action.split();
59        match (&mut state.status, action) {
60            (
61                _,
62                P2pNetworkKademliaAction::AnswerFindNodeRequest {
63                    addr,
64                    peer_id,
65                    stream_id,
66                    key,
67                },
68            ) => {
69                let kad_key = P2pNetworkKadKey::from(key);
70                let closer_peers: Vec<_> =
71                    state.routing_table.find_node(&kad_key).cloned().collect();
72                debug!(meta.time(); "found {} peers", closer_peers.len());
73                let message = P2pNetworkKademliaRpcReply::FindNode { closer_peers };
74
75                let dispatcher = state_context.into_dispatcher();
76                dispatcher.push(P2pNetworkKademliaStreamAction::SendResponse {
77                    addr,
78                    peer_id,
79                    stream_id,
80                    data: message,
81                });
82                Ok(())
83            }
84            (
85                _,
86                P2pNetworkKademliaAction::UpdateFindNodeRequest {
87                    closest_peers,
88                    peer_id,
89                    stream_id,
90                    ..
91                },
92            ) => {
93                let mut latest_request_peers = Vec::new();
94                for entry in &closest_peers {
95                    let kind = match state.routing_table.insert(entry.clone()) {
96                        Ok(true) => P2pNetworkKadLatestRequestPeerKind::New,
97                        Ok(false) => P2pNetworkKadLatestRequestPeerKind::Existing,
98                        Err(_) => P2pNetworkKadLatestRequestPeerKind::Discarded,
99                    };
100                    latest_request_peers.push((entry.peer_id, kind));
101                }
102                state.latest_request_peers = latest_request_peers.into();
103
104                let dispatcher = state_context.into_dispatcher();
105                dispatcher.push(P2pNetworkKadRequestAction::ReplyReceived {
106                    peer_id,
107                    stream_id,
108                    data: closest_peers,
109                });
110
111                Ok(())
112            }
113            (_, P2pNetworkKademliaAction::StartBootstrap { key }) => {
114                state.status = P2pNetworkKadStatus::Bootstrapping(
115                    P2pNetworkKadBootstrapState::new(key).map_err(|k| k.to_string())?,
116                );
117
118                if state
119                    .bootstrap_state()
120                    .is_some_and(|bootstrap_state| bootstrap_state.requests.len() < super::ALPHA)
121                {
122                    let dispatcher = state_context.into_dispatcher();
123                    dispatcher.push(P2pNetworkKadBootstrapAction::CreateRequests);
124                }
125
126                Ok(())
127            }
128            (
129                P2pNetworkKadStatus::Bootstrapping(bootstrap_state),
130                P2pNetworkKademliaAction::BootstrapFinished {},
131            ) => {
132                state.status = P2pNetworkKadStatus::Bootstrapped {
133                    time: meta.time(),
134                    stats: bootstrap_state.stats.clone(),
135                };
136                Ok(())
137            }
138            (_, P2pNetworkKademliaAction::UpdateRoutingTable { peer_id, addrs }) => {
139                let _ = state.routing_table.insert(
140                    P2pNetworkKadEntry::new(peer_id, addrs.clone()).map_err(|e| e.to_string())?,
141                );
142                Ok(())
143            }
144            (state, action) => Err(format!("invalid action {action:?} for state {state:?}")),
145        }
146    }
147}