p2p/network/
p2p_network_reducer.rs

1use crate::P2pLimits;
2use identify::P2pNetworkIdentifyState;
3use openmina_core::Substate;
4
5use super::*;
6
7impl P2pNetworkState {
8    pub fn reducer<State, Action>(
9        state_context: Substate<Action, State, Self>,
10        action: redux::ActionWithMeta<P2pNetworkAction>,
11        limits: &P2pLimits,
12    ) -> Result<(), String>
13    where
14        State: crate::P2pStateTrait,
15        Action: crate::P2pActionTrait<State>,
16    {
17        let (action, meta) = action.split();
18        match action {
19            P2pNetworkAction::Pnet(a) => P2pNetworkPnetState::reducer(
20                Substate::from_compatible_substate(state_context),
21                meta.with_action(a),
22            ),
23            P2pNetworkAction::Scheduler(a) => P2pNetworkSchedulerState::reducer(
24                Substate::from_compatible_substate(state_context),
25                meta.with_action(a),
26            ),
27            P2pNetworkAction::Select(a) => P2pNetworkSelectState::reducer(
28                Substate::from_compatible_substate(state_context),
29                meta.with_action(a),
30            ),
31            P2pNetworkAction::Noise(a) => P2pNetworkNoiseState::reducer(
32                Substate::from_compatible_substate(state_context),
33                meta.with_action(a),
34            ),
35            P2pNetworkAction::Yamux(a) => P2pNetworkYamuxState::reducer(
36                Substate::from_compatible_substate(state_context),
37                meta.with_action(a),
38            ),
39            P2pNetworkAction::Identify(a) => P2pNetworkIdentifyState::reducer(
40                Substate::from_compatible_substate(state_context),
41                meta.with_action(a),
42                limits,
43            ),
44            P2pNetworkAction::Kad(a) => P2pNetworkKadState::reducer(
45                Substate::from_compatible_substate(state_context),
46                meta.with_action(a),
47                limits,
48            ),
49            P2pNetworkAction::Pubsub(a) => P2pNetworkPubsubState::reducer(
50                Substate::from_compatible_substate(state_context),
51                meta.with_action(a),
52            ),
53            P2pNetworkAction::Rpc(a) => P2pNetworkRpcState::reducer(
54                Substate::from_compatible_substate(state_context),
55                meta.with_action(a),
56                limits,
57            ),
58        }
59    }
60
61    pub fn find_rpc_state_mut(
62        &mut self,
63        a: &P2pNetworkRpcAction,
64    ) -> Option<&mut P2pNetworkRpcState> {
65        match a.stream_id() {
66            RpcStreamId::Exact(stream_id) => self
67                .scheduler
68                .rpc_incoming_streams
69                .get_mut(a.peer_id())
70                .and_then(|cn| cn.get_mut(&stream_id))
71                .or_else(|| {
72                    self.scheduler
73                        .rpc_outgoing_streams
74                        .get_mut(a.peer_id())
75                        .and_then(|cn| cn.get_mut(&stream_id))
76                }),
77            RpcStreamId::WithQuery(id) => self
78                .scheduler
79                .rpc_incoming_streams
80                .get_mut(a.peer_id())
81                .and_then(|streams| {
82                    streams.iter_mut().find_map(|(_, state)| {
83                        if state
84                            .pending
85                            .as_ref()
86                            .is_some_and(|query_header| query_header.id == id)
87                        {
88                            Some(state)
89                        } else {
90                            None
91                        }
92                    })
93                }),
94            RpcStreamId::AnyIncoming => {
95                if let Some(streams) = self.scheduler.rpc_incoming_streams.get_mut(a.peer_id()) {
96                    if let Some((k, _)) = streams.first_key_value() {
97                        let k = *k;
98                        return Some(streams.get_mut(&k).expect("checked above"));
99                    }
100                }
101
102                None
103            }
104            RpcStreamId::AnyOutgoing => {
105                if let Some(streams) = self.scheduler.rpc_outgoing_streams.get_mut(a.peer_id()) {
106                    if let Some((k, _)) = streams.first_key_value() {
107                        let k = *k;
108                        return Some(streams.get_mut(&k).expect("checked above"));
109                    }
110                }
111
112                None
113            }
114        }
115    }
116}