1#[cfg(feature = "p2p-libp2p")]
2use std::net::{IpAddr, SocketAddr};
3
4use openmina_core::{bug_condition, debug, warn, Substate};
5use redux::{ActionWithMeta, Dispatcher, Timestamp};
6
7use crate::{
8 channels::signaling::exchange::P2pChannelsSignalingExchangeAction,
9 connection::{
10 incoming::P2pConnectionIncomingError,
11 incoming_effectful::P2pConnectionIncomingEffectfulAction,
12 outgoing::{P2pConnectionOutgoingInitLibp2pOpts, P2pConnectionOutgoingInitOpts},
13 P2pConnectionResponse, P2pConnectionState,
14 },
15 disconnection::{P2pDisconnectionAction, P2pDisconnectionReason},
16 webrtc::{Host, HttpSignalingInfo, SignalingMethod},
17 ConnectionAddr, P2pNetworkSchedulerAction, P2pPeerAction, P2pPeerState, P2pPeerStatus,
18 P2pState, PeerId,
19};
20
21use super::{
22 super::{incoming::P2pConnectionIncomingState, RejectionReason},
23 IncomingSignalingMethod, P2pConnectionIncomingAction,
24};
25
26impl P2pConnectionIncomingState {
27 pub fn reducer<Action, State>(
29 mut state_context: Substate<Action, State, P2pState>,
30 action: ActionWithMeta<P2pConnectionIncomingAction>,
31 ) -> Result<(), String>
32 where
33 State: crate::P2pStateTrait,
34 Action: crate::P2pActionTrait<State>,
35 {
36 let (action, meta) = action.split();
37 let time = meta.time();
38 let peer_id = *action.peer_id();
39 let p2p_state = state_context.get_substate_mut()?;
40 let my_id = p2p_state.my_id();
41
42 match action {
43 P2pConnectionIncomingAction::Init { opts, rpc_id } => {
44 let state = p2p_state
45 .peers
46 .entry(peer_id)
47 .or_insert_with(|| P2pPeerState {
48 is_libp2p: false,
49 dial_opts: opts.offer.listen_port.and_then(|listen_port| {
50 let signaling = match opts.signaling {
51 IncomingSignalingMethod::Http => {
52 SignalingMethod::Http(HttpSignalingInfo {
53 host: opts.offer.host.clone(),
54 port: listen_port,
55 })
56 }
57 IncomingSignalingMethod::P2p { .. } => return None,
58 };
59 Some(P2pConnectionOutgoingInitOpts::WebRTC { peer_id, signaling })
60 }),
61 status: P2pPeerStatus::Connecting(P2pConnectionState::incoming_init(&opts)),
62 identify: None,
63 });
64
65 state.status =
66 P2pPeerStatus::Connecting(P2pConnectionState::Incoming(Self::Init {
67 time: meta.time(),
68 signaling: opts.signaling,
69 offer: opts.offer.clone(),
70 rpc_id,
71 }));
72
73 let dispatcher = state_context.into_dispatcher();
74 dispatcher.push(P2pConnectionIncomingEffectfulAction::Init { opts });
75 Ok(())
76 }
77 P2pConnectionIncomingAction::AnswerSdpCreatePending { .. } => {
78 let state = p2p_state
79 .incoming_peer_connection_mut(&peer_id)
80 .ok_or_else(|| format!("Invalid state for: {:?}", action))?;
81 if let Self::Init {
82 signaling,
83 offer,
84 rpc_id,
85 ..
86 } = state
87 {
88 *state = Self::AnswerSdpCreatePending {
89 time: meta.time(),
90 signaling: *signaling,
91 offer: offer.clone(),
92 rpc_id: rpc_id.take(),
93 };
94 } else {
95 bug_condition!(
96 "Invalid state for `P2pConnectionIncomingAction::AnswerSdpCreatePending`: {:?}",
97 state
98 );
99 }
100 Ok(())
101 }
102 P2pConnectionIncomingAction::AnswerSdpCreateError { peer_id, error } => {
103 let dispatcher = state_context.into_dispatcher();
104 dispatcher.push(P2pConnectionIncomingAction::Error {
105 peer_id,
106 error: P2pConnectionIncomingError::SdpCreateError(error.to_owned()),
107 });
108 Ok(())
109 }
110 P2pConnectionIncomingAction::AnswerSdpCreateSuccess { sdp, .. } => {
111 let state = p2p_state.incoming_peer_connection_mut(&peer_id).ok_or(
112 "Missing state for `P2pConnectionIncomingAction::AnswerSdpCreateSuccess`",
113 )?;
114 if let Self::AnswerSdpCreatePending {
115 signaling,
116 offer,
117 rpc_id,
118 ..
119 } = state
120 {
121 *state = Self::AnswerSdpCreateSuccess {
122 time: meta.time(),
123 signaling: *signaling,
124 offer: offer.clone(),
125 sdp: sdp.clone(),
126 rpc_id: rpc_id.take(),
127 };
128 } else {
129 bug_condition!(
130 "Invalid state for `P2pConnectionIncomingAction::AnswerSdpCreateSuccess`: {:?}",
131 state
132 );
133 return Ok(());
134 }
135
136 let (dispatcher, state) = state_context.into_dispatcher_and_state();
137 let p2p_state: &P2pState = state.substate()?;
138 let answer = Box::new(crate::webrtc::Answer {
139 sdp,
140 identity_pub_key: p2p_state.config.identity_pub_key.clone(),
141 target_peer_id: peer_id,
142 });
143 dispatcher.push(P2pConnectionIncomingAction::AnswerReady { peer_id, answer });
144 Ok(())
145 }
146 P2pConnectionIncomingAction::AnswerReady { peer_id, answer } => {
147 let state = p2p_state
148 .incoming_peer_connection_mut(&peer_id)
149 .ok_or("Invalid state for: `P2pConnectionIncomingAction::AnswerReady`")?;
150
151 let Self::AnswerSdpCreateSuccess {
152 signaling,
153 offer,
154 rpc_id,
155 ..
156 } = state
157 else {
158 bug_condition!(
159 "Invalid state for `P2pConnectionIncomingAction::AnswerReady`: {:?}",
160 state
161 );
162 return Ok(());
163 };
164 let signaling = *signaling;
165 *state = Self::AnswerReady {
166 time: meta.time(),
167 signaling,
168 offer: offer.clone(),
169 answer: answer.clone(),
170 rpc_id: rpc_id.take(),
171 };
172
173 let (dispatcher, state) = state_context.into_dispatcher_and_state();
174 let p2p_state: &P2pState = state.substate()?;
175
176 match signaling {
177 IncomingSignalingMethod::Http => {
178 }
180 IncomingSignalingMethod::P2p { relay_peer_id } => {
181 dispatcher.push(P2pChannelsSignalingExchangeAction::AnswerSend {
182 peer_id: relay_peer_id,
183 answer: P2pConnectionResponse::Accepted(answer.clone()),
184 });
185 }
186 }
187
188 if let Some(rpc_id) = p2p_state.peer_connection_rpc_id(&peer_id) {
189 if let Some(callback) =
190 &p2p_state.callbacks.on_p2p_connection_incoming_answer_ready
191 {
192 dispatcher.push_callback(
193 callback.clone(),
194 (rpc_id, peer_id, P2pConnectionResponse::Accepted(answer)),
195 );
196 }
197 }
198
199 Ok(())
200 }
201 P2pConnectionIncomingAction::AnswerSendSuccess { .. } => {
202 let state = p2p_state
203 .incoming_peer_connection_mut(&peer_id)
204 .ok_or_else(|| format!("Invalid state for: {:?}", action))?;
205 if let Self::AnswerReady {
206 signaling,
207 offer,
208 answer,
209 rpc_id,
210 ..
211 } = state
212 {
213 *state = Self::AnswerSendSuccess {
214 time: meta.time(),
215 signaling: *signaling,
216 offer: offer.clone(),
217 answer: answer.clone(),
218 rpc_id: rpc_id.take(),
219 };
220 } else {
221 bug_condition!(
222 "Invalid state for `P2pConnectionIncomingAction::AnswerSendSuccess`: {:?}",
223 state
224 );
225 return Ok(());
226 }
227
228 let dispatcher = state_context.into_dispatcher();
229 dispatcher.push(P2pConnectionIncomingAction::FinalizePending { peer_id });
230 Ok(())
231 }
232 P2pConnectionIncomingAction::FinalizePending { .. } => {
233 let state = p2p_state
234 .incoming_peer_connection_mut(&peer_id)
235 .ok_or_else(|| format!("Invalid state for: {:?}", action))?;
236 if let Self::AnswerSendSuccess {
237 signaling,
238 offer,
239 answer,
240 rpc_id,
241 ..
242 } = state
243 {
244 let auth = offer.conn_auth(answer);
245 let other_pub_key = offer.identity_pub_key.clone();
246
247 *state = Self::FinalizePending {
248 time: meta.time(),
249 signaling: *signaling,
250 offer: offer.clone(),
251 answer: answer.clone(),
252 rpc_id: rpc_id.take(),
253 };
254
255 let dispatcher = state_context.into_dispatcher();
256 dispatcher.push(P2pConnectionIncomingEffectfulAction::ConnectionAuthorizationEncryptAndSend { peer_id, other_pub_key, auth });
257 } else {
258 bug_condition!(
259 "Invalid state for `P2pConnectionIncomingAction::FinalizePending`: {:?}",
260 state
261 );
262 }
263
264 Ok(())
265 }
266 P2pConnectionIncomingAction::FinalizeError { error, .. } => {
267 let dispatcher = state_context.into_dispatcher();
268 dispatcher.push(P2pConnectionIncomingAction::Error {
269 peer_id,
270 error: P2pConnectionIncomingError::FinalizeError(error.to_owned()),
271 });
272 Ok(())
273 }
274 P2pConnectionIncomingAction::FinalizeSuccess { remote_auth, .. } => {
275 let state = p2p_state
276 .incoming_peer_connection_mut(&peer_id)
277 .ok_or_else(|| {
278 "Invalid state for: P2pConnectionIncomingAction::FinalizeSuccess".to_owned()
279 })?;
280 let (expected_auth, other_pub_key) = if let Self::FinalizePending {
281 signaling,
282 offer,
283 answer,
284 rpc_id,
285 ..
286 } = state
287 {
288 let expected_auth = offer.conn_auth(answer);
289 let other_pub_key = offer.identity_pub_key.clone();
290 *state = Self::FinalizeSuccess {
291 time: meta.time(),
292 signaling: *signaling,
293 offer: offer.clone(),
294 answer: answer.clone(),
295 rpc_id: rpc_id.take(),
296 };
297 (expected_auth, other_pub_key)
298 } else {
299 bug_condition!(
300 "Invalid state for `P2pConnectionIncomingAction::FinalizeSuccess`: {:?}",
301 state
302 );
303 return Ok(());
304 };
305
306 let dispatcher = state_context.into_dispatcher();
307
308 dispatcher.push(
309 P2pConnectionIncomingEffectfulAction::ConnectionAuthorizationDecryptAndCheck {
310 peer_id,
311 other_pub_key,
312 expected_auth,
313 auth: remote_auth,
314 },
315 );
316 Ok(())
317 }
318 P2pConnectionIncomingAction::Timeout { .. } => {
319 let (dispatcher, _state) = state_context.into_dispatcher_and_state();
320
321 #[cfg(feature = "p2p-libp2p")]
322 {
323 let p2p_state: &P2pState = _state.substate()?;
324 if let Some((addr, _)) = p2p_state
325 .network
326 .scheduler
327 .connections
328 .iter()
329 .find(|(_, state)| state.peer_id().is_some_and(|id| *id == peer_id))
330 {
331 dispatcher.push(P2pNetworkSchedulerAction::Disconnect {
332 addr: *addr,
333 reason: P2pDisconnectionReason::Timeout,
334 });
335 }
336 }
337
338 dispatcher.push(P2pConnectionIncomingAction::Error {
339 peer_id,
340 error: P2pConnectionIncomingError::Timeout,
341 });
342
343 Ok(())
344 }
345 P2pConnectionIncomingAction::Error { error, .. } => {
346 let state = p2p_state
347 .incoming_peer_connection_mut(&peer_id)
348 .ok_or("Missing state for `P2pConnectionIncomingAction::Error`")?;
349
350 let rpc_id = state.rpc_id();
351 let str_error = format!("{:?}", error);
352 *state = Self::Error {
353 time: meta.time(),
354 error,
355 rpc_id,
356 };
357
358 let (dispatcher, state) = state_context.into_dispatcher_and_state();
359 let p2p_state: &P2pState = state.substate()?;
360
361 if let Some(rpc_id) = p2p_state.peer_connection_rpc_id(&peer_id) {
362 if let Some(callback) = &p2p_state.callbacks.on_p2p_connection_incoming_error {
363 dispatcher.push_callback(callback.clone(), (rpc_id, str_error));
364 }
365 }
366 dispatcher.push(P2pDisconnectionAction::FailedCleanup { peer_id });
367
368 Ok(())
369 }
370 P2pConnectionIncomingAction::Success { .. } => {
371 let state = p2p_state
372 .incoming_peer_connection_mut(&peer_id)
373 .ok_or_else(|| format!("Invalid state for: {:?}", action))?;
374
375 if let Self::FinalizeSuccess {
376 signaling,
377 offer,
378 answer,
379 rpc_id,
380 ..
381 } = state
382 {
383 *state = Self::Success {
384 time: meta.time(),
385 signaling: *signaling,
386 offer: offer.clone(),
387 answer: answer.clone(),
388 rpc_id: rpc_id.take(),
389 };
390 } else {
391 bug_condition!(
392 "Invalid state for `P2pConnectionIncomingAction::Success`: {:?}",
393 state
394 );
395 return Ok(());
396 }
397
398 let (dispatcher, state) = state_context.into_dispatcher_and_state();
399 let p2p_state: &P2pState = state.substate()?;
400
401 dispatcher.push(P2pPeerAction::Ready {
402 peer_id,
403 incoming: true,
404 });
405
406 if let Some(rpc_id) = p2p_state.peer_connection_rpc_id(&peer_id) {
407 if let Some(callback) = &p2p_state.callbacks.on_p2p_connection_incoming_success
408 {
409 dispatcher.push_callback(callback.clone(), rpc_id);
410 }
411 }
412 Ok(())
413 }
414 P2pConnectionIncomingAction::FinalizePendingLibp2p { addr, .. } => {
415 #[cfg(feature = "p2p-libp2p")]
416 {
417 let state = p2p_state
418 .peers
419 .entry(peer_id)
420 .or_insert_with(|| P2pPeerState {
421 is_libp2p: true,
422 dial_opts: Some(P2pConnectionOutgoingInitOpts::LibP2P(
423 P2pConnectionOutgoingInitLibp2pOpts {
424 peer_id,
425 host: Host::from(addr.ip()),
426 port: addr.port(),
427 },
428 )),
429 status: P2pPeerStatus::Disconnected { time: meta.time() },
430 identify: None,
431 });
432
433 Self::reduce_finalize_libp2p_pending(state, addr, time, my_id, peer_id);
434
435 let (dispatcher, state) = state_context.into_dispatcher_and_state();
436 let p2p_state: &P2pState = state.substate()?;
437 Self::dispatch_finalize_libp2p_pending(
438 dispatcher, p2p_state, my_id, peer_id, time, addr,
439 );
440 }
441
442 Ok(())
443 }
444 P2pConnectionIncomingAction::Libp2pReceived { .. } => {
445 #[cfg(feature = "p2p-libp2p")]
446 {
447 let state = p2p_state
448 .incoming_peer_connection_mut(&peer_id)
449 .ok_or_else(|| format!("Invalid state for: {:?}", action))?;
450
451 if let Self::FinalizePendingLibp2p { time, .. } = state {
452 *state = Self::Libp2pReceived { time: *time };
453 } else {
454 bug_condition!(
455 "Invalid state for `P2pConnectionIncomingAction::Libp2pReceived`: {:?}",
456 state
457 );
458 return Ok(());
459 }
460
461 let dispatcher = state_context.into_dispatcher();
462 dispatcher.push(P2pPeerAction::Ready {
463 peer_id,
464 incoming: true,
465 });
466 }
467 Ok(())
468 }
469 }
470 }
471
/// Queues the actions that follow a `FinalizePendingLibp2p` reduction for
/// `peer_id`, whose incoming connection arrived from `addr`.
///
/// * Peer is not in `FinalizePendingLibp2p` (the reduction did not accept the
///   connection): the incoming connection at `addr` is disconnected as
///   `AlreadyConnected`.
/// * `libp2p_incoming_accept` refuses the peer: a disconnection with the
///   returned rejection reason is initiated.
/// * Otherwise the connection is accepted, and every other *incoming*
///   scheduler connection whose socket address is listed in
///   `close_duplicates` is disconnected as `AlreadyConnected`.
///
/// A missing peer entry is reported through `bug_condition!` and nothing is
/// dispatched.
#[cfg(feature = "p2p-libp2p")]
fn dispatch_finalize_libp2p_pending<Action, State>(
    dispatcher: &mut Dispatcher<Action, State>,
    p2p_state: &P2pState,
    my_id: PeerId,
    peer_id: PeerId,
    time: Timestamp,
    addr: SocketAddr,
) where
    State: crate::P2pStateTrait,
    Action: crate::P2pActionTrait<State>,
{
    let Some(peer_state) = p2p_state.peers.get(&peer_id) else {
        bug_condition!("Peer State not found for {}", peer_id);
        return;
    };

    let incoming_state = peer_state
        .status
        .as_connecting()
        .and_then(|connecting| connecting.as_incoming());

    let Some(P2pConnectionIncomingState::FinalizePendingLibp2p {
        close_duplicates, ..
    }) = incoming_state
    else {
        // The reducer kept another connection state for this peer, so this
        // incoming connection is the one to drop.
        warn!(time; node_id = display(my_id), summary = "rejecting incoming connection as duplicate", peer_id = display(peer_id));
        dispatcher.push(P2pNetworkSchedulerAction::Disconnect {
            addr: ConnectionAddr {
                sock_addr: addr,
                incoming: true,
            },
            reason: P2pDisconnectionReason::Libp2pIncomingRejected(
                RejectionReason::AlreadyConnected,
            ),
        });
        return;
    };

    if let Err(reason) = p2p_state.libp2p_incoming_accept(peer_id) {
        warn!(time; node_id = display(my_id), summary = "rejecting incoming connection", peer_id = display(peer_id), reason = display(&reason));
        dispatcher.push(P2pDisconnectionAction::Init {
            peer_id,
            reason: P2pDisconnectionReason::Libp2pIncomingRejected(reason),
        });
        return;
    }

    debug!(time; "accepting incoming connection from {peer_id}");
    if close_duplicates.is_empty() {
        return;
    }

    // Other incoming connections (not the one just accepted) that sit on one
    // of the addresses marked as duplicates.
    let duplicates = p2p_state
        .network
        .scheduler
        .connections
        .keys()
        .filter(|conn_addr| {
            conn_addr.incoming
                && conn_addr.sock_addr != addr
                && close_duplicates.contains(&conn_addr.sock_addr)
        })
        .cloned()
        .collect::<Vec<_>>();

    for duplicate in duplicates {
        warn!(time; node_id = display(my_id), summary = "closing duplicate connection", addr = display(duplicate));
        dispatcher.push(P2pNetworkSchedulerAction::Disconnect {
            addr: duplicate,
            reason: P2pDisconnectionReason::Libp2pIncomingRejected(
                RejectionReason::AlreadyConnected,
            ),
        });
    }
}
547
/// Decides whether an incoming libp2p connection from `addr` may take over
/// the peer's status, and if so switches it to
/// `Connecting(Incoming(FinalizePendingLibp2p { .. }))`.
///
/// * `Disconnected`, or an incoming connection that failed with `Timeout`:
///   accepted with an empty `close_duplicates` list.
/// * Outgoing connection in progress and `my_id < peer_id`: status is left
///   unchanged.
/// * Outgoing connection in progress otherwise: accepted, and
///   `close_duplicates` is filled with the socket addresses taken from the
///   peer's identify `listen_addrs` (those that start with an IPv4/IPv6
///   component followed by a TCP port) plus the address derived from the
///   peer's libp2p dial options, if it converts and is not already listed.
/// * Any other status: left unchanged.
#[cfg(feature = "p2p-libp2p")]
fn reduce_finalize_libp2p_pending(
    state: &mut P2pPeerState,
    addr: SocketAddr,
    time: Timestamp,
    my_id: PeerId,
    peer_id: PeerId,
) {
    let close_duplicates = match &state.status {
        P2pPeerStatus::Disconnected { .. }
        | P2pPeerStatus::Connecting(P2pConnectionState::Incoming(
            P2pConnectionIncomingState::Error {
                error: P2pConnectionIncomingError::Timeout,
                ..
            },
        )) => Vec::new(),
        // Simultaneous dial where our id is the lesser one: the status is
        // not switched to the incoming connection.
        P2pPeerStatus::Connecting(P2pConnectionState::Outgoing(_)) if my_id < peer_id => return,
        P2pPeerStatus::Connecting(P2pConnectionState::Outgoing(_)) => {
            let mut candidates = Vec::new();

            if let Some(identify) = state.identify.as_ref() {
                candidates.extend(identify.listen_addrs.iter().filter_map(|maddr| {
                    // Only `<ip4|ip6>/<tcp>` prefixes translate to a socket address.
                    let mut protocols = maddr.iter();
                    match (protocols.next()?, protocols.next()?) {
                        (multiaddr::Protocol::Ip4(ip), multiaddr::Protocol::Tcp(port)) => {
                            Some(SocketAddr::from((IpAddr::from(ip), port)))
                        }
                        (multiaddr::Protocol::Ip6(ip), multiaddr::Protocol::Tcp(port)) => {
                            Some(SocketAddr::from((IpAddr::from(ip), port)))
                        }
                        _ => None,
                    }
                }));
            }

            if let Some(P2pConnectionOutgoingInitOpts::LibP2P(libp2p)) = state.dial_opts.as_ref() {
                if let Ok(dial_addr) = SocketAddr::try_from(libp2p) {
                    if !candidates.contains(&dial_addr) {
                        candidates.push(dial_addr);
                    }
                }
            }

            candidates
        }
        _ => return,
    };

    state.status = P2pPeerStatus::Connecting(P2pConnectionState::Incoming(
        P2pConnectionIncomingState::FinalizePendingLibp2p {
            addr,
            close_duplicates,
            time,
        },
    ));
}
613}