p2p/network/
p2p_network_reducer.rs1use crate::P2pLimits;
2use identify::P2pNetworkIdentifyState;
3use openmina_core::Substate;
4
5use super::*;
6
7impl P2pNetworkState {
8 pub fn reducer<State, Action>(
9 state_context: Substate<Action, State, Self>,
10 action: redux::ActionWithMeta<P2pNetworkAction>,
11 limits: &P2pLimits,
12 ) -> Result<(), String>
13 where
14 State: crate::P2pStateTrait,
15 Action: crate::P2pActionTrait<State>,
16 {
17 let (action, meta) = action.split();
18 match action {
19 P2pNetworkAction::Pnet(a) => P2pNetworkPnetState::reducer(
20 Substate::from_compatible_substate(state_context),
21 meta.with_action(a),
22 ),
23 P2pNetworkAction::Scheduler(a) => P2pNetworkSchedulerState::reducer(
24 Substate::from_compatible_substate(state_context),
25 meta.with_action(a),
26 ),
27 P2pNetworkAction::Select(a) => P2pNetworkSelectState::reducer(
28 Substate::from_compatible_substate(state_context),
29 meta.with_action(a),
30 ),
31 P2pNetworkAction::Noise(a) => P2pNetworkNoiseState::reducer(
32 Substate::from_compatible_substate(state_context),
33 meta.with_action(a),
34 ),
35 P2pNetworkAction::Yamux(a) => P2pNetworkYamuxState::reducer(
36 Substate::from_compatible_substate(state_context),
37 meta.with_action(a),
38 ),
39 P2pNetworkAction::Identify(a) => P2pNetworkIdentifyState::reducer(
40 Substate::from_compatible_substate(state_context),
41 meta.with_action(a),
42 limits,
43 ),
44 P2pNetworkAction::Kad(a) => P2pNetworkKadState::reducer(
45 Substate::from_compatible_substate(state_context),
46 meta.with_action(a),
47 limits,
48 ),
49 P2pNetworkAction::Pubsub(a) => P2pNetworkPubsubState::reducer(
50 Substate::from_compatible_substate(state_context),
51 meta.with_action(a),
52 ),
53 P2pNetworkAction::Rpc(a) => P2pNetworkRpcState::reducer(
54 Substate::from_compatible_substate(state_context),
55 meta.with_action(a),
56 limits,
57 ),
58 }
59 }
60
61 pub fn find_rpc_state_mut(
62 &mut self,
63 a: &P2pNetworkRpcAction,
64 ) -> Option<&mut P2pNetworkRpcState> {
65 match a.stream_id() {
66 RpcStreamId::Exact(stream_id) => self
67 .scheduler
68 .rpc_incoming_streams
69 .get_mut(a.peer_id())
70 .and_then(|cn| cn.get_mut(&stream_id))
71 .or_else(|| {
72 self.scheduler
73 .rpc_outgoing_streams
74 .get_mut(a.peer_id())
75 .and_then(|cn| cn.get_mut(&stream_id))
76 }),
77 RpcStreamId::WithQuery(id) => self
78 .scheduler
79 .rpc_incoming_streams
80 .get_mut(a.peer_id())
81 .and_then(|streams| {
82 streams.iter_mut().find_map(|(_, state)| {
83 if state
84 .pending
85 .as_ref()
86 .is_some_and(|query_header| query_header.id == id)
87 {
88 Some(state)
89 } else {
90 None
91 }
92 })
93 }),
94 RpcStreamId::AnyIncoming => {
95 if let Some(streams) = self.scheduler.rpc_incoming_streams.get_mut(a.peer_id()) {
96 if let Some((k, _)) = streams.first_key_value() {
97 let k = *k;
98 return Some(streams.get_mut(&k).expect("checked above"));
99 }
100 }
101
102 None
103 }
104 RpcStreamId::AnyOutgoing => {
105 if let Some(streams) = self.scheduler.rpc_outgoing_streams.get_mut(a.peer_id()) {
106 if let Some((k, _)) = streams.first_key_value() {
107 let k = *k;
108 return Some(streams.get_mut(&k).expect("checked above"));
109 }
110 }
111
112 None
113 }
114 }
115 }
116}