1use openmina_core::{
2 impl_substate_access,
3 log::{
4 inner::{
5 field::{display, DisplayValue},
6 Value,
7 },
8 time_to_str, EventContext,
9 },
10 ActionEvent,
11};
12use p2p::{
13 bootstrap::P2pNetworkKadBootstrapState,
14 channels::{
15 best_tip::P2pChannelsBestTipAction,
16 rpc::P2pChannelsRpcAction,
17 signaling::{
18 discovery::P2pChannelsSignalingDiscoveryAction,
19 exchange::P2pChannelsSignalingExchangeAction,
20 },
21 snark::P2pChannelsSnarkAction,
22 snark_job_commitment::P2pChannelsSnarkJobCommitmentAction,
23 streaming_rpc::P2pChannelsStreamingRpcAction,
24 transaction::P2pChannelsTransactionAction,
25 P2pChannelsEffectfulAction,
26 },
27 connection::{
28 incoming_effectful::P2pConnectionIncomingEffectfulAction,
29 outgoing::P2pConnectionOutgoingAction,
30 outgoing_effectful::P2pConnectionOutgoingEffectfulAction,
31 },
32 disconnection::P2pDisconnectionAction,
33 disconnection_effectful::P2pDisconnectionEffectfulAction,
34 identify::P2pIdentifyAction,
35 network::identify::{
36 stream_effectful::P2pNetworkIdentifyStreamEffectfulAction, P2pNetworkIdentifyState,
37 P2pNetworkIdentifyStreamAction,
38 },
39 peer::P2pPeerAction,
40 MioEvent, P2pAction, P2pEffectfulAction, P2pEvent, P2pNetworkKadBootstrapAction,
41 P2pNetworkKadEffectfulAction, P2pNetworkKadRequestAction, P2pNetworkKademliaAction,
42 P2pNetworkKademliaStreamAction, P2pNetworkSchedulerAction, P2pNetworkYamuxAction, P2pState,
43 P2pStateTrait, PeerId,
44};
45use redux::{ActionMeta, EnablingCondition, SubStore};
46
47use crate::service::ClusterService;
48
49pub struct State(pub(crate) P2pState);
50pub type Store = redux::Store<State, ClusterService, Action>;
51
52impl State {
53 pub fn state(&self) -> &P2pState {
54 &self.0
55 }
56 pub fn state_mut(&mut self) -> &mut P2pState {
57 &mut self.0
58 }
59}
60
61impl EnablingCondition<State> for Action {
62 fn is_enabled(&self, state: &State, time: redux::Timestamp) -> bool {
63 match self {
64 Action::P2p(a) => a.is_enabled(&state.0, time),
65 Action::Idle(a) => a.is_enabled(state, time),
66 Action::P2pEffectful(a) => a.is_enabled(state.state(), time),
67 }
68 }
69}
70
71impl SubStore<State, P2pState> for Store {
72 type SubAction = P2pAction;
73
74 type Service = ClusterService;
75
76 fn state(&self) -> &P2pState {
77 &self.state.get().0
78 }
79
80 fn service(&mut self) -> &mut Self::Service {
81 &mut self.service
82 }
83
84 fn state_and_service(&mut self) -> (&P2pState, &mut Self::Service) {
85 (&self.state.get().0, &mut self.service)
86 }
87
88 fn dispatch<A>(&mut self, action: A) -> bool
89 where
90 A: Into<Self::SubAction> + redux::EnablingCondition<P2pState>,
91 {
92 self.sub_dispatch(action)
93 }
94
95 fn dispatch_callback<T>(&mut self, callback: redux::Callback<T>, args: T) -> bool
96 where
97 T: 'static,
98 P2pAction: From<redux::AnyAction> + redux::EnablingCondition<P2pState>,
99 {
100 Store::dispatch_callback(self, callback, args)
101 }
102}
103
104impl_substate_access!(State, P2pState, 0);
105
/// Implements `openmina_core::SubstateAccess<$target>` for `$owner` by going
/// through the owner's [`P2pState`] substate.
///
/// Each accessor first obtains the owner's `P2pState` (propagating any error
/// with `?`) and then asks that `P2pState` for the `$target` substate.
macro_rules! impl_p2p_state_access {
    ($owner:ty, $target:ty) => {
        impl openmina_core::SubstateAccess<$target> for $owner {
            fn substate(&self) -> openmina_core::SubstateResult<&$target> {
                <Self as openmina_core::SubstateAccess<P2pState>>::substate(self)?.substate()
            }

            fn substate_mut(&mut self) -> openmina_core::SubstateResult<&mut $target> {
                <Self as openmina_core::SubstateAccess<P2pState>>::substate_mut(self)?
                    .substate_mut()
            }
        }
    };
}
121
122impl_p2p_state_access!(State, P2pNetworkIdentifyState);
123impl_p2p_state_access!(State, p2p::P2pNetworkState);
124impl_p2p_state_access!(State, P2pNetworkKadBootstrapState);
125impl_p2p_state_access!(State, p2p::P2pNetworkKadState);
126impl_p2p_state_access!(State, p2p::P2pNetworkSchedulerState);
127impl_p2p_state_access!(State, p2p::P2pLimits);
128impl_p2p_state_access!(State, p2p::P2pNetworkPubsubState);
129impl_p2p_state_access!(State, p2p::P2pConfig);
130
131impl P2pStateTrait for State {}
132
133#[derive(Debug, derive_more::From)]
134pub enum Action {
135 P2p(P2pAction),
136 P2pEffectful(P2pEffectfulAction),
137 Idle(IdleAction),
138}
139
140impl From<redux::AnyAction> for Action {
141 fn from(action: redux::AnyAction) -> Self {
142 match action.0.downcast() {
143 Ok(action) => *action,
144 Err(action) => Self::P2p(*action.downcast().expect("Downcast failed")),
145 }
146 }
147}
148
/// Unit action carrying no data; its enabling condition always holds.
#[derive(Debug)]
pub struct IdleAction;
151
152impl EnablingCondition<State> for IdleAction {
153 fn is_enabled(&self, _state: &State, _time: redux::Timestamp) -> bool {
154 true
155 }
156}
157
158struct ActionLoggerContext {
159 time: redux::Timestamp,
160 time_str: String,
161 node_id: DisplayValue<PeerId>,
162}
163
164impl ActionLoggerContext {
165 fn new(time: redux::Timestamp, node_id: PeerId) -> Self {
166 ActionLoggerContext {
167 time,
168 time_str: time_to_str(time),
169 node_id: display(node_id),
170 }
171 }
172}
173
174impl EventContext for ActionLoggerContext {
175 fn timestamp(&self) -> redux::Timestamp {
176 self.time
177 }
178
179 fn time(&self) -> &'_ dyn Value {
180 &self.time_str
181 }
182
183 fn node_id(&self) -> &'_ dyn Value {
184 &self.node_id
185 }
186
187 fn log_node_id(&self) -> bool {
188 true
189 }
190}
191
192pub(super) fn log_action(action: &Action, meta: &ActionMeta, node_id: PeerId) {
193 if let Action::P2p(action) = action {
194 ActionEvent::action_event(action, &ActionLoggerContext::new(meta.time(), node_id));
195 }
196}
197
198pub(super) fn event_effect(store: &mut crate::redux::Store, event: P2pEvent) -> bool {
199 match event {
200 P2pEvent::MioEvent(event) => match event {
201 MioEvent::InterfaceDetected(ip) => {
202 SubStore::dispatch(store, P2pNetworkSchedulerAction::InterfaceDetected { ip })
203 }
204 MioEvent::InterfaceExpired(ip) => {
205 SubStore::dispatch(store, P2pNetworkSchedulerAction::InterfaceExpired { ip })
206 }
207 MioEvent::ListenerReady { listener } => {
208 SubStore::dispatch(store, P2pNetworkSchedulerAction::ListenerReady { listener })
209 }
210 MioEvent::ListenerError { listener, error } => SubStore::dispatch(
211 store,
212 P2pNetworkSchedulerAction::ListenerError { listener, error },
213 ),
214 MioEvent::IncomingConnectionIsReady { listener } => SubStore::dispatch(
215 store,
216 P2pNetworkSchedulerAction::IncomingConnectionIsReady { listener },
217 ),
218 MioEvent::IncomingConnectionDidAccept(addr, result) => SubStore::dispatch(
219 store,
220 P2pNetworkSchedulerAction::IncomingDidAccept { addr, result },
221 ),
222 MioEvent::OutgoingConnectionDidConnect(addr, result) => SubStore::dispatch(
223 store,
224 P2pNetworkSchedulerAction::OutgoingDidConnect { addr, result },
225 ),
226 MioEvent::IncomingDataIsReady(addr) => SubStore::dispatch(
227 store,
228 P2pNetworkSchedulerAction::IncomingDataIsReady { addr },
229 ),
230 MioEvent::IncomingDataDidReceive(addr, result) => SubStore::dispatch(
231 store,
232 P2pNetworkSchedulerAction::IncomingDataDidReceive { addr, result },
233 ),
234 MioEvent::OutgoingDataDidSend(_, _result) => true,
235 MioEvent::ConnectionDidClose(addr, result) => {
236 if let Err(e) = result {
237 SubStore::dispatch(
238 store,
239 P2pNetworkSchedulerAction::Error {
240 addr,
241 error: p2p::P2pNetworkConnectionError::MioError(e),
242 },
243 )
244 } else {
245 SubStore::dispatch(
246 store,
247 P2pNetworkSchedulerAction::Error {
248 addr,
249 error: p2p::P2pNetworkConnectionError::RemoteClosed,
250 },
251 )
252 }
253 }
254 MioEvent::ConnectionDidCloseOnDemand(addr) => {
255 SubStore::dispatch(store, P2pNetworkSchedulerAction::Prune { addr })
256 }
257 },
258 _ => false,
259 }
260}
261
/// Implements `From<$inner>` for [`Action`] by first converting the value
/// into the matching p2p action enum.
///
/// - `impl_from_p2p!(T)` wraps the value as `Action::P2p` via [`P2pAction`].
/// - `impl_from_p2p!(effectful T)` wraps the value as `Action::P2pEffectful`
///   via [`P2pEffectfulAction`].
macro_rules! impl_from_p2p {
    ($inner:ty) => {
        impl From<$inner> for Action {
            fn from(value: $inner) -> Self {
                let p2p_action: P2pAction = value.into();
                Self::P2p(p2p_action)
            }
        }
    };
    (effectful $inner:ty) => {
        impl From<$inner> for Action {
            fn from(value: $inner) -> Self {
                let effectful_action: P2pEffectfulAction = value.into();
                Self::P2pEffectful(effectful_action)
            }
        }
    };
}
278
279impl_from_p2p!(P2pNetworkKademliaAction);
280impl_from_p2p!(P2pNetworkKademliaStreamAction);
281impl_from_p2p!(P2pNetworkKadRequestAction);
282impl_from_p2p!(P2pNetworkKadBootstrapAction);
283impl_from_p2p!(P2pPeerAction);
284impl_from_p2p!(P2pNetworkYamuxAction);
285impl_from_p2p!(P2pConnectionOutgoingAction);
286impl_from_p2p!(P2pNetworkSchedulerAction);
287impl_from_p2p!(P2pNetworkIdentifyStreamAction);
288impl_from_p2p!(P2pIdentifyAction);
289impl_from_p2p!(p2p::P2pNetworkSelectAction);
290impl_from_p2p!(p2p::P2pNetworkPnetAction);
291impl_from_p2p!(p2p::P2pNetworkNoiseAction);
292impl_from_p2p!(p2p::connection::incoming::P2pConnectionIncomingAction);
293impl_from_p2p!(p2p::P2pNetworkPubsubAction);
294impl_from_p2p!(P2pChannelsSignalingDiscoveryAction);
295impl_from_p2p!(P2pChannelsSignalingExchangeAction);
296impl_from_p2p!(P2pChannelsTransactionAction);
297impl_from_p2p!(P2pChannelsSnarkAction);
298impl_from_p2p!(p2p::P2pNetworkRpcAction);
299impl_from_p2p!(P2pChannelsRpcAction);
300impl_from_p2p!(P2pDisconnectionAction);
301impl_from_p2p!(P2pChannelsBestTipAction);
302impl_from_p2p!(P2pChannelsSnarkJobCommitmentAction);
303impl_from_p2p!(P2pChannelsStreamingRpcAction);
304
305impl_from_p2p!(effectful P2pNetworkKadEffectfulAction);
306impl_from_p2p!(effectful P2pConnectionIncomingEffectfulAction);
307impl_from_p2p!(effectful p2p::P2pNetworkSchedulerEffectfulAction);
308impl_from_p2p!(effectful p2p::P2pNetworkPnetEffectfulAction);
309impl_from_p2p!(effectful p2p::P2pNetworkPubsubEffectfulAction);
310impl_from_p2p!(effectful P2pNetworkIdentifyStreamEffectfulAction);
311impl_from_p2p!(effectful P2pConnectionOutgoingEffectfulAction);
312impl_from_p2p!(effectful P2pDisconnectionEffectfulAction);
313impl_from_p2p!(effectful P2pChannelsEffectfulAction);
314
315impl p2p::P2pActionTrait<State> for Action {}