p2p_testing/
redux.rs

1use openmina_core::{
2    impl_substate_access,
3    log::{
4        inner::{
5            field::{display, DisplayValue},
6            Value,
7        },
8        time_to_str, EventContext,
9    },
10    ActionEvent,
11};
12use p2p::{
13    bootstrap::P2pNetworkKadBootstrapState,
14    channels::{
15        best_tip::P2pChannelsBestTipAction,
16        rpc::P2pChannelsRpcAction,
17        signaling::{
18            discovery::P2pChannelsSignalingDiscoveryAction,
19            exchange::P2pChannelsSignalingExchangeAction,
20        },
21        snark::P2pChannelsSnarkAction,
22        snark_job_commitment::P2pChannelsSnarkJobCommitmentAction,
23        streaming_rpc::P2pChannelsStreamingRpcAction,
24        transaction::P2pChannelsTransactionAction,
25        P2pChannelsEffectfulAction,
26    },
27    connection::{
28        incoming_effectful::P2pConnectionIncomingEffectfulAction,
29        outgoing::P2pConnectionOutgoingAction,
30        outgoing_effectful::P2pConnectionOutgoingEffectfulAction,
31    },
32    disconnection::P2pDisconnectionAction,
33    disconnection_effectful::P2pDisconnectionEffectfulAction,
34    identify::P2pIdentifyAction,
35    network::identify::{
36        stream_effectful::P2pNetworkIdentifyStreamEffectfulAction, P2pNetworkIdentifyState,
37        P2pNetworkIdentifyStreamAction,
38    },
39    peer::P2pPeerAction,
40    MioEvent, P2pAction, P2pEffectfulAction, P2pEvent, P2pNetworkKadBootstrapAction,
41    P2pNetworkKadEffectfulAction, P2pNetworkKadRequestAction, P2pNetworkKademliaAction,
42    P2pNetworkKademliaStreamAction, P2pNetworkSchedulerAction, P2pNetworkYamuxAction, P2pState,
43    P2pStateTrait, PeerId,
44};
45use redux::{ActionMeta, EnablingCondition, SubStore};
46
47use crate::service::ClusterService;
48
49pub struct State(pub(crate) P2pState);
50pub type Store = redux::Store<State, ClusterService, Action>;
51
52impl State {
53    pub fn state(&self) -> &P2pState {
54        &self.0
55    }
56    pub fn state_mut(&mut self) -> &mut P2pState {
57        &mut self.0
58    }
59}
60
61impl EnablingCondition<State> for Action {
62    fn is_enabled(&self, state: &State, time: redux::Timestamp) -> bool {
63        match self {
64            Action::P2p(a) => a.is_enabled(&state.0, time),
65            Action::Idle(a) => a.is_enabled(state, time),
66            Action::P2pEffectful(a) => a.is_enabled(state.state(), time),
67        }
68    }
69}
70
71impl SubStore<State, P2pState> for Store {
72    type SubAction = P2pAction;
73
74    type Service = ClusterService;
75
76    fn state(&self) -> &P2pState {
77        &self.state.get().0
78    }
79
80    fn service(&mut self) -> &mut Self::Service {
81        &mut self.service
82    }
83
84    fn state_and_service(&mut self) -> (&P2pState, &mut Self::Service) {
85        (&self.state.get().0, &mut self.service)
86    }
87
88    fn dispatch<A>(&mut self, action: A) -> bool
89    where
90        A: Into<Self::SubAction> + redux::EnablingCondition<P2pState>,
91    {
92        self.sub_dispatch(action)
93    }
94
95    fn dispatch_callback<T>(&mut self, callback: redux::Callback<T>, args: T) -> bool
96    where
97        T: 'static,
98        P2pAction: From<redux::AnyAction> + redux::EnablingCondition<P2pState>,
99    {
100        Store::dispatch_callback(self, callback, args)
101    }
102}
103
104impl_substate_access!(State, P2pState, 0);
105
106macro_rules! impl_p2p_state_access {
107    ($state:ty, $substate_type:ty) => {
108        impl openmina_core::SubstateAccess<$substate_type> for $state {
109            fn substate(&self) -> openmina_core::SubstateResult<&$substate_type> {
110                let substate: &P2pState = self.substate()?;
111                substate.substate()
112            }
113
114            fn substate_mut(&mut self) -> openmina_core::SubstateResult<&mut $substate_type> {
115                let substate: &mut P2pState = self.substate_mut()?;
116                substate.substate_mut()
117            }
118        }
119    };
120}
121
122impl_p2p_state_access!(State, P2pNetworkIdentifyState);
123impl_p2p_state_access!(State, p2p::P2pNetworkState);
124impl_p2p_state_access!(State, P2pNetworkKadBootstrapState);
125impl_p2p_state_access!(State, p2p::P2pNetworkKadState);
126impl_p2p_state_access!(State, p2p::P2pNetworkSchedulerState);
127impl_p2p_state_access!(State, p2p::P2pLimits);
128impl_p2p_state_access!(State, p2p::P2pNetworkPubsubState);
129impl_p2p_state_access!(State, p2p::P2pConfig);
130
131impl P2pStateTrait for State {}
132
133#[derive(Debug, derive_more::From)]
134pub enum Action {
135    P2p(P2pAction),
136    P2pEffectful(P2pEffectfulAction),
137    Idle(IdleAction),
138}
139
140impl From<redux::AnyAction> for Action {
141    fn from(action: redux::AnyAction) -> Self {
142        match action.0.downcast() {
143            Ok(action) => *action,
144            Err(action) => Self::P2p(*action.downcast().expect("Downcast failed")),
145        }
146    }
147}
148
149#[derive(Debug)]
150pub struct IdleAction;
151
152impl EnablingCondition<State> for IdleAction {
153    fn is_enabled(&self, _state: &State, _time: redux::Timestamp) -> bool {
154        true
155    }
156}
157
158struct ActionLoggerContext {
159    time: redux::Timestamp,
160    time_str: String,
161    node_id: DisplayValue<PeerId>,
162}
163
164impl ActionLoggerContext {
165    fn new(time: redux::Timestamp, node_id: PeerId) -> Self {
166        ActionLoggerContext {
167            time,
168            time_str: time_to_str(time),
169            node_id: display(node_id),
170        }
171    }
172}
173
174impl EventContext for ActionLoggerContext {
175    fn timestamp(&self) -> redux::Timestamp {
176        self.time
177    }
178
179    fn time(&self) -> &'_ dyn Value {
180        &self.time_str
181    }
182
183    fn node_id(&self) -> &'_ dyn Value {
184        &self.node_id
185    }
186
187    fn log_node_id(&self) -> bool {
188        true
189    }
190}
191
192pub(super) fn log_action(action: &Action, meta: &ActionMeta, node_id: PeerId) {
193    if let Action::P2p(action) = action {
194        ActionEvent::action_event(action, &ActionLoggerContext::new(meta.time(), node_id));
195    }
196}
197
198pub(super) fn event_effect(store: &mut crate::redux::Store, event: P2pEvent) -> bool {
199    match event {
200        P2pEvent::MioEvent(event) => match event {
201            MioEvent::InterfaceDetected(ip) => {
202                SubStore::dispatch(store, P2pNetworkSchedulerAction::InterfaceDetected { ip })
203            }
204            MioEvent::InterfaceExpired(ip) => {
205                SubStore::dispatch(store, P2pNetworkSchedulerAction::InterfaceExpired { ip })
206            }
207            MioEvent::ListenerReady { listener } => {
208                SubStore::dispatch(store, P2pNetworkSchedulerAction::ListenerReady { listener })
209            }
210            MioEvent::ListenerError { listener, error } => SubStore::dispatch(
211                store,
212                P2pNetworkSchedulerAction::ListenerError { listener, error },
213            ),
214            MioEvent::IncomingConnectionIsReady { listener } => SubStore::dispatch(
215                store,
216                P2pNetworkSchedulerAction::IncomingConnectionIsReady { listener },
217            ),
218            MioEvent::IncomingConnectionDidAccept(addr, result) => SubStore::dispatch(
219                store,
220                P2pNetworkSchedulerAction::IncomingDidAccept { addr, result },
221            ),
222            MioEvent::OutgoingConnectionDidConnect(addr, result) => SubStore::dispatch(
223                store,
224                P2pNetworkSchedulerAction::OutgoingDidConnect { addr, result },
225            ),
226            MioEvent::IncomingDataIsReady(addr) => SubStore::dispatch(
227                store,
228                P2pNetworkSchedulerAction::IncomingDataIsReady { addr },
229            ),
230            MioEvent::IncomingDataDidReceive(addr, result) => SubStore::dispatch(
231                store,
232                P2pNetworkSchedulerAction::IncomingDataDidReceive { addr, result },
233            ),
234            MioEvent::OutgoingDataDidSend(_, _result) => true,
235            MioEvent::ConnectionDidClose(addr, result) => {
236                if let Err(e) = result {
237                    SubStore::dispatch(
238                        store,
239                        P2pNetworkSchedulerAction::Error {
240                            addr,
241                            error: p2p::P2pNetworkConnectionError::MioError(e),
242                        },
243                    )
244                } else {
245                    SubStore::dispatch(
246                        store,
247                        P2pNetworkSchedulerAction::Error {
248                            addr,
249                            error: p2p::P2pNetworkConnectionError::RemoteClosed,
250                        },
251                    )
252                }
253            }
254            MioEvent::ConnectionDidCloseOnDemand(addr) => {
255                SubStore::dispatch(store, P2pNetworkSchedulerAction::Prune { addr })
256            }
257        },
258        _ => false,
259    }
260}
261
262macro_rules! impl_from_p2p {
263    ($sub_action:ty) => {
264        impl From<$sub_action> for Action {
265            fn from(value: $sub_action) -> Self {
266                Self::P2p(P2pAction::from(value))
267            }
268        }
269    };
270    (effectful $sub_action:ty) => {
271        impl From<$sub_action> for Action {
272            fn from(value: $sub_action) -> Self {
273                Self::P2pEffectful(P2pEffectfulAction::from(value))
274            }
275        }
276    };
277}
278
279impl_from_p2p!(P2pNetworkKademliaAction);
280impl_from_p2p!(P2pNetworkKademliaStreamAction);
281impl_from_p2p!(P2pNetworkKadRequestAction);
282impl_from_p2p!(P2pNetworkKadBootstrapAction);
283impl_from_p2p!(P2pPeerAction);
284impl_from_p2p!(P2pNetworkYamuxAction);
285impl_from_p2p!(P2pConnectionOutgoingAction);
286impl_from_p2p!(P2pNetworkSchedulerAction);
287impl_from_p2p!(P2pNetworkIdentifyStreamAction);
288impl_from_p2p!(P2pIdentifyAction);
289impl_from_p2p!(p2p::P2pNetworkSelectAction);
290impl_from_p2p!(p2p::P2pNetworkPnetAction);
291impl_from_p2p!(p2p::P2pNetworkNoiseAction);
292impl_from_p2p!(p2p::connection::incoming::P2pConnectionIncomingAction);
293impl_from_p2p!(p2p::P2pNetworkPubsubAction);
294impl_from_p2p!(P2pChannelsSignalingDiscoveryAction);
295impl_from_p2p!(P2pChannelsSignalingExchangeAction);
296impl_from_p2p!(P2pChannelsTransactionAction);
297impl_from_p2p!(P2pChannelsSnarkAction);
298impl_from_p2p!(p2p::P2pNetworkRpcAction);
299impl_from_p2p!(P2pChannelsRpcAction);
300impl_from_p2p!(P2pDisconnectionAction);
301impl_from_p2p!(P2pChannelsBestTipAction);
302impl_from_p2p!(P2pChannelsSnarkJobCommitmentAction);
303impl_from_p2p!(P2pChannelsStreamingRpcAction);
304
305impl_from_p2p!(effectful P2pNetworkKadEffectfulAction);
306impl_from_p2p!(effectful P2pConnectionIncomingEffectfulAction);
307impl_from_p2p!(effectful p2p::P2pNetworkSchedulerEffectfulAction);
308impl_from_p2p!(effectful p2p::P2pNetworkPnetEffectfulAction);
309impl_from_p2p!(effectful p2p::P2pNetworkPubsubEffectfulAction);
310impl_from_p2p!(effectful P2pNetworkIdentifyStreamEffectfulAction);
311impl_from_p2p!(effectful P2pConnectionOutgoingEffectfulAction);
312impl_from_p2p!(effectful P2pDisconnectionEffectfulAction);
313impl_from_p2p!(effectful P2pChannelsEffectfulAction);
314
315impl p2p::P2pActionTrait<State> for Action {}