p2p/connection/outgoing/
p2p_connection_outgoing_reducer.rs

1use std::net::SocketAddr;
2
3use openmina_core::{bug_condition, warn, Substate};
4use redux::ActionWithMeta;
5
6use crate::{
7    channels::signaling::discovery::P2pChannelsSignalingDiscoveryAction,
8    connection::{
9        outgoing_effectful::P2pConnectionOutgoingEffectfulAction, P2pConnectionErrorResponse,
10        P2pConnectionState,
11    },
12    disconnection::P2pDisconnectionAction,
13    webrtc::Host,
14    P2pNetworkKadRequestAction, P2pNetworkSchedulerAction, P2pPeerAction, P2pPeerState,
15    P2pPeerStatus, P2pState,
16};
17
18use super::{
19    libp2p_opts::P2pConnectionOutgoingInitLibp2pOptsTryToSocketAddrError,
20    P2pConnectionOutgoingAction, P2pConnectionOutgoingError, P2pConnectionOutgoingInitOpts,
21    P2pConnectionOutgoingState,
22};
23
24impl P2pConnectionOutgoingState {
25    /// Substate is accessed
26    pub fn reducer<Action, State>(
27        mut state_context: Substate<Action, State, P2pState>,
28        action: ActionWithMeta<P2pConnectionOutgoingAction>,
29    ) -> Result<(), String>
30    where
31        State: crate::P2pStateTrait,
32        Action: crate::P2pActionTrait<State>,
33    {
34        let (action, meta) = action.split();
35        let time = meta.time();
36        let p2p_state = state_context.get_substate_mut()?;
37
38        match action {
39            P2pConnectionOutgoingAction::RandomInit => {
40                let (dispatcher, state) = state_context.into_dispatcher_and_state();
41                let p2p_state: &P2pState = state.substate()?;
42                let peers = p2p_state.disconnected_peers().collect::<Vec<_>>();
43                dispatcher.push(P2pConnectionOutgoingEffectfulAction::RandomInit { peers });
44                Ok(())
45            }
46            P2pConnectionOutgoingAction::Init {
47                opts,
48                rpc_id,
49                on_success,
50            } => {
51                let peer_state =
52                    p2p_state
53                        .peers
54                        .entry(*opts.peer_id())
55                        .or_insert_with(|| P2pPeerState {
56                            is_libp2p: opts.is_libp2p(),
57                            dial_opts: Some(opts.clone()).filter(|v| v.can_connect_directly()),
58                            status: P2pPeerStatus::Connecting(P2pConnectionState::outgoing_init(
59                                &opts,
60                            )),
61                            identify: None,
62                        });
63
64                peer_state.status =
65                    P2pPeerStatus::Connecting(P2pConnectionState::Outgoing(Self::Init {
66                        time,
67                        opts: opts.clone(),
68                        rpc_id,
69                        on_success,
70                    }));
71
72                let dispatcher = state_context.into_dispatcher();
73
74                #[cfg(feature = "p2p-libp2p")]
75                if let P2pConnectionOutgoingInitOpts::LibP2P(libp2p_opts) = &opts {
76                    match SocketAddr::try_from(libp2p_opts) {
77                        Ok(addr) => {
78                            dispatcher.push(P2pNetworkSchedulerAction::OutgoingConnect { addr });
79                        }
80                        Err(
81                            P2pConnectionOutgoingInitLibp2pOptsTryToSocketAddrError::Unresolved(
82                                _name,
83                            ),
84                        ) => {
85                            // TODO: initiate name resolution
86                            warn!(meta.time(); "name resolution needed to connect to {}", opts);
87                        }
88                    }
89                    dispatcher.push(P2pConnectionOutgoingAction::FinalizePending {
90                        peer_id: libp2p_opts.peer_id,
91                    });
92                    return Ok(());
93                }
94
95                dispatcher.push(P2pConnectionOutgoingEffectfulAction::Init { opts, rpc_id });
96                Ok(())
97            }
98            P2pConnectionOutgoingAction::Reconnect { opts, rpc_id } => {
99                let peer_state = p2p_state
100                    .peers
101                    .get_mut(opts.peer_id())
102                    .ok_or("Missing peer state for: `P2pConnectionOutgoingAction::Reconnect`")?;
103
104                peer_state.status =
105                    P2pPeerStatus::Connecting(P2pConnectionState::Outgoing(Self::Init {
106                        time,
107                        opts: opts.clone(),
108                        rpc_id,
109                        on_success: None,
110                    }));
111
112                let dispatcher = state_context.into_dispatcher();
113
114                #[cfg(feature = "p2p-libp2p")]
115                if let P2pConnectionOutgoingInitOpts::LibP2P(libp2p_opts) = &opts {
116                    match SocketAddr::try_from(libp2p_opts) {
117                        Ok(addr) => {
118                            dispatcher.push(P2pNetworkSchedulerAction::OutgoingConnect { addr });
119                        }
120                        Err(
121                            P2pConnectionOutgoingInitLibp2pOptsTryToSocketAddrError::Unresolved(
122                                _name,
123                            ),
124                        ) => {
125                            // TODO: initiate name resolution
126                            warn!(meta.time(); "name resolution needed to connect to {}", opts);
127                        }
128                    }
129                    dispatcher.push(P2pConnectionOutgoingAction::FinalizePending {
130                        peer_id: *opts.peer_id(),
131                    });
132                    return Ok(());
133                }
134
135                dispatcher.push(P2pConnectionOutgoingEffectfulAction::Init { opts, rpc_id });
136                Ok(())
137            }
138            P2pConnectionOutgoingAction::OfferSdpCreatePending { peer_id, .. } => {
139                let state = p2p_state
140                    .outgoing_peer_connection_mut(&peer_id)
141                    .ok_or("Missing connection state for: `P2pConnectionOutgoingAction::OfferSdpCreatePending`")?;
142
143                if let Self::Init {
144                    opts,
145                    rpc_id,
146                    on_success,
147                    ..
148                } = state
149                {
150                    *state = Self::OfferSdpCreatePending {
151                        time,
152                        opts: opts.clone(),
153                        rpc_id: rpc_id.take(),
154                        on_success: on_success.take(),
155                    };
156                } else {
157                    bug_condition!("Invalid state for `P2pConnectionOutgoingAction::OfferSdpCreatePending`: {:?}", state);
158                }
159
160                Ok(())
161            }
162            P2pConnectionOutgoingAction::OfferSdpCreateError { error, peer_id, .. } => {
163                let dispatcher = state_context.into_dispatcher();
164                dispatcher.push(P2pConnectionOutgoingAction::Error {
165                    peer_id,
166                    error: P2pConnectionOutgoingError::SdpCreateError(error.to_owned()),
167                });
168                Ok(())
169            }
170            P2pConnectionOutgoingAction::OfferSdpCreateSuccess { sdp, peer_id } => {
171                let chain_id = p2p_state.chain_id.clone();
172                let state = p2p_state
173                    .outgoing_peer_connection_mut(&peer_id)
174                    .ok_or("Missing peer connection for `P2pConnectionOutgoingAction::OfferSdpCreateSuccess`")?;
175
176                if let Self::OfferSdpCreatePending {
177                    opts,
178                    rpc_id,
179                    on_success,
180                    ..
181                } = state
182                {
183                    *state = Self::OfferSdpCreateSuccess {
184                        time,
185                        opts: opts.clone(),
186                        sdp: sdp.clone(),
187                        rpc_id: rpc_id.take(),
188                        on_success: on_success.take(),
189                    };
190                } else {
191                    bug_condition!("Invalid state for `P2pConnectionOutgoingAction::OfferSdpCreateSuccess`: {:?}", state);
192                    return Ok(());
193                }
194
195                let offer = Box::new(crate::webrtc::Offer {
196                    sdp,
197                    chain_id,
198                    identity_pub_key: p2p_state.config.identity_pub_key.clone(),
199                    target_peer_id: peer_id,
200                    // TODO(vlad9486): put real address
201                    host: Host::Ipv4([127, 0, 0, 1].into()),
202                    listen_port: p2p_state.config.listen_port,
203                });
204                let dispatcher = state_context.into_dispatcher();
205                dispatcher.push(P2pConnectionOutgoingAction::OfferReady { peer_id, offer });
206                Ok(())
207            }
208            P2pConnectionOutgoingAction::OfferReady { offer, peer_id } => {
209                let state = p2p_state
210                    .outgoing_peer_connection_mut(&peer_id)
211                    .ok_or("Invalid state for `P2pConnectionOutgoingAction::OfferReady`")?;
212
213                let Self::OfferSdpCreateSuccess {
214                    opts,
215                    rpc_id,
216                    on_success,
217                    ..
218                } = state
219                else {
220                    bug_condition!(
221                        "Invalid state for `P2pConnectionOutgoingAction::OfferReady`: {:?}",
222                        state
223                    );
224                    return Ok(());
225                };
226                let opts = opts.clone();
227                *state = Self::OfferReady {
228                    time: meta.time(),
229                    opts: opts.clone(),
230                    offer: offer.clone(),
231                    rpc_id: rpc_id.take(),
232                    on_success: on_success.take(),
233                };
234
235                let dispatcher = state_context.into_dispatcher();
236
237                if let Some(relay_peer_id) = opts.webrtc_p2p_relay_peer_id() {
238                    dispatcher.push(P2pChannelsSignalingDiscoveryAction::DiscoveredAccept {
239                        peer_id: relay_peer_id,
240                        offer,
241                    });
242                } else {
243                    let signaling_method = match opts {
244                        P2pConnectionOutgoingInitOpts::WebRTC { signaling, .. } => signaling,
245                        #[allow(unreachable_patterns)]
246                        _ => return Ok(()),
247                    };
248
249                    dispatcher.push(P2pConnectionOutgoingEffectfulAction::OfferSend {
250                        peer_id,
251                        offer,
252                        signaling_method,
253                    });
254                }
255                Ok(())
256            }
257            P2pConnectionOutgoingAction::OfferSendSuccess { peer_id } => {
258                let state = p2p_state
259                    .outgoing_peer_connection_mut(&peer_id)
260                    .ok_or_else(|| format!("Invalid state: {:?}", action))?;
261                if let Self::OfferReady {
262                    opts,
263                    offer,
264                    rpc_id,
265                    on_success,
266                    ..
267                } = state
268                {
269                    *state = Self::OfferSendSuccess {
270                        time,
271                        opts: opts.clone(),
272                        offer: offer.clone(),
273                        rpc_id: rpc_id.take(),
274                        on_success: on_success.take(),
275                    };
276                } else {
277                    bug_condition!(
278                        "Invalid state for `P2pConnectionOutgoingAction::OfferSendSuccess`: {:?}",
279                        state
280                    );
281                    return Ok(());
282                }
283
284                let dispatcher = state_context.into_dispatcher();
285                dispatcher.push(P2pConnectionOutgoingAction::AnswerRecvPending { peer_id });
286                Ok(())
287            }
288            P2pConnectionOutgoingAction::AnswerRecvPending { peer_id } => {
289                let state = p2p_state
290                    .outgoing_peer_connection_mut(&peer_id)
291                    .ok_or_else(|| format!("Invalid state: {:?}", action))?;
292                if let Self::OfferSendSuccess {
293                    opts,
294                    offer,
295                    rpc_id,
296                    on_success,
297                    ..
298                } = state
299                {
300                    *state = Self::AnswerRecvPending {
301                        time,
302                        opts: opts.clone(),
303                        offer: offer.clone(),
304                        rpc_id: rpc_id.take(),
305                        on_success: on_success.take(),
306                    };
307                } else {
308                    bug_condition!(
309                        "Invalid state for `P2pConnectionOutgoingAction::AnswerRecvPending`: {:?}",
310                        state
311                    );
312                }
313                Ok(())
314            }
315            P2pConnectionOutgoingAction::AnswerRecvError { error, peer_id } => {
316                let dispatcher = state_context.into_dispatcher();
317
318                dispatcher.push(P2pConnectionOutgoingAction::Error {
319                    peer_id,
320                    error: match error {
321                        P2pConnectionErrorResponse::Rejected(reason) => {
322                            P2pConnectionOutgoingError::Rejected(reason)
323                        }
324                        P2pConnectionErrorResponse::SignalDecryptionFailed => {
325                            P2pConnectionOutgoingError::RemoteSignalDecryptionFailed
326                        }
327                        P2pConnectionErrorResponse::InternalError => {
328                            P2pConnectionOutgoingError::RemoteInternalError
329                        }
330                    },
331                });
332                Ok(())
333            }
334            P2pConnectionOutgoingAction::AnswerRecvSuccess { answer, peer_id } => {
335                let state = p2p_state.outgoing_peer_connection_mut(&peer_id).ok_or(
336                    "Missing peer connection for `P2pConnectionOutgoingAction::AnswerRecvSuccess`",
337                )?;
338
339                if let Self::AnswerRecvPending {
340                    opts,
341                    offer,
342                    rpc_id,
343                    on_success,
344                    ..
345                } = state
346                {
347                    *state = Self::AnswerRecvSuccess {
348                        time,
349                        opts: opts.clone(),
350                        offer: offer.clone(),
351                        answer: answer.clone(),
352                        rpc_id: rpc_id.take(),
353                        on_success: on_success.take(),
354                    };
355                } else {
356                    bug_condition!(
357                        "Invalid state for `P2pConnectionOutgoingAction::AnswerRecvSuccess`: {:?}",
358                        state
359                    );
360                }
361
362                let dispatcher = state_context.into_dispatcher();
363                dispatcher
364                    .push(P2pConnectionOutgoingEffectfulAction::AnswerSet { peer_id, answer });
365                Ok(())
366            }
367            P2pConnectionOutgoingAction::FinalizePending { peer_id } => {
368                let state = p2p_state
369                    .outgoing_peer_connection_mut(&peer_id)
370                    .ok_or_else(|| format!("Invalid state: {:?}", action))?;
371
372                let (auth, other_pub_key) = match state {
373                    Self::Init {
374                        opts,
375                        rpc_id,
376                        on_success,
377                        ..
378                    } => {
379                        *state = Self::FinalizePending {
380                            time,
381                            opts: opts.clone(),
382                            offer: None,
383                            answer: None,
384                            rpc_id: rpc_id.take(),
385                            on_success: on_success.take(),
386                        };
387                        return Ok(());
388                    }
389                    Self::AnswerRecvSuccess {
390                        opts,
391                        offer,
392                        answer,
393                        rpc_id,
394                        on_success,
395                        ..
396                    } => {
397                        let auth = offer.conn_auth(answer);
398                        let other_pub_key = answer.identity_pub_key.clone();
399
400                        *state = Self::FinalizePending {
401                            time,
402                            opts: opts.clone(),
403                            offer: Some(offer.clone()),
404                            answer: Some(answer.clone()),
405                            rpc_id: rpc_id.take(),
406                            on_success: on_success.take(),
407                        };
408
409                        (auth, other_pub_key)
410                    }
411                    _ => {
412                        bug_condition!("Invalid state for `P2pConnectionOutgoingAction::FinalizePending`: {state:?}");
413                        return Ok(());
414                    }
415                };
416
417                let dispatcher = state_context.into_dispatcher();
418                dispatcher.push(
419                    P2pConnectionOutgoingEffectfulAction::ConnectionAuthorizationEncryptAndSend {
420                        peer_id,
421                        other_pub_key,
422                        auth,
423                    },
424                );
425                Ok(())
426            }
427            P2pConnectionOutgoingAction::FinalizeError { error, peer_id } => {
428                let dispatcher = state_context.into_dispatcher();
429                dispatcher.push(P2pConnectionOutgoingAction::Error {
430                    peer_id,
431                    error: P2pConnectionOutgoingError::FinalizeError(error.to_owned()),
432                });
433                Ok(())
434            }
435            P2pConnectionOutgoingAction::FinalizeSuccess {
436                peer_id,
437                remote_auth: auth,
438            } => {
439                let state = p2p_state
440                    .outgoing_peer_connection_mut(&peer_id)
441                    .ok_or_else(|| {
442                        "Invalid state for: P2pConnectionOutgoingAction::FinalizeSuccess".to_owned()
443                    })?;
444
445                let values = if let Self::FinalizePending {
446                    opts,
447                    offer,
448                    answer,
449                    rpc_id,
450                    on_success,
451                    ..
452                } = state
453                {
454                    let values = None.or_else(|| {
455                        let answer = answer.as_ref()?;
456                        Some((
457                            auth?,
458                            offer.as_ref()?.conn_auth(answer),
459                            answer.identity_pub_key.clone(),
460                        ))
461                    });
462                    *state = Self::FinalizeSuccess {
463                        time,
464                        opts: opts.clone(),
465                        offer: offer.clone(),
466                        answer: answer.clone(),
467                        rpc_id: rpc_id.take(),
468                        on_success: on_success.take(),
469                    };
470                    values
471                } else {
472                    bug_condition!(
473                        "Invalid state for `P2pConnectionOutgoingAction::FinalizeSuccess`: {:?}",
474                        state
475                    );
476                    return Ok(());
477                };
478
479                let dispatcher = state_context.into_dispatcher();
480                if let Some((auth, expected_auth, other_pub_key)) = values {
481                    dispatcher.push(
482                        P2pConnectionOutgoingEffectfulAction::ConnectionAuthorizationDecryptAndCheck {
483                            peer_id,
484                            other_pub_key,
485                            expected_auth,
486                            auth,
487                        },
488                    );
489                } else {
490                    // libp2p
491                    dispatcher.push(P2pConnectionOutgoingAction::Success { peer_id });
492                }
493                Ok(())
494            }
495            P2pConnectionOutgoingAction::Timeout { peer_id } => {
496                let dispatcher = state_context.into_dispatcher();
497                dispatcher.push(P2pConnectionOutgoingAction::Error {
498                    peer_id,
499                    error: P2pConnectionOutgoingError::Timeout,
500                });
501                Ok(())
502            }
503            P2pConnectionOutgoingAction::Error { error, peer_id } => {
504                let state = p2p_state
505                    .outgoing_peer_connection_mut(&peer_id)
506                    .ok_or("Missing peer connection for `P2pConnectionOutgoingAction::Error`")?;
507
508                let rpc_id = state.rpc_id();
509                *state = Self::Error {
510                    time,
511                    error: error.clone(),
512                    rpc_id,
513                };
514
515                let (dispatcher, state) = state_context.into_dispatcher_and_state();
516                let p2p_state: &P2pState = state.substate()?;
517
518                #[cfg(feature = "p2p-libp2p")]
519                {
520                    if p2p_state
521                        .network
522                        .scheduler
523                        .discovery_state()
524                        .and_then(|discovery_state| discovery_state.request(&peer_id))
525                        .is_some()
526                    {
527                        dispatcher.push(P2pNetworkKadRequestAction::Error {
528                            peer_id,
529                            error: error.to_string(),
530                        });
531                    }
532                }
533
534                if let Some(rpc_id) = p2p_state.peer_connection_rpc_id(&peer_id) {
535                    if let Some(callback) = &p2p_state.callbacks.on_p2p_connection_outgoing_error {
536                        dispatcher.push_callback(callback.clone(), (rpc_id, error));
537                    }
538                }
539                dispatcher.push(P2pDisconnectionAction::FailedCleanup { peer_id });
540
541                Ok(())
542            }
543            P2pConnectionOutgoingAction::Success { peer_id } => {
544                let state = p2p_state
545                    .outgoing_peer_connection_mut(&peer_id)
546                    .ok_or_else(|| format!("Invalid state: {:?}", action))?;
547
548                let Self::FinalizeSuccess {
549                    offer,
550                    answer,
551                    rpc_id,
552                    on_success,
553                    ..
554                } = state
555                else {
556                    bug_condition!(
557                        "Invalid state for `P2pConnectionOutgoingAction::Success`: {:?}",
558                        state
559                    );
560                    return Ok(());
561                };
562
563                let callback = on_success.take();
564
565                *state = Self::Success {
566                    time,
567                    offer: offer.clone(),
568                    answer: answer.clone(),
569                    rpc_id: rpc_id.take(),
570                };
571
572                let (dispatcher, state) = state_context.into_dispatcher_and_state();
573                let p2p_state: &P2pState = state.substate()?;
574
575                dispatcher.push(P2pPeerAction::Ready {
576                    peer_id,
577                    incoming: false,
578                });
579
580                let rpc_id = p2p_state.peer_connection_rpc_id(&peer_id);
581                if let Some(rpc_id) = rpc_id {
582                    if let Some(callback) = &p2p_state.callbacks.on_p2p_connection_outgoing_success
583                    {
584                        dispatcher.push_callback(callback.clone(), rpc_id);
585                    }
586                }
587
588                if let Some(callback) = callback {
589                    dispatcher.push_callback(callback, (peer_id, rpc_id));
590                }
591                Ok(())
592            }
593        }
594    }
595}