p2p/connection/incoming/
p2p_connection_incoming_state.rs

1use std::net::SocketAddr;
2
3use malloc_size_of_derive::MallocSizeOf;
4use redux::Timestamp;
5use serde::{Deserialize, Serialize};
6
7use openmina_core::requests::RpcId;
8
9use crate::{webrtc, P2pTimeouts};
10
11use super::IncomingSignalingMethod;
12
13#[derive(Serialize, Deserialize, Debug, Clone, MallocSizeOf)]
14pub enum P2pConnectionIncomingState {
15    Init {
16        #[ignore_malloc_size_of = "doesn't allocate"]
17        time: redux::Timestamp,
18        signaling: IncomingSignalingMethod,
19        offer: Box<webrtc::Offer>,
20        rpc_id: Option<RpcId>,
21    },
22    AnswerSdpCreatePending {
23        #[ignore_malloc_size_of = "doesn't allocate"]
24        time: redux::Timestamp,
25        signaling: IncomingSignalingMethod,
26        offer: Box<webrtc::Offer>,
27        rpc_id: Option<RpcId>,
28    },
29    AnswerSdpCreateSuccess {
30        #[ignore_malloc_size_of = "doesn't allocate"]
31        time: redux::Timestamp,
32        signaling: IncomingSignalingMethod,
33        offer: Box<webrtc::Offer>,
34        sdp: String,
35        rpc_id: Option<RpcId>,
36    },
37    AnswerReady {
38        #[ignore_malloc_size_of = "doesn't allocate"]
39        time: redux::Timestamp,
40        signaling: IncomingSignalingMethod,
41        offer: Box<webrtc::Offer>,
42        answer: Box<webrtc::Answer>,
43        rpc_id: Option<RpcId>,
44    },
45    AnswerSendSuccess {
46        #[ignore_malloc_size_of = "doesn't allocate"]
47        time: redux::Timestamp,
48        signaling: IncomingSignalingMethod,
49        offer: Box<webrtc::Offer>,
50        answer: Box<webrtc::Answer>,
51        rpc_id: Option<RpcId>,
52    },
53    FinalizePending {
54        #[ignore_malloc_size_of = "doesn't allocate"]
55        time: redux::Timestamp,
56        signaling: IncomingSignalingMethod,
57        offer: Box<webrtc::Offer>,
58        answer: Box<webrtc::Answer>,
59        rpc_id: Option<RpcId>,
60    },
61    FinalizeSuccess {
62        #[ignore_malloc_size_of = "doesn't allocate"]
63        time: redux::Timestamp,
64        signaling: IncomingSignalingMethod,
65        offer: Box<webrtc::Offer>,
66        answer: Box<webrtc::Answer>,
67        rpc_id: Option<RpcId>,
68    },
69    Error {
70        #[ignore_malloc_size_of = "doesn't allocate"]
71        time: redux::Timestamp,
72        error: P2pConnectionIncomingError,
73        rpc_id: Option<RpcId>,
74    },
75    Success {
76        #[ignore_malloc_size_of = "doesn't allocate"]
77        time: redux::Timestamp,
78        signaling: IncomingSignalingMethod,
79        offer: Box<webrtc::Offer>,
80        answer: Box<webrtc::Answer>,
81        rpc_id: Option<RpcId>,
82    },
83    FinalizePendingLibp2p {
84        #[ignore_malloc_size_of = "doesn't allocate"]
85        addr: SocketAddr,
86        #[with_malloc_size_of_func = "measurement::socket_vec"]
87        close_duplicates: Vec<SocketAddr>,
88        #[ignore_malloc_size_of = "doesn't allocate"]
89        time: redux::Timestamp,
90    },
91    Libp2pReceived {
92        #[ignore_malloc_size_of = "doesn't allocate"]
93        time: redux::Timestamp,
94    },
95}
96
97impl P2pConnectionIncomingState {
98    pub fn time(&self) -> Timestamp {
99        match self {
100            Self::Init { time, .. } => *time,
101            Self::AnswerSdpCreatePending { time, .. } => *time,
102            Self::AnswerSdpCreateSuccess { time, .. } => *time,
103            Self::AnswerReady { time, .. } => *time,
104            Self::AnswerSendSuccess { time, .. } => *time,
105            Self::FinalizePending { time, .. } => *time,
106            Self::FinalizeSuccess { time, .. } => *time,
107            Self::Error { time, .. } => *time,
108            Self::Success { time, .. } => *time,
109            Self::FinalizePendingLibp2p { time, .. } => *time,
110            Self::Libp2pReceived { time } => *time,
111        }
112    }
113
114    pub fn rpc_id(&self) -> Option<RpcId> {
115        match self {
116            Self::Init { rpc_id, .. } => *rpc_id,
117            Self::AnswerSdpCreatePending { rpc_id, .. } => *rpc_id,
118            Self::AnswerSdpCreateSuccess { rpc_id, .. } => *rpc_id,
119            Self::AnswerReady { rpc_id, .. } => *rpc_id,
120            Self::AnswerSendSuccess { rpc_id, .. } => *rpc_id,
121            Self::FinalizePending { rpc_id, .. } => *rpc_id,
122            Self::FinalizeSuccess { rpc_id, .. } => *rpc_id,
123            Self::Error { rpc_id, .. } => *rpc_id,
124            Self::Success { rpc_id, .. } => *rpc_id,
125            Self::FinalizePendingLibp2p { .. } | Self::Libp2pReceived { .. } => None,
126        }
127    }
128
129    pub fn is_timed_out(&self, now: Timestamp, timeouts: &P2pTimeouts) -> bool {
130        !matches!(self, Self::Error { .. })
131            && now
132                .checked_sub(self.time())
133                .and_then(|dur| timeouts.incoming_connection_timeout.map(|to| dur >= to))
134                .unwrap_or(false)
135    }
136}
137
138#[derive(Serialize, Deserialize, Debug, Clone, thiserror::Error, MallocSizeOf)]
139pub enum P2pConnectionIncomingError {
140    #[error("error creating SDP: {0}")]
141    SdpCreateError(String),
142    #[error("finalization error: {0}")]
143    FinalizeError(String),
144    #[error("connection authentication failed")]
145    ConnectionAuthError,
146    #[error("timeout error")]
147    Timeout,
148}
149
150mod measurement {
151    use std::{mem, net::SocketAddr};
152
153    use malloc_size_of::MallocSizeOfOps;
154
155    pub fn socket_vec(val: &Vec<SocketAddr>, _ops: &mut MallocSizeOfOps) -> usize {
156        val.capacity() * mem::size_of::<SocketAddr>()
157    }
158}