p2p/
p2p_reducer.rs

1use crate::{
2    channels::{
3        rpc::P2pChannelsRpcAction, signaling::discovery::P2pChannelsSignalingDiscoveryAction,
4        streaming_rpc::P2pChannelsStreamingRpcAction, P2pChannelsState,
5    },
6    connection::{
7        incoming::P2pConnectionIncomingAction, outgoing::P2pConnectionOutgoingAction,
8        P2pConnectionState,
9    },
10    disconnection::{P2pDisconnectedState, P2pDisconnectionAction},
11    P2pAction, P2pNetworkKadKey, P2pNetworkKademliaAction, P2pNetworkPnetAction,
12    P2pNetworkPubsubAction, P2pNetworkRpcAction, P2pNetworkSelectAction, P2pNetworkState,
13    P2pPeerState, P2pState, PeerId,
14};
15use openmina_core::{bug_condition, Substate};
16use redux::{ActionMeta, ActionWithMeta, Dispatcher, Timestamp};
17
18impl P2pState {
19    pub fn reducer<State, Action>(
20        mut state_context: Substate<Action, State, Self>,
21        action: ActionWithMeta<P2pAction>,
22    ) -> Result<(), String>
23    where
24        State: crate::P2pStateTrait,
25        Action: crate::P2pActionTrait<State>,
26    {
27        let Ok(state) = state_context.get_substate_mut() else {
28            bug_condition!("no P2pState");
29            return Ok(());
30        };
31        let (action, meta) = action.split();
32
33        match action {
34            P2pAction::Initialization(_) => {
35                // noop
36                Ok(())
37            }
38            P2pAction::Connection(action) => {
39                P2pConnectionState::reducer(state_context, meta.with_action(action))
40            }
41            P2pAction::Disconnection(action) => {
42                P2pDisconnectedState::reducer(state_context, meta.with_action(action))
43            }
44            P2pAction::Peer(action) => P2pPeerState::reducer(
45                Substate::from_compatible_substate(state_context),
46                meta.with_action(action),
47            ),
48            P2pAction::Channels(action) => {
49                P2pChannelsState::reducer(state_context, meta.with_action(action))
50            }
51            P2pAction::Identify(_action) => {
52                #[cfg(feature = "p2p-libp2p")]
53                Self::identify_reducer(state_context, meta.with_action(_action))?;
54                Ok(())
55            }
56            P2pAction::Network(_action) => {
57                #[cfg(feature = "p2p-libp2p")]
58                {
59                    let limits = state.config.limits;
60                    P2pNetworkState::reducer(
61                        Substate::from_compatible_substate(state_context),
62                        meta.with_action(_action),
63                        &limits,
64                    )?;
65                }
66                Ok(())
67            }
68        }
69    }
70
71    pub fn p2p_timeout_dispatch<State, Action>(
72        state_context: Substate<Action, State, Self>,
73        meta: &ActionMeta,
74    ) -> Result<(), String>
75    where
76        State: crate::P2pStateTrait,
77        Action: crate::P2pActionTrait<State>,
78    {
79        let (dispatcher, state) = state_context.into_dispatcher_and_state();
80        let state: &P2pState = state.substate()?;
81        let time = meta.time();
82
83        state.p2p_connection_timeouts_dispatch(dispatcher, time)?;
84        dispatcher.push(P2pConnectionOutgoingAction::RandomInit);
85        dispatcher.push(P2pDisconnectionAction::RandomTry);
86
87        state.p2p_connect_initial_peers(dispatcher);
88        state.p2p_try_reconnect_disconnected_peers(dispatcher, time)?;
89        state.p2p_discovery(dispatcher, time)?;
90
91        #[cfg(feature = "p2p-libp2p")]
92        {
93            state.p2p_pnet_timeouts(dispatcher, time)?;
94            state.p2p_select_timeouts(dispatcher, time)?;
95            state.p2p_rpc_heartbeats(dispatcher, time)?;
96            dispatcher.push(P2pNetworkPubsubAction::PruneMessages {});
97        }
98
99        state.rpc_timeouts(dispatcher, time)?;
100        Ok(())
101    }
102
103    fn p2p_connection_timeouts_dispatch<State, Action>(
104        &self,
105        dispatcher: &mut Dispatcher<Action, State>,
106        time: Timestamp,
107    ) -> Result<(), String>
108    where
109        State: crate::P2pStateTrait,
110        Action: crate::P2pActionTrait<State>,
111    {
112        let timeouts = &self.config.timeouts;
113
114        self.peers
115            .iter()
116            .filter_map(|(peer_id, peer)| {
117                let state = peer.status.as_connecting()?;
118                state
119                    .is_timed_out(time, timeouts)
120                    .then(|| (*peer_id, state.as_outgoing().is_some()))
121            })
122            .for_each(|(peer_id, is_outgoing)| match is_outgoing {
123                true => dispatcher.push(P2pConnectionOutgoingAction::Timeout { peer_id }),
124                false => dispatcher.push(P2pConnectionIncomingAction::Timeout { peer_id }),
125            });
126
127        Ok(())
128    }
129
130    fn p2p_try_reconnect_disconnected_peers<State, Action>(
131        &self,
132        dispatcher: &mut Dispatcher<Action, State>,
133        time: Timestamp,
134    ) -> Result<(), String>
135    where
136        State: crate::P2pStateTrait,
137        Action: crate::P2pActionTrait<State>,
138    {
139        if self.already_has_min_peers() {
140            return Ok(());
141        }
142
143        let timeouts = &self.config.timeouts;
144
145        self.peers
146            .iter()
147            .filter_map(|(_, peer)| {
148                if peer.can_reconnect(time, timeouts) {
149                    peer.dial_opts.clone()
150                } else {
151                    None
152                }
153            })
154            .map(|opts| P2pConnectionOutgoingAction::Reconnect { opts, rpc_id: None })
155            .for_each(|action| dispatcher.push(action));
156        Ok(())
157    }
158
159    fn p2p_connect_initial_peers<State, Action>(&self, dispatcher: &mut Dispatcher<Action, State>)
160    where
161        State: crate::P2pStateTrait,
162        Action: crate::P2pActionTrait<State>,
163    {
164        if self.ready_peers_iter().count() >= self.config.initial_peers.len() {
165            return;
166        }
167
168        self.config
169            .initial_peers
170            .iter()
171            .filter(|opts| !self.peers.contains_key(opts.peer_id()))
172            .cloned()
173            .for_each(|opts| {
174                dispatcher.push(P2pConnectionOutgoingAction::Init {
175                    opts,
176                    rpc_id: None,
177                    on_success: None,
178                });
179            });
180    }
181
182    fn rpc_timeouts<State, Action>(
183        &self,
184        dispatcher: &mut Dispatcher<Action, State>,
185        time: Timestamp,
186    ) -> Result<(), String>
187    where
188        State: crate::P2pStateTrait,
189        Action: crate::P2pActionTrait<State>,
190    {
191        self.peer_rpc_timeouts(time)
192            .into_iter()
193            .for_each(|(peer_id, id, is_streaming)| {
194                if is_streaming {
195                    dispatcher.push(P2pChannelsStreamingRpcAction::Timeout { peer_id, id });
196                } else {
197                    dispatcher.push(P2pChannelsRpcAction::Timeout { peer_id, id });
198                }
199            });
200
201        Ok(())
202    }
203
204    fn p2p_discovery<State, Action>(
205        &self,
206        dispatcher: &mut Dispatcher<Action, State>,
207        time: Timestamp,
208    ) -> Result<(), String>
209    where
210        State: crate::P2pStateTrait,
211        Action: crate::P2pActionTrait<State>,
212    {
213        let config = &self.config;
214        let timeouts = &config.timeouts;
215
216        if !config.peer_discovery {
217            return Ok(());
218        }
219
220        for (&peer_id, _) in self
221            .ready_peers_iter()
222            .filter(|(_, peer)| peer.channels.signaling.discovery.is_ready())
223        {
224            dispatcher.push(P2pChannelsSignalingDiscoveryAction::RequestSend { peer_id });
225            dispatcher.push(P2pChannelsSignalingDiscoveryAction::DiscoveryRequestSend { peer_id });
226        }
227
228        if let Some(_d) = config.timeouts.initial_peers {
229            // ask initial peers
230            // TODO: use RPC to ask initial peers
231        }
232
233        #[cfg(feature = "p2p-libp2p")]
234        {
235            if let Some(discovery_state) = self.network.scheduler.discovery_state() {
236                let my_id = self.my_id();
237
238                match P2pNetworkKadKey::try_from(&my_id) {
239                    Ok(key) => {
240                        if discovery_state.status.can_bootstrap(time, timeouts)
241                            && discovery_state
242                                .routing_table
243                                .closest_peers(&key)
244                                .any(|_| true)
245                        {
246                            dispatcher
247                                .push(P2pNetworkKademliaAction::StartBootstrap { key: my_id });
248                        }
249                    }
250                    Err(e) => bug_condition!("p2p discovery error: {:?}", e),
251                }
252            }
253        }
254
255        Ok(())
256    }
257}
258
259#[cfg(feature = "p2p-libp2p")]
260impl P2pState {
261    fn p2p_pnet_timeouts<State, Action>(
262        &self,
263        dispatcher: &mut Dispatcher<Action, State>,
264        time: Timestamp,
265    ) -> Result<(), String>
266    where
267        State: crate::P2pStateTrait,
268        Action: crate::P2pActionTrait<State>,
269    {
270        let timeouts = &self.config.timeouts;
271
272        self.network
273            .scheduler
274            .connections
275            .iter()
276            .filter(|(_, state)| state.pnet.is_timed_out(time, timeouts))
277            .map(|(addr, _)| P2pNetworkPnetAction::Timeout { addr: *addr })
278            .for_each(|action| dispatcher.push(action));
279
280        Ok(())
281    }
282
283    fn p2p_select_timeouts<State, Action>(
284        &self,
285        dispatcher: &mut Dispatcher<Action, State>,
286        time: Timestamp,
287    ) -> Result<(), String>
288    where
289        State: crate::P2pStateTrait,
290        Action: crate::P2pActionTrait<State>,
291    {
292        let timeouts = &self.config.timeouts;
293
294        self.network
295            .scheduler
296            .connections
297            .iter()
298            .filter(|(_, state)| state.select_auth.is_timed_out(time, timeouts))
299            .map(|(addr, _)| P2pNetworkSelectAction::Timeout {
300                addr: *addr,
301                kind: crate::SelectKind::Authentication,
302            })
303            .for_each(|action| dispatcher.push(action));
304
305        self.network
306            .scheduler
307            .connections
308            .iter()
309            .filter(|(_, state)| state.select_mux.is_timed_out(time, timeouts))
310            .map(|(addr, _)| P2pNetworkSelectAction::Timeout {
311                addr: *addr,
312                kind: crate::SelectKind::MultiplexingNoPeerId,
313            })
314            .for_each(|action| dispatcher.push(action));
315
316        // TODO: better solution for PeerId
317        let dummy = PeerId::from_bytes([0u8; 32]);
318        self.network
319            .scheduler
320            .connections
321            .iter()
322            .flat_map(|(sock_addr, state)| {
323                state
324                    .streams
325                    .iter()
326                    .filter(|(_, stream)| stream.select.is_timed_out(time, timeouts))
327                    .map(|(stream_id, _)| (*sock_addr, *stream_id))
328            })
329            .map(|(addr, stream_id)| P2pNetworkSelectAction::Timeout {
330                addr,
331                kind: crate::SelectKind::Stream(dummy, stream_id),
332            })
333            .for_each(|action| dispatcher.push(action));
334
335        Ok(())
336    }
337
338    fn p2p_rpc_heartbeats<State, Action>(
339        &self,
340        dispatcher: &mut Dispatcher<Action, State>,
341        time: Timestamp,
342    ) -> Result<(), String>
343    where
344        State: crate::P2pStateTrait,
345        Action: crate::P2pActionTrait<State>,
346    {
347        let scheduler = &self.network.scheduler;
348
349        scheduler
350            .rpc_incoming_streams
351            .0
352            .iter()
353            .chain(&scheduler.rpc_outgoing_streams.0)
354            .flat_map(|(peer_id, state)| {
355                state
356                    .iter()
357                    .filter(|(_, s)| s.should_send_heartbeat(time))
358                    .map(|(stream_id, state)| P2pNetworkRpcAction::HeartbeatSend {
359                        addr: state.addr,
360                        peer_id: *peer_id,
361                        stream_id: *stream_id,
362                    })
363            })
364            .for_each(|action| dispatcher.push(action));
365
366        Ok(())
367    }
368}