// p2p/connection/outgoing/mod.rs

1mod p2p_connection_outgoing_state;
2pub use p2p_connection_outgoing_state::*;
3
4mod p2p_connection_outgoing_actions;
5pub use p2p_connection_outgoing_actions::*;
6
7mod p2p_connection_outgoing_reducer;
8
9#[cfg(feature = "p2p-libp2p")]
10use std::net::SocketAddr;
11use std::{fmt, net::IpAddr, str::FromStr};
12
13use binprot_derive::{BinProtRead, BinProtWrite};
14use multiaddr::{Multiaddr, Protocol};
15use serde::{Deserialize, Serialize};
16use thiserror::Error;
17
18#[cfg(feature = "p2p-libp2p")]
19use mina_p2p_messages::v2;
20
21use crate::{
22    webrtc::{self, Host},
23    PeerId,
24};
25
26#[cfg(feature = "p2p-libp2p")]
27use crate::webrtc::{HttpSignalingInfo, SignalingMethod};
28
29// TODO(binier): maybe move to `crate::webrtc` module
30#[derive(
31    BinProtWrite, BinProtRead, derive_more::From, Debug, Ord, PartialOrd, Eq, PartialEq, Clone,
32)]
33pub enum P2pConnectionOutgoingInitOpts {
34    WebRTC {
35        peer_id: PeerId,
36        signaling: webrtc::SignalingMethod,
37    },
38    LibP2P(P2pConnectionOutgoingInitLibp2pOpts),
39}
40
41impl P2pConnectionOutgoingInitOpts {
42    pub fn with_host_resolved(self) -> Option<Self> {
43        if let Self::LibP2P(libp2p_opts) = self {
44            Some(Self::LibP2P(libp2p_opts.with_host_resolved()?))
45        } else {
46            Some(self)
47        }
48    }
49}
50
51#[derive(BinProtWrite, BinProtRead, Eq, PartialEq, Ord, PartialOrd, Debug, Clone)]
52pub struct P2pConnectionOutgoingInitLibp2pOpts {
53    pub peer_id: PeerId,
54    pub host: Host,
55    pub port: u16,
56}
57
58impl P2pConnectionOutgoingInitLibp2pOpts {
59    /// If the current host is local and there is a better host among the `addrs`,
60    /// replace the current one with the better one.
61    pub fn update_host_if_needed<'a>(&mut self, mut addrs: impl Iterator<Item = &'a Multiaddr>) {
62        fn is_local(ip: impl Into<IpAddr>) -> bool {
63            match ip.into() {
64                IpAddr::V4(ip) => ip.is_loopback() || ip.is_private(),
65                IpAddr::V6(ip) => ip.is_loopback(),
66            }
67        }
68
69        // if current dial opts is not good enough
70        let update = match &self.host {
71            Host::Domain(_) => false,
72            Host::Ipv4(ip) => is_local(*ip),
73            Host::Ipv6(ip) => is_local(*ip),
74        };
75        if update {
76            // if new options is better
77            let new = addrs.find_map(|x| {
78                x.iter().find_map(|x| match x {
79                    Protocol::Dns4(hostname) | Protocol::Dns6(hostname) => {
80                        Some(Host::Domain(hostname.into_owned()))
81                    }
82                    Protocol::Ip4(ip) if !is_local(ip) => Some(Host::Ipv4(ip)),
83                    Protocol::Ip6(ip) if !is_local(ip) => Some(Host::Ipv6(ip)),
84                    _ => None,
85                })
86            });
87            if let Some(new) = new {
88                self.host = new;
89            }
90        }
91    }
92
93    pub fn with_host_resolved(mut self) -> Option<Self> {
94        self.host = self.host.resolve()?;
95        Some(self)
96    }
97}
98
99pub(crate) mod libp2p_opts {
100    use std::net::{IpAddr, SocketAddr};
101
102    use multiaddr::Multiaddr;
103
104    use crate::{webrtc::Host, PeerId};
105
106    impl super::P2pConnectionOutgoingInitLibp2pOpts {
107        fn to_peer_id_multiaddr(&self) -> (PeerId, Multiaddr) {
108            (
109                self.peer_id,
110                Multiaddr::from_iter([(&self.host).into(), multiaddr::Protocol::Tcp(self.port)]),
111            )
112        }
113        fn into_peer_id_multiaddr(self) -> (PeerId, Multiaddr) {
114            (
115                self.peer_id,
116                Multiaddr::from_iter([(&self.host).into(), multiaddr::Protocol::Tcp(self.port)]),
117            )
118        }
119
120        pub fn matches_socket_addr(&self, addr: SocketAddr) -> bool {
121            self.port == addr.port() && self.matches_socket_ip(addr)
122        }
123
124        pub fn matches_socket_ip(&self, addr: SocketAddr) -> bool {
125            match (&self.host, addr) {
126                (Host::Ipv4(ip), SocketAddr::V4(addr)) => ip == addr.ip(),
127                (Host::Ipv6(ip), SocketAddr::V6(addr)) => ip == addr.ip(),
128                _ => false,
129            }
130        }
131    }
132
133    impl From<&super::P2pConnectionOutgoingInitLibp2pOpts> for (PeerId, Multiaddr) {
134        fn from(value: &super::P2pConnectionOutgoingInitLibp2pOpts) -> Self {
135            value.to_peer_id_multiaddr()
136        }
137    }
138
139    impl From<super::P2pConnectionOutgoingInitLibp2pOpts> for (PeerId, Multiaddr) {
140        fn from(value: super::P2pConnectionOutgoingInitLibp2pOpts) -> Self {
141            value.into_peer_id_multiaddr()
142        }
143    }
144
145    impl From<(PeerId, SocketAddr)> for super::P2pConnectionOutgoingInitLibp2pOpts {
146        fn from((peer_id, addr): (PeerId, SocketAddr)) -> Self {
147            let (host, port) = match addr {
148                SocketAddr::V4(v4) => (Host::Ipv4(*v4.ip()), v4.port()),
149                SocketAddr::V6(v6) => (Host::Ipv6(*v6.ip()), v6.port()),
150            };
151            super::P2pConnectionOutgoingInitLibp2pOpts {
152                peer_id,
153                host,
154                port,
155            }
156        }
157    }
158
159    #[derive(Debug, thiserror::Error)]
160    pub enum P2pConnectionOutgoingInitLibp2pOptsTryToSocketAddrError {
161        #[error("name unresolved: {0}")]
162        Unresolved(String),
163    }
164
165    impl TryFrom<&super::P2pConnectionOutgoingInitLibp2pOpts> for SocketAddr {
166        type Error = P2pConnectionOutgoingInitLibp2pOptsTryToSocketAddrError;
167
168        fn try_from(
169            value: &super::P2pConnectionOutgoingInitLibp2pOpts,
170        ) -> Result<Self, Self::Error> {
171            match &value.host {
172                Host::Domain(name) => Err(
173                    P2pConnectionOutgoingInitLibp2pOptsTryToSocketAddrError::Unresolved(
174                        name.clone(),
175                    ),
176                ),
177                Host::Ipv4(ip) => Ok(SocketAddr::new(IpAddr::V4(*ip), value.port)),
178                Host::Ipv6(ip) => Ok(SocketAddr::new(IpAddr::V6(*ip), value.port)),
179            }
180        }
181    }
182}
183
184impl P2pConnectionOutgoingInitOpts {
185    pub fn is_libp2p(&self) -> bool {
186        matches!(self, Self::LibP2P(_))
187    }
188
189    pub fn peer_id(&self) -> &PeerId {
190        match self {
191            Self::WebRTC { peer_id, .. } => peer_id,
192            Self::LibP2P(v) => &v.peer_id,
193        }
194    }
195
196    pub fn kind(&self) -> &'static str {
197        match self {
198            Self::WebRTC { .. } => "webrtc",
199
200            Self::LibP2P(_) => "libp2p",
201        }
202    }
203
204    pub fn can_connect_directly(&self) -> bool {
205        match self {
206            Self::LibP2P(..) => true,
207            Self::WebRTC { signaling, .. } => signaling.can_connect_directly(),
208        }
209    }
210
211    pub fn webrtc_p2p_relay_peer_id(&self) -> Option<PeerId> {
212        match self {
213            Self::WebRTC { signaling, .. } => signaling.p2p_relay_peer_id(),
214            _ => None,
215        }
216    }
217
218    /// The OCaml implementation of Mina uses the `get_some_initial_peers` RPC to exchange peer information.
219    /// Try to convert this RPC response into our peer address representation.
220    /// Recognize a hack for marking the webrtc signaling server.
221    /// Prefixes "http://" or "https://" are schemas that indicates the host is webrtc signaling.
222    #[cfg(feature = "p2p-libp2p")]
223    pub fn try_from_mina_rpc(msg: v2::NetworkPeerPeerStableV1) -> Option<Self> {
224        let peer_id_str = String::try_from(&msg.peer_id.0).ok()?;
225        let peer_id = peer_id_str.parse::<libp2p_identity::PeerId>().ok()?;
226        if peer_id.as_ref().code() == 0x12 {
227            // the peer_id is not supported
228            return None;
229        }
230
231        let host = String::try_from(&msg.host).ok()?;
232
233        let opts = if host.contains(':') {
234            let mut it = host.split(':');
235            let schema = it.next()?;
236            let host = it.next()?.trim_start_matches('/');
237            let signaling = match schema {
238                "http" => SignalingMethod::Http(HttpSignalingInfo {
239                    host: host.parse().ok()?,
240                    port: msg.libp2p_port.as_u64() as u16,
241                }),
242                "https" => SignalingMethod::Https(HttpSignalingInfo {
243                    host: host.parse().ok()?,
244                    port: msg.libp2p_port.as_u64() as u16,
245                }),
246                _ => return None,
247            };
248            Self::WebRTC {
249                peer_id: peer_id.try_into().ok()?,
250                signaling,
251            }
252        } else {
253            let opts = P2pConnectionOutgoingInitLibp2pOpts {
254                peer_id: peer_id.try_into().ok()?,
255                host: host.parse().ok()?,
256                port: msg.libp2p_port.as_u64() as u16,
257            };
258            Self::LibP2P(opts)
259        };
260        Some(opts)
261    }
262
263    /// Try to convert our peer address representation into mina RPC response.
264    /// Use a hack to mark the webrtc signaling server. Add "http://" or "https://" schema to the host address.
265    /// The OCaml node will recognize this address as incorrect and ignore it.
266    #[cfg(feature = "p2p-libp2p")]
267    pub fn try_into_mina_rpc(&self) -> Option<v2::NetworkPeerPeerStableV1> {
268        match self {
269            P2pConnectionOutgoingInitOpts::LibP2P(opts) => Some(v2::NetworkPeerPeerStableV1 {
270                host: opts.host.to_string().as_bytes().into(),
271                libp2p_port: (opts.port as u64).into(),
272                peer_id: v2::NetworkPeerPeerIdStableV1(
273                    libp2p_identity::PeerId::try_from(opts.peer_id)
274                        .ok()?
275                        .to_string()
276                        .into_bytes()
277                        .into(),
278                ),
279            }),
280            P2pConnectionOutgoingInitOpts::WebRTC { peer_id, signaling } => match signaling {
281                SignalingMethod::Http(info) => Some(v2::NetworkPeerPeerStableV1 {
282                    host: format!("http://{}", info.host).as_bytes().into(),
283                    libp2p_port: (info.port as u64).into(),
284                    peer_id: v2::NetworkPeerPeerIdStableV1(
285                        (*peer_id).to_string().into_bytes().into(),
286                    ),
287                }),
288                SignalingMethod::Https(info) => Some(v2::NetworkPeerPeerStableV1 {
289                    host: format!("https://{}", info.host).as_bytes().into(),
290                    libp2p_port: (info.port as u64).into(),
291                    peer_id: v2::NetworkPeerPeerIdStableV1(
292                        (*peer_id).to_string().into_bytes().into(),
293                    ),
294                }),
295                SignalingMethod::HttpsProxy(cluster_id, info) => {
296                    Some(v2::NetworkPeerPeerStableV1 {
297                        host: format!("https://{}/clusters/{cluster_id}", info.host)
298                            .as_bytes()
299                            .into(),
300                        libp2p_port: (info.port as u64).into(),
301                        peer_id: v2::NetworkPeerPeerIdStableV1(
302                            (*peer_id).to_string().into_bytes().into(),
303                        ),
304                    })
305                }
306                SignalingMethod::P2p { .. } => None,
307            },
308        }
309    }
310
311    #[cfg(feature = "p2p-libp2p")]
312    pub fn from_libp2p_socket_addr(peer_id: PeerId, addr: SocketAddr) -> Self {
313        P2pConnectionOutgoingInitOpts::LibP2P((peer_id, addr).into())
314    }
315}
316
317impl P2pConnectionOutgoingInitLibp2pOpts {
318    pub fn to_maddr(&self) -> Option<multiaddr::Multiaddr> {
319        self.clone().try_into().ok()
320    }
321}
322
323impl fmt::Display for P2pConnectionOutgoingInitOpts {
324    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
325        match self {
326            Self::WebRTC { peer_id, signaling } => {
327                write!(f, "/{}{}", peer_id, signaling)
328            }
329
330            Self::LibP2P(v) => {
331                if let Some(maddr) = v.to_maddr() {
332                    write!(f, "{}", maddr)
333                } else {
334                    write!(f, "*INVALID MULTIADDRESS*")
335                }
336            }
337        }
338    }
339}
340
341#[derive(Error, Serialize, Deserialize, Debug, Clone)]
342pub enum P2pConnectionOutgoingInitOptsParseError {
343    #[error("not enough args for the signaling method")]
344    NotEnoughArgs,
345    #[error("peer id parse error: {0}")]
346    PeerIdParseError(String),
347    #[error("signaling method parse error: `{0}`")]
348    SignalingMethodParseError(webrtc::SignalingMethodParseError),
349    #[error("other error: {0}")]
350    Other(String),
351}
352
353impl FromStr for P2pConnectionOutgoingInitOpts {
354    type Err = P2pConnectionOutgoingInitOptsParseError;
355
356    fn from_str(s: &str) -> Result<Self, Self::Err> {
357        if s.is_empty() {
358            return Err(P2pConnectionOutgoingInitOptsParseError::NotEnoughArgs);
359        }
360
361        let is_libp2p_maddr = s.starts_with("/ip") || s.starts_with("/dns");
362
363        if is_libp2p_maddr {
364            let maddr = multiaddr::Multiaddr::from_str(s)
365                .map_err(|e| P2pConnectionOutgoingInitOptsParseError::Other(e.to_string()))?;
366
367            let opts = (&maddr).try_into()?;
368
369            return Ok(Self::LibP2P(opts));
370        }
371        #[cfg(target_arch = "wasm32")]
372        if is_libp2p_maddr {
373            return Err(P2pConnectionOutgoingInitOptsParseError::Other(
374                "libp2p not supported in wasm".to_owned(),
375            ));
376        }
377
378        let id_end_index = s[1..]
379            .find('/')
380            .map(|i| i + 1)
381            .filter(|i| s.len() > *i)
382            .ok_or(P2pConnectionOutgoingInitOptsParseError::NotEnoughArgs)?;
383
384        Ok(Self::WebRTC {
385            peer_id: s[1..id_end_index].parse::<PeerId>().map_err(|err| {
386                P2pConnectionOutgoingInitOptsParseError::PeerIdParseError(err.to_string())
387            })?,
388            signaling: s[id_end_index..]
389                .parse::<webrtc::SignalingMethod>()
390                .map_err(|err| {
391                    P2pConnectionOutgoingInitOptsParseError::SignalingMethodParseError(err)
392                })?,
393        })
394    }
395}
396
397impl Serialize for P2pConnectionOutgoingInitOpts {
398    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
399    where
400        S: serde::Serializer,
401    {
402        serializer.serialize_str(&self.to_string())
403    }
404}
405
406impl<'de> Deserialize<'de> for P2pConnectionOutgoingInitOpts {
407    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
408    where
409        D: serde::Deserializer<'de>,
410    {
411        let s: String = Deserialize::deserialize(deserializer)?;
412        s.parse().map_err(serde::de::Error::custom)
413    }
414}
415
416impl TryFrom<P2pConnectionOutgoingInitLibp2pOpts> for multiaddr::Multiaddr {
417    type Error = libp2p_identity::DecodingError;
418
419    fn try_from(value: P2pConnectionOutgoingInitLibp2pOpts) -> Result<Self, Self::Error> {
420        use multiaddr::Protocol;
421
422        Ok(Self::empty()
423            .with(match &value.host {
424                // maybe should be just `Dns`?
425                Host::Domain(v) => Protocol::Dns4(v.into()),
426                Host::Ipv4(v) => Protocol::Ip4(*v),
427                Host::Ipv6(v) => Protocol::Ip6(*v),
428            })
429            .with(Protocol::Tcp(value.port))
430            .with(Protocol::P2p(libp2p_identity::PeerId::try_from(
431                value.peer_id,
432            )?)))
433    }
434}
435
436impl TryFrom<&multiaddr::Multiaddr> for P2pConnectionOutgoingInitOpts {
437    type Error = P2pConnectionOutgoingInitOptsParseError;
438
439    fn try_from(value: &multiaddr::Multiaddr) -> Result<Self, Self::Error> {
440        Ok(Self::LibP2P(value.try_into()?))
441    }
442}
443
444impl TryFrom<multiaddr::Multiaddr> for P2pConnectionOutgoingInitOpts {
445    type Error = P2pConnectionOutgoingInitOptsParseError;
446
447    fn try_from(value: multiaddr::Multiaddr) -> Result<Self, Self::Error> {
448        Ok(Self::LibP2P((&value).try_into()?))
449    }
450}
451
452impl TryFrom<&multiaddr::Multiaddr> for P2pConnectionOutgoingInitLibp2pOpts {
453    type Error = P2pConnectionOutgoingInitOptsParseError;
454
455    fn try_from(maddr: &multiaddr::Multiaddr) -> Result<Self, Self::Error> {
456        use multiaddr::Protocol;
457
458        let mut iter = maddr.iter();
459        Ok(P2pConnectionOutgoingInitLibp2pOpts {
460            host: match iter.next() {
461                Some(Protocol::Ip4(v)) => Host::Ipv4(v),
462                Some(Protocol::Dns(v) | Protocol::Dns4(v) | Protocol::Dns6(v)) => {
463                    Host::Domain(v.to_string())
464                }
465                Some(_) => {
466                    return Err(P2pConnectionOutgoingInitOptsParseError::Other(
467                        "unexpected part in multiaddr! expected host".to_string(),
468                    ));
469                }
470                None => {
471                    return Err(P2pConnectionOutgoingInitOptsParseError::Other(
472                        "missing host part from multiaddr".to_string(),
473                    ));
474                }
475            },
476            port: match iter.next() {
477                Some(Protocol::Tcp(port)) => port,
478                Some(_) => {
479                    return Err(P2pConnectionOutgoingInitOptsParseError::Other(
480                        "unexpected part in multiaddr! expected port".to_string(),
481                    ));
482                }
483                None => {
484                    return Err(P2pConnectionOutgoingInitOptsParseError::Other(
485                        "missing port part from multiaddr".to_string(),
486                    ));
487                }
488            },
489            peer_id: match iter.next() {
490                Some(Protocol::P2p(hash)) => libp2p_identity::PeerId::from_multihash(hash.into())
491                    .map_err(|_| {
492                        P2pConnectionOutgoingInitOptsParseError::Other(
493                            "invalid peer_id multihash".to_string(),
494                        )
495                    })?
496                    .try_into()
497                    .map_err(|_| {
498                        P2pConnectionOutgoingInitOptsParseError::Other(
499                            "unexpected error converting PeerId".to_string(),
500                        )
501                    })?,
502                Some(_) => {
503                    return Err(P2pConnectionOutgoingInitOptsParseError::Other(
504                        "unexpected part in multiaddr! expected peer_id".to_string(),
505                    ));
506                }
507                None => {
508                    return Err(P2pConnectionOutgoingInitOptsParseError::Other(
509                        "peer_id not set in multiaddr. Missing `../p2p/<peer_id>`".to_string(),
510                    ));
511                }
512            },
513        })
514    }
515}
516
517mod measurement {
518    use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
519
520    use super::P2pConnectionOutgoingInitOpts;
521
522    // `Host` may contain `String` which allocates
523    // but hostname usually small, compared to `String` container size 24 bytes
524    impl MallocSizeOf for P2pConnectionOutgoingInitOpts {
525        fn size_of(&self, _ops: &mut MallocSizeOfOps) -> usize {
526            0
527        }
528    }
529}