1use std::net::SocketAddr;
2
3use openmina_core::{bug_condition, warn, Substate};
4use redux::ActionWithMeta;
5
6use crate::{
7 channels::signaling::discovery::P2pChannelsSignalingDiscoveryAction,
8 connection::{
9 outgoing_effectful::P2pConnectionOutgoingEffectfulAction, P2pConnectionErrorResponse,
10 P2pConnectionState,
11 },
12 disconnection::P2pDisconnectionAction,
13 webrtc::Host,
14 P2pNetworkKadRequestAction, P2pNetworkSchedulerAction, P2pPeerAction, P2pPeerState,
15 P2pPeerStatus, P2pState,
16};
17
18use super::{
19 libp2p_opts::P2pConnectionOutgoingInitLibp2pOptsTryToSocketAddrError,
20 P2pConnectionOutgoingAction, P2pConnectionOutgoingError, P2pConnectionOutgoingInitOpts,
21 P2pConnectionOutgoingState,
22};
23
24impl P2pConnectionOutgoingState {
25 pub fn reducer<Action, State>(
27 mut state_context: Substate<Action, State, P2pState>,
28 action: ActionWithMeta<P2pConnectionOutgoingAction>,
29 ) -> Result<(), String>
30 where
31 State: crate::P2pStateTrait,
32 Action: crate::P2pActionTrait<State>,
33 {
34 let (action, meta) = action.split();
35 let time = meta.time();
36 let p2p_state = state_context.get_substate_mut()?;
37
38 match action {
39 P2pConnectionOutgoingAction::RandomInit => {
40 let (dispatcher, state) = state_context.into_dispatcher_and_state();
41 let p2p_state: &P2pState = state.substate()?;
42 let peers = p2p_state.disconnected_peers().collect::<Vec<_>>();
43 dispatcher.push(P2pConnectionOutgoingEffectfulAction::RandomInit { peers });
44 Ok(())
45 }
46 P2pConnectionOutgoingAction::Init {
47 opts,
48 rpc_id,
49 on_success,
50 } => {
51 let peer_state =
52 p2p_state
53 .peers
54 .entry(*opts.peer_id())
55 .or_insert_with(|| P2pPeerState {
56 is_libp2p: opts.is_libp2p(),
57 dial_opts: Some(opts.clone()).filter(|v| v.can_connect_directly()),
58 status: P2pPeerStatus::Connecting(P2pConnectionState::outgoing_init(
59 &opts,
60 )),
61 identify: None,
62 });
63
64 peer_state.status =
65 P2pPeerStatus::Connecting(P2pConnectionState::Outgoing(Self::Init {
66 time,
67 opts: opts.clone(),
68 rpc_id,
69 on_success,
70 }));
71
72 let dispatcher = state_context.into_dispatcher();
73
74 #[cfg(feature = "p2p-libp2p")]
75 if let P2pConnectionOutgoingInitOpts::LibP2P(libp2p_opts) = &opts {
76 match SocketAddr::try_from(libp2p_opts) {
77 Ok(addr) => {
78 dispatcher.push(P2pNetworkSchedulerAction::OutgoingConnect { addr });
79 }
80 Err(
81 P2pConnectionOutgoingInitLibp2pOptsTryToSocketAddrError::Unresolved(
82 _name,
83 ),
84 ) => {
85 warn!(meta.time(); "name resolution needed to connect to {}", opts);
87 }
88 }
89 dispatcher.push(P2pConnectionOutgoingAction::FinalizePending {
90 peer_id: libp2p_opts.peer_id,
91 });
92 return Ok(());
93 }
94
95 dispatcher.push(P2pConnectionOutgoingEffectfulAction::Init { opts, rpc_id });
96 Ok(())
97 }
98 P2pConnectionOutgoingAction::Reconnect { opts, rpc_id } => {
99 let peer_state = p2p_state
100 .peers
101 .get_mut(opts.peer_id())
102 .ok_or("Missing peer state for: `P2pConnectionOutgoingAction::Reconnect`")?;
103
104 peer_state.status =
105 P2pPeerStatus::Connecting(P2pConnectionState::Outgoing(Self::Init {
106 time,
107 opts: opts.clone(),
108 rpc_id,
109 on_success: None,
110 }));
111
112 let dispatcher = state_context.into_dispatcher();
113
114 #[cfg(feature = "p2p-libp2p")]
115 if let P2pConnectionOutgoingInitOpts::LibP2P(libp2p_opts) = &opts {
116 match SocketAddr::try_from(libp2p_opts) {
117 Ok(addr) => {
118 dispatcher.push(P2pNetworkSchedulerAction::OutgoingConnect { addr });
119 }
120 Err(
121 P2pConnectionOutgoingInitLibp2pOptsTryToSocketAddrError::Unresolved(
122 _name,
123 ),
124 ) => {
125 warn!(meta.time(); "name resolution needed to connect to {}", opts);
127 }
128 }
129 dispatcher.push(P2pConnectionOutgoingAction::FinalizePending {
130 peer_id: *opts.peer_id(),
131 });
132 return Ok(());
133 }
134
135 dispatcher.push(P2pConnectionOutgoingEffectfulAction::Init { opts, rpc_id });
136 Ok(())
137 }
138 P2pConnectionOutgoingAction::OfferSdpCreatePending { peer_id, .. } => {
139 let state = p2p_state
140 .outgoing_peer_connection_mut(&peer_id)
141 .ok_or("Missing connection state for: `P2pConnectionOutgoingAction::OfferSdpCreatePending`")?;
142
143 if let Self::Init {
144 opts,
145 rpc_id,
146 on_success,
147 ..
148 } = state
149 {
150 *state = Self::OfferSdpCreatePending {
151 time,
152 opts: opts.clone(),
153 rpc_id: rpc_id.take(),
154 on_success: on_success.take(),
155 };
156 } else {
157 bug_condition!("Invalid state for `P2pConnectionOutgoingAction::OfferSdpCreatePending`: {:?}", state);
158 }
159
160 Ok(())
161 }
162 P2pConnectionOutgoingAction::OfferSdpCreateError { error, peer_id, .. } => {
163 let dispatcher = state_context.into_dispatcher();
164 dispatcher.push(P2pConnectionOutgoingAction::Error {
165 peer_id,
166 error: P2pConnectionOutgoingError::SdpCreateError(error.to_owned()),
167 });
168 Ok(())
169 }
170 P2pConnectionOutgoingAction::OfferSdpCreateSuccess { sdp, peer_id } => {
171 let chain_id = p2p_state.chain_id.clone();
172 let state = p2p_state
173 .outgoing_peer_connection_mut(&peer_id)
174 .ok_or("Missing peer connection for `P2pConnectionOutgoingAction::OfferSdpCreateSuccess`")?;
175
176 if let Self::OfferSdpCreatePending {
177 opts,
178 rpc_id,
179 on_success,
180 ..
181 } = state
182 {
183 *state = Self::OfferSdpCreateSuccess {
184 time,
185 opts: opts.clone(),
186 sdp: sdp.clone(),
187 rpc_id: rpc_id.take(),
188 on_success: on_success.take(),
189 };
190 } else {
191 bug_condition!("Invalid state for `P2pConnectionOutgoingAction::OfferSdpCreateSuccess`: {:?}", state);
192 return Ok(());
193 }
194
195 let offer = Box::new(crate::webrtc::Offer {
196 sdp,
197 chain_id,
198 identity_pub_key: p2p_state.config.identity_pub_key.clone(),
199 target_peer_id: peer_id,
200 host: Host::Ipv4([127, 0, 0, 1].into()),
202 listen_port: p2p_state.config.listen_port,
203 });
204 let dispatcher = state_context.into_dispatcher();
205 dispatcher.push(P2pConnectionOutgoingAction::OfferReady { peer_id, offer });
206 Ok(())
207 }
208 P2pConnectionOutgoingAction::OfferReady { offer, peer_id } => {
209 let state = p2p_state
210 .outgoing_peer_connection_mut(&peer_id)
211 .ok_or("Invalid state for `P2pConnectionOutgoingAction::OfferReady`")?;
212
213 let Self::OfferSdpCreateSuccess {
214 opts,
215 rpc_id,
216 on_success,
217 ..
218 } = state
219 else {
220 bug_condition!(
221 "Invalid state for `P2pConnectionOutgoingAction::OfferReady`: {:?}",
222 state
223 );
224 return Ok(());
225 };
226 let opts = opts.clone();
227 *state = Self::OfferReady {
228 time: meta.time(),
229 opts: opts.clone(),
230 offer: offer.clone(),
231 rpc_id: rpc_id.take(),
232 on_success: on_success.take(),
233 };
234
235 let dispatcher = state_context.into_dispatcher();
236
237 if let Some(relay_peer_id) = opts.webrtc_p2p_relay_peer_id() {
238 dispatcher.push(P2pChannelsSignalingDiscoveryAction::DiscoveredAccept {
239 peer_id: relay_peer_id,
240 offer,
241 });
242 } else {
243 let signaling_method = match opts {
244 P2pConnectionOutgoingInitOpts::WebRTC { signaling, .. } => signaling,
245 #[allow(unreachable_patterns)]
246 _ => return Ok(()),
247 };
248
249 dispatcher.push(P2pConnectionOutgoingEffectfulAction::OfferSend {
250 peer_id,
251 offer,
252 signaling_method,
253 });
254 }
255 Ok(())
256 }
257 P2pConnectionOutgoingAction::OfferSendSuccess { peer_id } => {
258 let state = p2p_state
259 .outgoing_peer_connection_mut(&peer_id)
260 .ok_or_else(|| format!("Invalid state: {:?}", action))?;
261 if let Self::OfferReady {
262 opts,
263 offer,
264 rpc_id,
265 on_success,
266 ..
267 } = state
268 {
269 *state = Self::OfferSendSuccess {
270 time,
271 opts: opts.clone(),
272 offer: offer.clone(),
273 rpc_id: rpc_id.take(),
274 on_success: on_success.take(),
275 };
276 } else {
277 bug_condition!(
278 "Invalid state for `P2pConnectionOutgoingAction::OfferSendSuccess`: {:?}",
279 state
280 );
281 return Ok(());
282 }
283
284 let dispatcher = state_context.into_dispatcher();
285 dispatcher.push(P2pConnectionOutgoingAction::AnswerRecvPending { peer_id });
286 Ok(())
287 }
288 P2pConnectionOutgoingAction::AnswerRecvPending { peer_id } => {
289 let state = p2p_state
290 .outgoing_peer_connection_mut(&peer_id)
291 .ok_or_else(|| format!("Invalid state: {:?}", action))?;
292 if let Self::OfferSendSuccess {
293 opts,
294 offer,
295 rpc_id,
296 on_success,
297 ..
298 } = state
299 {
300 *state = Self::AnswerRecvPending {
301 time,
302 opts: opts.clone(),
303 offer: offer.clone(),
304 rpc_id: rpc_id.take(),
305 on_success: on_success.take(),
306 };
307 } else {
308 bug_condition!(
309 "Invalid state for `P2pConnectionOutgoingAction::AnswerRecvPending`: {:?}",
310 state
311 );
312 }
313 Ok(())
314 }
315 P2pConnectionOutgoingAction::AnswerRecvError { error, peer_id } => {
316 let dispatcher = state_context.into_dispatcher();
317
318 dispatcher.push(P2pConnectionOutgoingAction::Error {
319 peer_id,
320 error: match error {
321 P2pConnectionErrorResponse::Rejected(reason) => {
322 P2pConnectionOutgoingError::Rejected(reason)
323 }
324 P2pConnectionErrorResponse::SignalDecryptionFailed => {
325 P2pConnectionOutgoingError::RemoteSignalDecryptionFailed
326 }
327 P2pConnectionErrorResponse::InternalError => {
328 P2pConnectionOutgoingError::RemoteInternalError
329 }
330 },
331 });
332 Ok(())
333 }
334 P2pConnectionOutgoingAction::AnswerRecvSuccess { answer, peer_id } => {
335 let state = p2p_state.outgoing_peer_connection_mut(&peer_id).ok_or(
336 "Missing peer connection for `P2pConnectionOutgoingAction::AnswerRecvSuccess`",
337 )?;
338
339 if let Self::AnswerRecvPending {
340 opts,
341 offer,
342 rpc_id,
343 on_success,
344 ..
345 } = state
346 {
347 *state = Self::AnswerRecvSuccess {
348 time,
349 opts: opts.clone(),
350 offer: offer.clone(),
351 answer: answer.clone(),
352 rpc_id: rpc_id.take(),
353 on_success: on_success.take(),
354 };
355 } else {
356 bug_condition!(
357 "Invalid state for `P2pConnectionOutgoingAction::AnswerRecvSuccess`: {:?}",
358 state
359 );
360 }
361
362 let dispatcher = state_context.into_dispatcher();
363 dispatcher
364 .push(P2pConnectionOutgoingEffectfulAction::AnswerSet { peer_id, answer });
365 Ok(())
366 }
367 P2pConnectionOutgoingAction::FinalizePending { peer_id } => {
368 let state = p2p_state
369 .outgoing_peer_connection_mut(&peer_id)
370 .ok_or_else(|| format!("Invalid state: {:?}", action))?;
371
372 let (auth, other_pub_key) = match state {
373 Self::Init {
374 opts,
375 rpc_id,
376 on_success,
377 ..
378 } => {
379 *state = Self::FinalizePending {
380 time,
381 opts: opts.clone(),
382 offer: None,
383 answer: None,
384 rpc_id: rpc_id.take(),
385 on_success: on_success.take(),
386 };
387 return Ok(());
388 }
389 Self::AnswerRecvSuccess {
390 opts,
391 offer,
392 answer,
393 rpc_id,
394 on_success,
395 ..
396 } => {
397 let auth = offer.conn_auth(answer);
398 let other_pub_key = answer.identity_pub_key.clone();
399
400 *state = Self::FinalizePending {
401 time,
402 opts: opts.clone(),
403 offer: Some(offer.clone()),
404 answer: Some(answer.clone()),
405 rpc_id: rpc_id.take(),
406 on_success: on_success.take(),
407 };
408
409 (auth, other_pub_key)
410 }
411 _ => {
412 bug_condition!("Invalid state for `P2pConnectionOutgoingAction::FinalizePending`: {state:?}");
413 return Ok(());
414 }
415 };
416
417 let dispatcher = state_context.into_dispatcher();
418 dispatcher.push(
419 P2pConnectionOutgoingEffectfulAction::ConnectionAuthorizationEncryptAndSend {
420 peer_id,
421 other_pub_key,
422 auth,
423 },
424 );
425 Ok(())
426 }
427 P2pConnectionOutgoingAction::FinalizeError { error, peer_id } => {
428 let dispatcher = state_context.into_dispatcher();
429 dispatcher.push(P2pConnectionOutgoingAction::Error {
430 peer_id,
431 error: P2pConnectionOutgoingError::FinalizeError(error.to_owned()),
432 });
433 Ok(())
434 }
435 P2pConnectionOutgoingAction::FinalizeSuccess {
436 peer_id,
437 remote_auth: auth,
438 } => {
439 let state = p2p_state
440 .outgoing_peer_connection_mut(&peer_id)
441 .ok_or_else(|| {
442 "Invalid state for: P2pConnectionOutgoingAction::FinalizeSuccess".to_owned()
443 })?;
444
445 let values = if let Self::FinalizePending {
446 opts,
447 offer,
448 answer,
449 rpc_id,
450 on_success,
451 ..
452 } = state
453 {
454 let values = None.or_else(|| {
455 let answer = answer.as_ref()?;
456 Some((
457 auth?,
458 offer.as_ref()?.conn_auth(answer),
459 answer.identity_pub_key.clone(),
460 ))
461 });
462 *state = Self::FinalizeSuccess {
463 time,
464 opts: opts.clone(),
465 offer: offer.clone(),
466 answer: answer.clone(),
467 rpc_id: rpc_id.take(),
468 on_success: on_success.take(),
469 };
470 values
471 } else {
472 bug_condition!(
473 "Invalid state for `P2pConnectionOutgoingAction::FinalizeSuccess`: {:?}",
474 state
475 );
476 return Ok(());
477 };
478
479 let dispatcher = state_context.into_dispatcher();
480 if let Some((auth, expected_auth, other_pub_key)) = values {
481 dispatcher.push(
482 P2pConnectionOutgoingEffectfulAction::ConnectionAuthorizationDecryptAndCheck {
483 peer_id,
484 other_pub_key,
485 expected_auth,
486 auth,
487 },
488 );
489 } else {
490 dispatcher.push(P2pConnectionOutgoingAction::Success { peer_id });
492 }
493 Ok(())
494 }
495 P2pConnectionOutgoingAction::Timeout { peer_id } => {
496 let dispatcher = state_context.into_dispatcher();
497 dispatcher.push(P2pConnectionOutgoingAction::Error {
498 peer_id,
499 error: P2pConnectionOutgoingError::Timeout,
500 });
501 Ok(())
502 }
503 P2pConnectionOutgoingAction::Error { error, peer_id } => {
504 let state = p2p_state
505 .outgoing_peer_connection_mut(&peer_id)
506 .ok_or("Missing peer connection for `P2pConnectionOutgoingAction::Error`")?;
507
508 let rpc_id = state.rpc_id();
509 *state = Self::Error {
510 time,
511 error: error.clone(),
512 rpc_id,
513 };
514
515 let (dispatcher, state) = state_context.into_dispatcher_and_state();
516 let p2p_state: &P2pState = state.substate()?;
517
518 #[cfg(feature = "p2p-libp2p")]
519 {
520 if p2p_state
521 .network
522 .scheduler
523 .discovery_state()
524 .and_then(|discovery_state| discovery_state.request(&peer_id))
525 .is_some()
526 {
527 dispatcher.push(P2pNetworkKadRequestAction::Error {
528 peer_id,
529 error: error.to_string(),
530 });
531 }
532 }
533
534 if let Some(rpc_id) = p2p_state.peer_connection_rpc_id(&peer_id) {
535 if let Some(callback) = &p2p_state.callbacks.on_p2p_connection_outgoing_error {
536 dispatcher.push_callback(callback.clone(), (rpc_id, error));
537 }
538 }
539 dispatcher.push(P2pDisconnectionAction::FailedCleanup { peer_id });
540
541 Ok(())
542 }
543 P2pConnectionOutgoingAction::Success { peer_id } => {
544 let state = p2p_state
545 .outgoing_peer_connection_mut(&peer_id)
546 .ok_or_else(|| format!("Invalid state: {:?}", action))?;
547
548 let Self::FinalizeSuccess {
549 offer,
550 answer,
551 rpc_id,
552 on_success,
553 ..
554 } = state
555 else {
556 bug_condition!(
557 "Invalid state for `P2pConnectionOutgoingAction::Success`: {:?}",
558 state
559 );
560 return Ok(());
561 };
562
563 let callback = on_success.take();
564
565 *state = Self::Success {
566 time,
567 offer: offer.clone(),
568 answer: answer.clone(),
569 rpc_id: rpc_id.take(),
570 };
571
572 let (dispatcher, state) = state_context.into_dispatcher_and_state();
573 let p2p_state: &P2pState = state.substate()?;
574
575 dispatcher.push(P2pPeerAction::Ready {
576 peer_id,
577 incoming: false,
578 });
579
580 let rpc_id = p2p_state.peer_connection_rpc_id(&peer_id);
581 if let Some(rpc_id) = rpc_id {
582 if let Some(callback) = &p2p_state.callbacks.on_p2p_connection_outgoing_success
583 {
584 dispatcher.push_callback(callback.clone(), rpc_id);
585 }
586 }
587
588 if let Some(callback) = callback {
589 dispatcher.push_callback(callback, (peer_id, rpc_id));
590 }
591 Ok(())
592 }
593 }
594 }
595}