p2p/connection/incoming/
p2p_connection_incoming_reducer.rs

1#[cfg(feature = "p2p-libp2p")]
2use std::net::{IpAddr, SocketAddr};
3
4use openmina_core::{bug_condition, debug, warn, Substate};
5use redux::{ActionWithMeta, Dispatcher, Timestamp};
6
7use crate::{
8    channels::signaling::exchange::P2pChannelsSignalingExchangeAction,
9    connection::{
10        incoming::P2pConnectionIncomingError,
11        incoming_effectful::P2pConnectionIncomingEffectfulAction,
12        outgoing::{P2pConnectionOutgoingInitLibp2pOpts, P2pConnectionOutgoingInitOpts},
13        P2pConnectionResponse, P2pConnectionState,
14    },
15    disconnection::{P2pDisconnectionAction, P2pDisconnectionReason},
16    webrtc::{Host, HttpSignalingInfo, SignalingMethod},
17    ConnectionAddr, P2pNetworkSchedulerAction, P2pPeerAction, P2pPeerState, P2pPeerStatus,
18    P2pState, PeerId,
19};
20
21use super::{
22    super::{incoming::P2pConnectionIncomingState, RejectionReason},
23    IncomingSignalingMethod, P2pConnectionIncomingAction,
24};
25
26impl P2pConnectionIncomingState {
27    /// Substate is accessed
28    pub fn reducer<Action, State>(
29        mut state_context: Substate<Action, State, P2pState>,
30        action: ActionWithMeta<P2pConnectionIncomingAction>,
31    ) -> Result<(), String>
32    where
33        State: crate::P2pStateTrait,
34        Action: crate::P2pActionTrait<State>,
35    {
36        let (action, meta) = action.split();
37        let time = meta.time();
38        let peer_id = *action.peer_id();
39        let p2p_state = state_context.get_substate_mut()?;
40        let my_id = p2p_state.my_id();
41
42        match action {
43            P2pConnectionIncomingAction::Init { opts, rpc_id } => {
44                let state = p2p_state
45                    .peers
46                    .entry(peer_id)
47                    .or_insert_with(|| P2pPeerState {
48                        is_libp2p: false,
49                        dial_opts: opts.offer.listen_port.and_then(|listen_port| {
50                            let signaling = match opts.signaling {
51                                IncomingSignalingMethod::Http => {
52                                    SignalingMethod::Http(HttpSignalingInfo {
53                                        host: opts.offer.host.clone(),
54                                        port: listen_port,
55                                    })
56                                }
57                                IncomingSignalingMethod::P2p { .. } => return None,
58                            };
59                            Some(P2pConnectionOutgoingInitOpts::WebRTC { peer_id, signaling })
60                        }),
61                        status: P2pPeerStatus::Connecting(P2pConnectionState::incoming_init(&opts)),
62                        identify: None,
63                    });
64
65                state.status =
66                    P2pPeerStatus::Connecting(P2pConnectionState::Incoming(Self::Init {
67                        time: meta.time(),
68                        signaling: opts.signaling,
69                        offer: opts.offer.clone(),
70                        rpc_id,
71                    }));
72
73                let dispatcher = state_context.into_dispatcher();
74                dispatcher.push(P2pConnectionIncomingEffectfulAction::Init { opts });
75                Ok(())
76            }
77            P2pConnectionIncomingAction::AnswerSdpCreatePending { .. } => {
78                let state = p2p_state
79                    .incoming_peer_connection_mut(&peer_id)
80                    .ok_or_else(|| format!("Invalid state for: {:?}", action))?;
81                if let Self::Init {
82                    signaling,
83                    offer,
84                    rpc_id,
85                    ..
86                } = state
87                {
88                    *state = Self::AnswerSdpCreatePending {
89                        time: meta.time(),
90                        signaling: *signaling,
91                        offer: offer.clone(),
92                        rpc_id: rpc_id.take(),
93                    };
94                } else {
95                    bug_condition!(
96                        "Invalid state for `P2pConnectionIncomingAction::AnswerSdpCreatePending`: {:?}",
97                        state
98                    );
99                }
100                Ok(())
101            }
102            P2pConnectionIncomingAction::AnswerSdpCreateError { peer_id, error } => {
103                let dispatcher = state_context.into_dispatcher();
104                dispatcher.push(P2pConnectionIncomingAction::Error {
105                    peer_id,
106                    error: P2pConnectionIncomingError::SdpCreateError(error.to_owned()),
107                });
108                Ok(())
109            }
110            P2pConnectionIncomingAction::AnswerSdpCreateSuccess { sdp, .. } => {
111                let state = p2p_state.incoming_peer_connection_mut(&peer_id).ok_or(
112                    "Missing state for `P2pConnectionIncomingAction::AnswerSdpCreateSuccess`",
113                )?;
114                if let Self::AnswerSdpCreatePending {
115                    signaling,
116                    offer,
117                    rpc_id,
118                    ..
119                } = state
120                {
121                    *state = Self::AnswerSdpCreateSuccess {
122                        time: meta.time(),
123                        signaling: *signaling,
124                        offer: offer.clone(),
125                        sdp: sdp.clone(),
126                        rpc_id: rpc_id.take(),
127                    };
128                } else {
129                    bug_condition!(
130                        "Invalid state for `P2pConnectionIncomingAction::AnswerSdpCreateSuccess`: {:?}",
131                        state
132                    );
133                    return Ok(());
134                }
135
136                let (dispatcher, state) = state_context.into_dispatcher_and_state();
137                let p2p_state: &P2pState = state.substate()?;
138                let answer = Box::new(crate::webrtc::Answer {
139                    sdp,
140                    identity_pub_key: p2p_state.config.identity_pub_key.clone(),
141                    target_peer_id: peer_id,
142                });
143                dispatcher.push(P2pConnectionIncomingAction::AnswerReady { peer_id, answer });
144                Ok(())
145            }
146            P2pConnectionIncomingAction::AnswerReady { peer_id, answer } => {
147                let state = p2p_state
148                    .incoming_peer_connection_mut(&peer_id)
149                    .ok_or("Invalid state for: `P2pConnectionIncomingAction::AnswerReady`")?;
150
151                let Self::AnswerSdpCreateSuccess {
152                    signaling,
153                    offer,
154                    rpc_id,
155                    ..
156                } = state
157                else {
158                    bug_condition!(
159                        "Invalid state for `P2pConnectionIncomingAction::AnswerReady`: {:?}",
160                        state
161                    );
162                    return Ok(());
163                };
164                let signaling = *signaling;
165                *state = Self::AnswerReady {
166                    time: meta.time(),
167                    signaling,
168                    offer: offer.clone(),
169                    answer: answer.clone(),
170                    rpc_id: rpc_id.take(),
171                };
172
173                let (dispatcher, state) = state_context.into_dispatcher_and_state();
174                let p2p_state: &P2pState = state.substate()?;
175
176                match signaling {
177                    IncomingSignalingMethod::Http => {
178                        // Will respond to the rpc below.
179                    }
180                    IncomingSignalingMethod::P2p { relay_peer_id } => {
181                        dispatcher.push(P2pChannelsSignalingExchangeAction::AnswerSend {
182                            peer_id: relay_peer_id,
183                            answer: P2pConnectionResponse::Accepted(answer.clone()),
184                        });
185                    }
186                }
187
188                if let Some(rpc_id) = p2p_state.peer_connection_rpc_id(&peer_id) {
189                    if let Some(callback) =
190                        &p2p_state.callbacks.on_p2p_connection_incoming_answer_ready
191                    {
192                        dispatcher.push_callback(
193                            callback.clone(),
194                            (rpc_id, peer_id, P2pConnectionResponse::Accepted(answer)),
195                        );
196                    }
197                }
198
199                Ok(())
200            }
201            P2pConnectionIncomingAction::AnswerSendSuccess { .. } => {
202                let state = p2p_state
203                    .incoming_peer_connection_mut(&peer_id)
204                    .ok_or_else(|| format!("Invalid state for: {:?}", action))?;
205                if let Self::AnswerReady {
206                    signaling,
207                    offer,
208                    answer,
209                    rpc_id,
210                    ..
211                } = state
212                {
213                    *state = Self::AnswerSendSuccess {
214                        time: meta.time(),
215                        signaling: *signaling,
216                        offer: offer.clone(),
217                        answer: answer.clone(),
218                        rpc_id: rpc_id.take(),
219                    };
220                } else {
221                    bug_condition!(
222                        "Invalid state for `P2pConnectionIncomingAction::AnswerSendSuccess`: {:?}",
223                        state
224                    );
225                    return Ok(());
226                }
227
228                let dispatcher = state_context.into_dispatcher();
229                dispatcher.push(P2pConnectionIncomingAction::FinalizePending { peer_id });
230                Ok(())
231            }
232            P2pConnectionIncomingAction::FinalizePending { .. } => {
233                let state = p2p_state
234                    .incoming_peer_connection_mut(&peer_id)
235                    .ok_or_else(|| format!("Invalid state for: {:?}", action))?;
236                if let Self::AnswerSendSuccess {
237                    signaling,
238                    offer,
239                    answer,
240                    rpc_id,
241                    ..
242                } = state
243                {
244                    let auth = offer.conn_auth(answer);
245                    let other_pub_key = offer.identity_pub_key.clone();
246
247                    *state = Self::FinalizePending {
248                        time: meta.time(),
249                        signaling: *signaling,
250                        offer: offer.clone(),
251                        answer: answer.clone(),
252                        rpc_id: rpc_id.take(),
253                    };
254
255                    let dispatcher = state_context.into_dispatcher();
256                    dispatcher.push(P2pConnectionIncomingEffectfulAction::ConnectionAuthorizationEncryptAndSend { peer_id, other_pub_key, auth });
257                } else {
258                    bug_condition!(
259                        "Invalid state for `P2pConnectionIncomingAction::FinalizePending`: {:?}",
260                        state
261                    );
262                }
263
264                Ok(())
265            }
266            P2pConnectionIncomingAction::FinalizeError { error, .. } => {
267                let dispatcher = state_context.into_dispatcher();
268                dispatcher.push(P2pConnectionIncomingAction::Error {
269                    peer_id,
270                    error: P2pConnectionIncomingError::FinalizeError(error.to_owned()),
271                });
272                Ok(())
273            }
274            P2pConnectionIncomingAction::FinalizeSuccess { remote_auth, .. } => {
275                let state = p2p_state
276                    .incoming_peer_connection_mut(&peer_id)
277                    .ok_or_else(|| {
278                        "Invalid state for: P2pConnectionIncomingAction::FinalizeSuccess".to_owned()
279                    })?;
280                let (expected_auth, other_pub_key) = if let Self::FinalizePending {
281                    signaling,
282                    offer,
283                    answer,
284                    rpc_id,
285                    ..
286                } = state
287                {
288                    let expected_auth = offer.conn_auth(answer);
289                    let other_pub_key = offer.identity_pub_key.clone();
290                    *state = Self::FinalizeSuccess {
291                        time: meta.time(),
292                        signaling: *signaling,
293                        offer: offer.clone(),
294                        answer: answer.clone(),
295                        rpc_id: rpc_id.take(),
296                    };
297                    (expected_auth, other_pub_key)
298                } else {
299                    bug_condition!(
300                        "Invalid state for `P2pConnectionIncomingAction::FinalizeSuccess`: {:?}",
301                        state
302                    );
303                    return Ok(());
304                };
305
306                let dispatcher = state_context.into_dispatcher();
307
308                dispatcher.push(
309                    P2pConnectionIncomingEffectfulAction::ConnectionAuthorizationDecryptAndCheck {
310                        peer_id,
311                        other_pub_key,
312                        expected_auth,
313                        auth: remote_auth,
314                    },
315                );
316                Ok(())
317            }
318            P2pConnectionIncomingAction::Timeout { .. } => {
319                let (dispatcher, _state) = state_context.into_dispatcher_and_state();
320
321                #[cfg(feature = "p2p-libp2p")]
322                {
323                    let p2p_state: &P2pState = _state.substate()?;
324                    if let Some((addr, _)) = p2p_state
325                        .network
326                        .scheduler
327                        .connections
328                        .iter()
329                        .find(|(_, state)| state.peer_id().is_some_and(|id| *id == peer_id))
330                    {
331                        dispatcher.push(P2pNetworkSchedulerAction::Disconnect {
332                            addr: *addr,
333                            reason: P2pDisconnectionReason::Timeout,
334                        });
335                    }
336                }
337
338                dispatcher.push(P2pConnectionIncomingAction::Error {
339                    peer_id,
340                    error: P2pConnectionIncomingError::Timeout,
341                });
342
343                Ok(())
344            }
345            P2pConnectionIncomingAction::Error { error, .. } => {
346                let state = p2p_state
347                    .incoming_peer_connection_mut(&peer_id)
348                    .ok_or("Missing state for `P2pConnectionIncomingAction::Error`")?;
349
350                let rpc_id = state.rpc_id();
351                let str_error = format!("{:?}", error);
352                *state = Self::Error {
353                    time: meta.time(),
354                    error,
355                    rpc_id,
356                };
357
358                let (dispatcher, state) = state_context.into_dispatcher_and_state();
359                let p2p_state: &P2pState = state.substate()?;
360
361                if let Some(rpc_id) = p2p_state.peer_connection_rpc_id(&peer_id) {
362                    if let Some(callback) = &p2p_state.callbacks.on_p2p_connection_incoming_error {
363                        dispatcher.push_callback(callback.clone(), (rpc_id, str_error));
364                    }
365                }
366                dispatcher.push(P2pDisconnectionAction::FailedCleanup { peer_id });
367
368                Ok(())
369            }
370            P2pConnectionIncomingAction::Success { .. } => {
371                let state = p2p_state
372                    .incoming_peer_connection_mut(&peer_id)
373                    .ok_or_else(|| format!("Invalid state for: {:?}", action))?;
374
375                if let Self::FinalizeSuccess {
376                    signaling,
377                    offer,
378                    answer,
379                    rpc_id,
380                    ..
381                } = state
382                {
383                    *state = Self::Success {
384                        time: meta.time(),
385                        signaling: *signaling,
386                        offer: offer.clone(),
387                        answer: answer.clone(),
388                        rpc_id: rpc_id.take(),
389                    };
390                } else {
391                    bug_condition!(
392                        "Invalid state for `P2pConnectionIncomingAction::Success`: {:?}",
393                        state
394                    );
395                    return Ok(());
396                }
397
398                let (dispatcher, state) = state_context.into_dispatcher_and_state();
399                let p2p_state: &P2pState = state.substate()?;
400
401                dispatcher.push(P2pPeerAction::Ready {
402                    peer_id,
403                    incoming: true,
404                });
405
406                if let Some(rpc_id) = p2p_state.peer_connection_rpc_id(&peer_id) {
407                    if let Some(callback) = &p2p_state.callbacks.on_p2p_connection_incoming_success
408                    {
409                        dispatcher.push_callback(callback.clone(), rpc_id);
410                    }
411                }
412                Ok(())
413            }
414            P2pConnectionIncomingAction::FinalizePendingLibp2p { addr, .. } => {
415                #[cfg(feature = "p2p-libp2p")]
416                {
417                    let state = p2p_state
418                        .peers
419                        .entry(peer_id)
420                        .or_insert_with(|| P2pPeerState {
421                            is_libp2p: true,
422                            dial_opts: Some(P2pConnectionOutgoingInitOpts::LibP2P(
423                                P2pConnectionOutgoingInitLibp2pOpts {
424                                    peer_id,
425                                    host: Host::from(addr.ip()),
426                                    port: addr.port(),
427                                },
428                            )),
429                            status: P2pPeerStatus::Disconnected { time: meta.time() },
430                            identify: None,
431                        });
432
433                    Self::reduce_finalize_libp2p_pending(state, addr, time, my_id, peer_id);
434
435                    let (dispatcher, state) = state_context.into_dispatcher_and_state();
436                    let p2p_state: &P2pState = state.substate()?;
437                    Self::dispatch_finalize_libp2p_pending(
438                        dispatcher, p2p_state, my_id, peer_id, time, addr,
439                    );
440                }
441
442                Ok(())
443            }
444            P2pConnectionIncomingAction::Libp2pReceived { .. } => {
445                #[cfg(feature = "p2p-libp2p")]
446                {
447                    let state = p2p_state
448                        .incoming_peer_connection_mut(&peer_id)
449                        .ok_or_else(|| format!("Invalid state for: {:?}", action))?;
450
451                    if let Self::FinalizePendingLibp2p { time, .. } = state {
452                        *state = Self::Libp2pReceived { time: *time };
453                    } else {
454                        bug_condition!(
455                            "Invalid state for `P2pConnectionIncomingAction::Libp2pReceived`: {:?}",
456                            state
457                        );
458                        return Ok(());
459                    }
460
461                    let dispatcher = state_context.into_dispatcher();
462                    dispatcher.push(P2pPeerAction::Ready {
463                        peer_id,
464                        incoming: true,
465                    });
466                }
467                Ok(())
468            }
469        }
470    }
471
472    #[cfg(feature = "p2p-libp2p")]
473    fn dispatch_finalize_libp2p_pending<Action, State>(
474        dispatcher: &mut Dispatcher<Action, State>,
475        p2p_state: &P2pState,
476        my_id: PeerId,
477        peer_id: PeerId,
478        time: Timestamp,
479        addr: SocketAddr,
480    ) where
481        State: crate::P2pStateTrait,
482        Action: crate::P2pActionTrait<State>,
483    {
484        let Some(peer_state) = p2p_state.peers.get(&peer_id) else {
485            bug_condition!("Peer State not found for {}", peer_id);
486            return;
487        };
488
489        if let Some(P2pConnectionIncomingState::FinalizePendingLibp2p {
490            close_duplicates, ..
491        }) = peer_state
492            .status
493            .as_connecting()
494            .and_then(|connecting| connecting.as_incoming())
495        {
496            if let Err(reason) = p2p_state.libp2p_incoming_accept(peer_id) {
497                warn!(time; node_id = display(my_id), summary = "rejecting incoming connection", peer_id = display(peer_id), reason = display(&reason));
498                dispatcher.push(P2pDisconnectionAction::Init {
499                    peer_id,
500                    reason: P2pDisconnectionReason::Libp2pIncomingRejected(reason),
501                });
502            } else {
503                debug!(time; "accepting incoming connection from {peer_id}");
504                if !close_duplicates.is_empty() {
505                    let duplicates = p2p_state
506                        .network
507                        .scheduler
508                        .connections
509                        .keys()
510                        .filter(
511                            |ConnectionAddr {
512                                 sock_addr,
513                                 incoming,
514                             }| {
515                                *incoming
516                                    && sock_addr != &addr
517                                    && close_duplicates.contains(sock_addr)
518                            },
519                        )
520                        .cloned()
521                        .collect::<Vec<_>>();
522
523                    for addr in duplicates {
524                        warn!(time; node_id = display(my_id), summary = "closing duplicate connection", addr = display(addr));
525                        dispatcher.push(P2pNetworkSchedulerAction::Disconnect {
526                            addr,
527                            reason: P2pDisconnectionReason::Libp2pIncomingRejected(
528                                RejectionReason::AlreadyConnected,
529                            ),
530                        });
531                    }
532                }
533            }
534        } else {
535            warn!(time; node_id = display(my_id), summary = "rejecting incoming connection as duplicate", peer_id = display(peer_id));
536            dispatcher.push(P2pNetworkSchedulerAction::Disconnect {
537                addr: ConnectionAddr {
538                    sock_addr: addr,
539                    incoming: true,
540                },
541                reason: P2pDisconnectionReason::Libp2pIncomingRejected(
542                    RejectionReason::AlreadyConnected,
543                ),
544            });
545        }
546    }
547
548    #[cfg(feature = "p2p-libp2p")]
549    fn reduce_finalize_libp2p_pending(
550        state: &mut P2pPeerState,
551        addr: SocketAddr,
552        time: Timestamp,
553        my_id: PeerId,
554        peer_id: PeerId,
555    ) {
556        let incoming_state = match &state.status {
557            // No duplicate connection
558            // Timeout connections should be already closed at this point
559            P2pPeerStatus::Disconnected { .. }
560            | P2pPeerStatus::Connecting(P2pConnectionState::Incoming(
561                P2pConnectionIncomingState::Error {
562                    error: P2pConnectionIncomingError::Timeout,
563                    ..
564                },
565            )) => Some(P2pConnectionIncomingState::FinalizePendingLibp2p {
566                addr,
567                close_duplicates: Vec::new(),
568                time,
569            }),
570            P2pPeerStatus::Connecting(P2pConnectionState::Outgoing(_)) if my_id < peer_id => {
571                // connection from lesser peer_id to greater one is kept in favour of the opposite one (incoming in this case)
572                None
573            }
574            P2pPeerStatus::Connecting(P2pConnectionState::Outgoing(_)) => {
575                let mut close_duplicates = Vec::new();
576                if let Some(identify) = state.identify.as_ref() {
577                    close_duplicates.extend(identify.listen_addrs.iter().filter_map(|maddr| {
578                        let mut iter = maddr.iter();
579                        let ip: IpAddr = match iter.next()? {
580                            multiaddr::Protocol::Ip4(ip4) => ip4.into(),
581                            multiaddr::Protocol::Ip6(ip6) => ip6.into(),
582                            _ => return None,
583                        };
584                        let port = match iter.next()? {
585                            multiaddr::Protocol::Tcp(port) => port,
586                            _ => return None,
587                        };
588                        Some(SocketAddr::from((ip, port)))
589                    }))
590                }
591                if let Some(P2pConnectionOutgoingInitOpts::LibP2P(libp2p)) =
592                    state.dial_opts.as_ref()
593                {
594                    match libp2p.try_into() {
595                        Ok(addr) if !close_duplicates.contains(&addr) => {
596                            close_duplicates.push(addr)
597                        }
598                        _ => {}
599                    }
600                };
601                Some(P2pConnectionIncomingState::FinalizePendingLibp2p {
602                    addr,
603                    close_duplicates,
604                    time,
605                })
606            }
607            _ => None,
608        };
609        if let Some(incoming_state) = incoming_state {
610            state.status = P2pPeerStatus::Connecting(P2pConnectionState::Incoming(incoming_state));
611        }
612    }
613}