mina_p2p/service_impl/webrtc/
mod.rs

1#[cfg(target_arch = "wasm32")]
2mod web;
3#[cfg(all(not(target_arch = "wasm32"), feature = "p2p-webrtc-cpp"))]
4mod webrtc_cpp;
5#[cfg(all(not(target_arch = "wasm32"), feature = "p2p-webrtc-rs"))]
6mod webrtc_rs;
7
8use std::{collections::BTreeMap, future::Future, pin::Pin, sync::Arc, time::Duration};
9
10use mina_core::bug_condition;
11use serde::Serialize;
12use tokio::sync::Semaphore;
13
14#[cfg(not(target_arch = "wasm32"))]
15use tokio::task::spawn_local;
16#[cfg(target_arch = "wasm32")]
17use wasm_bindgen_futures::spawn_local;
18
19use mina_core::channels::{mpsc, oneshot, Aborted, Aborter};
20
21use crate::{
22    channels::{ChannelId, ChannelMsg, MsgId},
23    connection::outgoing::P2pConnectionOutgoingInitOpts,
24    identity::{EncryptableType, PublicKey, SecretKey},
25    webrtc,
26    webrtc::{ConnectionAuth, ConnectionAuthEncrypted},
27    P2pChannelEvent, P2pConnectionEvent, P2pEvent, PeerId,
28};
29
// One `imports` module exists per WebRTC backend. Each is gated on the target
// and/or cargo feature and re-exports the same set of names, so the rest of
// this file can use them without knowing which backend was compiled in.

/// Re-exports from the `webrtc_rs` backend (non-wasm, `p2p-webrtc-rs` feature).
#[cfg(all(not(target_arch = "wasm32"), feature = "p2p-webrtc-rs"))]
mod imports {
    pub use super::webrtc_rs::{
        build_api, certificate_from_pem_key, webrtc_signal_send, Api, RTCCertificate, RTCChannel,
        RTCConnection, RTCConnectionState, RTCSignalingError,
    };
}
/// Re-exports from the `webrtc_cpp` backend (non-wasm, `p2p-webrtc-cpp` feature).
#[cfg(all(not(target_arch = "wasm32"), feature = "p2p-webrtc-cpp"))]
mod imports {
    pub use super::webrtc_cpp::{
        build_api, certificate_from_pem_key, webrtc_signal_send, Api, RTCCertificate, RTCChannel,
        RTCConnection, RTCConnectionState, RTCSignalingError,
    };
}
/// Re-exports from the `web` backend (wasm32 target).
#[cfg(target_arch = "wasm32")]
mod imports {
    pub use super::web::{
        build_api, certificate_from_pem_key, webrtc_signal_send, Api, RTCCertificate, RTCChannel,
        RTCConnection, RTCConnectionState, RTCSignalingError,
    };
}
51
52use imports::*;
53pub use imports::{webrtc_signal_send, RTCSignalingError};
54
55use super::TaskSpawner;
56
/// Maximum number of bytes handed to a data channel in a single `send` call
/// (16 KiB). Encoded outgoing messages larger than this are split into
/// consecutive chunks of at most this size.
const CHUNK_SIZE: usize = 16 * 1024;
59
60pub enum Cmd {
61    PeerAdd { args: PeerAddArgs, aborted: Aborted },
62}
63
64#[derive(Debug)]
65pub enum PeerCmd {
66    PeerHttpOfferSend(String, webrtc::Offer),
67    AnswerSet(webrtc::Answer),
68    ConnectionAuthorizationSend(Option<ConnectionAuthEncrypted>),
69    ChannelOpen(ChannelId),
70    ChannelSend(MsgId, ChannelMsg),
71}
72
73enum PeerCmdInternal {
74    ChannelOpened(ChannelId, Result<RTCChannel, Error>),
75    ChannelClosed(ChannelId),
76}
77
78enum PeerCmdAll {
79    External(Box<PeerCmd>),
80    Internal(PeerCmdInternal),
81}
82
83pub struct P2pServiceCtx {
84    pub cmd_sender: mpsc::TrackedUnboundedSender<Cmd>,
85    pub peers: BTreeMap<PeerId, PeerState>,
86}
87
88pub struct PeerAddArgs {
89    peer_id: PeerId,
90    kind: PeerConnectionKind,
91    event_sender: Arc<dyn Fn(P2pEvent) -> Option<()> + Send + Sync + 'static>,
92    cmd_receiver: mpsc::TrackedUnboundedReceiver<PeerCmd>,
93}
94
95pub enum PeerConnectionKind {
96    Outgoing,
97    Incoming(Box<webrtc::Offer>),
98}
99
100pub struct PeerState {
101    pub cmd_sender: mpsc::TrackedUnboundedSender<PeerCmd>,
102    pub abort: Aborter,
103}
104
105#[derive(thiserror::Error, derive_more::From, Debug)]
106pub(super) enum Error {
107    #[cfg(all(not(target_arch = "wasm32"), feature = "p2p-webrtc-rs"))]
108    #[error("{0}")]
109    Rtc(::webrtc::Error),
110    #[cfg(all(not(target_arch = "wasm32"), feature = "p2p-webrtc-cpp"))]
111    #[error("{0}")]
112    Rtc(::datachannel::Error),
113    #[cfg(target_arch = "wasm32")]
114    #[error("js error: {0:?}")]
115    RtcJs(String),
116    #[error("signaling error: {0}")]
117    Signaling(RTCSignalingError),
118    #[error("unexpected cmd received")]
119    UnexpectedCmd,
120    #[from(ignore)]
121    #[error("channel closed")]
122    ChannelClosed,
123}
124
/// Converts a JavaScript value into [`Error::RtcJs`] by capturing its debug
/// representation as a string.
#[cfg(target_arch = "wasm32")]
impl From<wasm_bindgen::JsValue> for Error {
    fn from(value: wasm_bindgen::JsValue) -> Self {
        Error::RtcJs(format!("{value:?}"))
    }
}
131
132pub type OnConnectionStateChangeHdlrFn = Box<
133    dyn (FnMut(RTCConnectionState) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>)
134        + Send
135        + Sync,
136>;
137
138pub struct RTCConfig {
139    pub ice_servers: RTCConfigIceServers,
140    pub certificate: RTCCertificate,
141    pub seed: [u8; 32],
142}
143
144#[derive(Serialize)]
145pub struct RTCConfigIceServers(Vec<RTCConfigIceServer>);
146#[derive(Serialize)]
147pub struct RTCConfigIceServer {
148    pub urls: Vec<String>,
149    pub username: Option<String>,
150    pub credential: Option<String>,
151}
152
153#[derive(Serialize)]
154pub struct RTCChannelConfig {
155    pub label: &'static str,
156    pub negotiated: Option<u16>,
157}
158
159impl Default for RTCConfigIceServers {
160    fn default() -> Self {
161        Self(vec![
162            RTCConfigIceServer {
163                urls: vec!["stun:65.109.110.75:3478".to_owned()],
164                username: Some("mina".to_owned()),
165                credential: Some("webrtc".to_owned()),
166            },
167            RTCConfigIceServer {
168                urls: vec!["stun:176.9.147.28:3478".to_owned()],
169                username: None,
170                credential: None,
171            },
172            RTCConfigIceServer {
173                urls: vec![
174                    "stun:stun.l.google.com:19302".to_owned(),
175                    "stun:stun1.l.google.com:19302".to_owned(),
176                    "stun:stun2.l.google.com:19302".to_owned(),
177                    "stun:stun3.l.google.com:19302".to_owned(),
178                    "stun:stun4.l.google.com:19302".to_owned(),
179                ],
180                username: None,
181                credential: None,
182            },
183        ])
184    }
185}
186
187impl std::ops::Deref for RTCConfigIceServers {
188    type Target = Vec<RTCConfigIceServer>;
189
190    fn deref(&self) -> &Self::Target {
191        &self.0
192    }
193}
194
195#[cfg(not(all(not(target_arch = "wasm32"), feature = "p2p-webrtc-cpp")))]
196impl Drop for RTCConnection {
197    fn drop(&mut self) {
198        if self.is_main() {
199            let cloned = self.clone();
200            spawn_local(async move {
201                let _ = cloned.close().await;
202            });
203        }
204    }
205}
206
207async fn sleep(dur: Duration) {
208    #[cfg(not(target_arch = "wasm32"))]
209    let fut = tokio::time::sleep(dur);
210    #[cfg(target_arch = "wasm32")]
211    let fut = gloo_timers::future::TimeoutFuture::new(dur.as_millis() as u32);
212    fut.await
213}
214
215async fn wait_for_ice_gathering_complete(pc: &mut RTCConnection) {
216    let timeout = sleep(Duration::from_secs(3));
217
218    tokio::select! {
219        _ = timeout => {}
220        _ = pc.wait_for_ice_gathering_complete() => {}
221    }
222}
223
224async fn peer_start(
225    api: Api,
226    args: PeerAddArgs,
227    abort: Aborted,
228    closed: mpsc::Sender<()>,
229    certificate: RTCCertificate,
230    rng_seed: [u8; 32],
231) {
232    let PeerAddArgs {
233        peer_id,
234        kind,
235        event_sender,
236        mut cmd_receiver,
237    } = args;
238    let is_outgoing = matches!(kind, PeerConnectionKind::Outgoing);
239
240    let config = RTCConfig {
241        ice_servers: Default::default(),
242        certificate,
243        seed: rng_seed,
244    };
245    let fut = async {
246        let mut pc = RTCConnection::create(&api, config).await?;
247        let main_channel = pc
248            .channel_create(RTCChannelConfig {
249                label: "",
250                negotiated: Some(0),
251            })
252            .await?;
253
254        let offer = match kind {
255            PeerConnectionKind::Incoming(offer) => (*offer).try_into()?,
256            PeerConnectionKind::Outgoing => pc.offer_create().await?,
257        };
258
259        if is_outgoing {
260            pc.local_desc_set(offer).await?;
261            wait_for_ice_gathering_complete(&mut pc).await;
262        } else {
263            pc.remote_desc_set(offer).await?;
264        }
265
266        Result::<_, Error>::Ok((pc, main_channel))
267    };
268
269    #[allow(unused_mut)]
270    let (mut pc, mut main_channel) = match fut.await {
271        Ok(v) => v,
272        Err(err) => {
273            event_sender(P2pConnectionEvent::OfferSdpReady(peer_id, Err(err.to_string())).into());
274            return;
275        }
276    };
277
278    let (main_channel_open_tx, main_channel_open) = oneshot::channel::<()>();
279    let mut main_channel_open_tx = Some(main_channel_open_tx);
280    main_channel.on_open(move || {
281        if let Some(tx) = main_channel_open_tx.take() {
282            let _ = tx.send(());
283        }
284        std::future::ready(())
285    });
286
287    let answer = if is_outgoing {
288        let answer_fut = async {
289            let sdp = pc.local_sdp().await.unwrap();
290            event_sender(P2pConnectionEvent::OfferSdpReady(peer_id, Ok(sdp)).into())
291                .ok_or(Error::ChannelClosed)?;
292            match cmd_receiver.recv().await.ok_or(Error::ChannelClosed)?.0 {
293                PeerCmd::PeerHttpOfferSend(url, offer) => {
294                    let answer = webrtc_signal_send(&url, offer).await?;
295                    event_sender(P2pConnectionEvent::AnswerReceived(peer_id, answer).into())
296                        .ok_or(Error::ChannelClosed)?;
297
298                    if let PeerCmd::AnswerSet(v) =
299                        cmd_receiver.recv().await.ok_or(Error::ChannelClosed)?.0
300                    {
301                        return Ok(v);
302                    }
303                }
304                PeerCmd::AnswerSet(v) => return Ok(v),
305                _cmd => {
306                    return Err(Error::UnexpectedCmd);
307                }
308            }
309            Err(Error::ChannelClosed)
310        };
311        answer_fut.await.and_then(|v| Ok(v.try_into()?))
312    } else {
313        pc.answer_create().await.map_err(Error::from)
314    };
315    let Ok(answer) = answer else {
316        return;
317    };
318
319    if is_outgoing {
320        if let Err(err) = pc.remote_desc_set(answer).await {
321            let err = Error::from(err).to_string();
322            let _ = event_sender(P2pConnectionEvent::Finalized(peer_id, Err(err)).into());
323        }
324    } else {
325        let fut = async {
326            pc.local_desc_set(answer).await?;
327            wait_for_ice_gathering_complete(&mut pc).await;
328            Ok(pc.local_sdp().await.unwrap())
329        };
330        let res = fut.await.map_err(|err: Error| err.to_string());
331        let is_err = res.is_err();
332        let is_err = is_err
333            || event_sender(P2pConnectionEvent::AnswerSdpReady(peer_id, res).into()).is_none();
334        if is_err {
335            return;
336        }
337    }
338
339    let (connected_tx, connected) = oneshot::channel();
340    if matches!(pc.connection_state(), RTCConnectionState::Connected) {
341        connected_tx.send(Ok(())).unwrap();
342    } else {
343        let mut connected_tx = Some(connected_tx);
344        pc.on_connection_state_change(Box::new(move |state| {
345            match state {
346                RTCConnectionState::Connected => {
347                    if let Some(connected_tx) = connected_tx.take() {
348                        let _ = connected_tx.send(Ok(()));
349                    }
350                }
351                RTCConnectionState::Disconnected | RTCConnectionState::Closed => {
352                    if let Some(connected_tx) = connected_tx.take() {
353                        let _ = connected_tx.send(Err("disconnected"));
354                    } else {
355                        let _ = closed.try_send(());
356                    }
357                }
358                _ => {}
359            }
360            Box::pin(std::future::ready(()))
361        }));
362    }
363    match connected
364        .await
365        .map_err(|_| Error::ChannelClosed.to_string())
366        .and_then(|res| res.map_err(|v| v.to_string()))
367    {
368        Ok(_) => {}
369        Err(err) => {
370            let _ = event_sender(P2pConnectionEvent::Finalized(peer_id, Err(err)).into());
371            return;
372        }
373    }
374
375    // Exchange encrypted connection authorization messsages. Makes sure
376    // there is a link between peer identity and connection.
377    let (remote_auth_tx, remote_auth_rx) = oneshot::channel::<ConnectionAuthEncrypted>();
378    let mut remote_auth_tx = Some(remote_auth_tx);
379    main_channel.on_message(move |data| {
380        if let Some(tx) = remote_auth_tx.take() {
381            if let Ok(auth) = data.try_into() {
382                let _ = tx.send(auth);
383            }
384        }
385        #[cfg(not(all(not(target_arch = "wasm32"), feature = "p2p-webrtc-cpp")))]
386        std::future::ready(())
387    });
388    let msg = match cmd_receiver.recv().await {
389        None => return,
390        Some(msg) => msg,
391    };
392    match msg.0 {
393        PeerCmd::ConnectionAuthorizationSend(None) => {
394            // eprintln!("PeerCmd::ConnectionAuthorizationSend(None)");
395            return;
396        }
397        PeerCmd::ConnectionAuthorizationSend(Some(auth)) => {
398            let _ = main_channel_open.await;
399
400            // Add a delay for sending messages after channel
401            // was opened. Some initial messages get lost otherwise.
402            // TODO(binier): find deeper cause and fix it.
403            sleep(Duration::from_secs(1)).await;
404            let _ = main_channel
405                .send(&bytes::Bytes::copy_from_slice(auth.as_ref()))
406                .await;
407
408            let res = match remote_auth_rx.await {
409                Err(_) => Err("didn't receive connection authentication message".to_owned()),
410                Ok(remote_auth) => Ok(remote_auth),
411            };
412            let is_err = res.is_err();
413            let _ = event_sender(P2pConnectionEvent::Finalized(peer_id, res).into());
414            if is_err {
415                return;
416            }
417        }
418        cmd => {
419            bug_condition!("unexpected peer cmd! Expected `PeerCmd::ConnectionAuthorizationSend`. received: {cmd:?}");
420            return;
421        }
422    }
423
424    let _ = main_channel.close().await;
425
426    peer_loop(peer_id, event_sender, cmd_receiver, pc, abort).await
427}
428
429struct Channel {
430    id: ChannelId,
431    msg_sender: ChannelMsgSender,
432}
433
434type ChannelMsgSender = mpsc::UnboundedSender<(MsgId, Vec<u8>, Option<mpsc::Tracker>)>;
435
436struct MsgBuffer {
437    buf: Vec<u8>,
438}
439
440impl MsgBuffer {
441    fn new(capacity: usize) -> Self {
442        Self {
443            buf: Vec::with_capacity(capacity),
444        }
445    }
446
447    fn encode(&mut self, msg: &ChannelMsg) -> Result<Vec<u8>, std::io::Error> {
448        msg.encode(&mut self.buf)?;
449        let len_encoded = (self.buf.len() as u32).to_be_bytes();
450        let encoded = len_encoded
451            .into_iter()
452            .chain(self.buf.iter().cloned())
453            .collect();
454        self.buf.clear();
455        Ok(encoded)
456    }
457}
458
459struct Channels {
460    list: Vec<Channel>,
461}
462
463impl Channels {
464    fn new() -> Self {
465        Self {
466            list: Vec::with_capacity(32),
467        }
468    }
469
470    fn get_msg_sender(&self, id: ChannelId) -> Option<&ChannelMsgSender> {
471        self.list.iter().find(|c| c.id == id).map(|c| &c.msg_sender)
472    }
473
474    fn add(&mut self, id: ChannelId, msg_sender: ChannelMsgSender) {
475        self.list.push(Channel { id, msg_sender });
476    }
477
478    fn remove(&mut self, id: ChannelId) -> bool {
479        match self.list.iter().position(|c| c.id == id) {
480            None => false,
481            Some(index) => {
482                self.list.remove(index);
483                true
484            }
485        }
486    }
487}
488
489// TODO(binier): remove unwraps
490#[allow(unused_mut)]
491async fn peer_loop(
492    peer_id: PeerId,
493    event_sender: Arc<dyn Fn(P2pEvent) -> Option<()> + Send + Sync + 'static>,
494    mut cmd_receiver: mpsc::TrackedUnboundedReceiver<PeerCmd>,
495    mut pc: RTCConnection,
496    aborted: Aborted,
497) {
498    // TODO(binier): maybe use small_vec (stack allocated) or something like that.
499    let mut channels = Channels::new();
500    let mut msg_buf = MsgBuffer::new(64 * 1024);
501
502    let (internal_cmd_sender, mut internal_cmd_receiver) =
503        mpsc::unbounded_channel::<PeerCmdInternal>();
504
505    while matches!(pc.connection_state(), RTCConnectionState::Connected) {
506        let (cmd, _tracker) = tokio::select! {
507            cmd = cmd_receiver.recv() => match cmd {
508                None => return,
509                Some(cmd) => (PeerCmdAll::External(Box::new(cmd.0)), Some(cmd.1)),
510            },
511            cmd = internal_cmd_receiver.recv() => match cmd {
512                None => return,
513                Some(cmd) => (PeerCmdAll::Internal(cmd), None),
514            },
515        };
516        match cmd {
517            PeerCmdAll::External(cmd) => match *cmd {
518                PeerCmd::PeerHttpOfferSend(..)
519                | PeerCmd::AnswerSet(_)
520                | PeerCmd::ConnectionAuthorizationSend(_) => {
521                    bug_condition!("unexpected peer cmd");
522                }
523                PeerCmd::ChannelOpen(id) => {
524                    let chan = pc
525                        .channel_create(RTCChannelConfig {
526                            label: id.name(),
527                            negotiated: Some(id.to_u16()),
528                        })
529                        .await;
530                    let internal_cmd_sender = internal_cmd_sender.clone();
531                    let fut = async move {
532                        let internal_cmd_sender_clone = internal_cmd_sender.clone();
533                        let result = async move {
534                            let chan = chan?;
535
536                            let (done_tx, mut done_rx) = mpsc::channel::<Result<(), Error>>(1);
537
538                            let done_tx_clone = done_tx.clone();
539                            chan.on_open(move || {
540                                let _ = done_tx_clone.try_send(Ok(()));
541                                std::future::ready(())
542                            });
543
544                            let done_tx_clone = done_tx.clone();
545                            let internal_cmd_sender = internal_cmd_sender_clone.clone();
546                            chan.on_error(move |err| {
547                                if done_tx_clone.try_send(Err(err.into())).is_err() {
548                                    let _ = internal_cmd_sender
549                                        .send(PeerCmdInternal::ChannelClosed(id));
550                                }
551                                std::future::ready(())
552                            });
553
554                            let done_tx_clone = done_tx.clone();
555                            let internal_cmd_sender = internal_cmd_sender_clone.clone();
556                            chan.on_close(move || {
557                                if done_tx_clone.try_send(Err(Error::ChannelClosed)).is_err() {
558                                    let _ = internal_cmd_sender
559                                        .send(PeerCmdInternal::ChannelClosed(id));
560                                }
561                                std::future::ready(())
562                            });
563
564                            done_rx.recv().await.ok_or(Error::ChannelClosed)??;
565
566                            Ok(chan)
567                        };
568
569                        let _ = internal_cmd_sender
570                            .send(PeerCmdInternal::ChannelOpened(id, result.await));
571                    };
572                    let mut aborted = aborted.clone();
573                    spawn_local(async move {
574                        tokio::select! {
575                            _ = aborted.wait() => {}
576                            _ = fut => {}
577                        }
578                    });
579                }
580                PeerCmd::ChannelSend(msg_id, msg) => {
581                    let id = msg.channel_id();
582                    let err = match channels.get_msg_sender(id) {
583                        Some(msg_sender) => match msg_buf.encode(&msg) {
584                            Ok(encoded) => match msg_sender.send((msg_id, encoded, _tracker)) {
585                                Ok(_) => None,
586                                Err(_) => Some("ChannelMsgMpscSendFailed".to_owned()),
587                            },
588                            Err(err) => Some(err.to_string()),
589                        },
590                        None => Some("ChannelNotOpen".to_owned()),
591                    };
592                    if let Some(err) = err {
593                        let _ = event_sender(
594                            P2pChannelEvent::Sent(peer_id, id, msg_id, Err(err)).into(),
595                        );
596                    }
597                }
598            },
599            PeerCmdAll::Internal(PeerCmdInternal::ChannelOpened(chan_id, result)) => {
600                let (sender_tx, mut sender_rx) = mpsc::unbounded_channel();
601                let (chan, res) = match result {
602                    Ok(chan) => {
603                        channels.add(chan_id, sender_tx);
604                        (Some(chan), Ok(()))
605                    }
606                    Err(err) => (None, Err(err.to_string())),
607                };
608
609                #[allow(unused_mut)]
610                if let Some(mut chan) = chan {
611                    fn process_msg(
612                        chan_id: ChannelId,
613                        buf: &mut Vec<u8>,
614                        len: &mut u32,
615                        msg: &mut &[u8],
616                    ) -> Result<Option<ChannelMsg>, String> {
617                        let len = if buf.is_empty() {
618                            if msg.len() < 4 {
619                                return Err("WebRTCMessageTooSmall".to_owned());
620                            } else {
621                                *len = u32::from_be_bytes(
622                                    msg[..4].try_into().expect("Size checked above"),
623                                );
624                                *msg = &msg[4..];
625                                let len = *len as usize;
626                                if len > chan_id.max_msg_size() {
627                                    return Err(format!(
628                                        "ChannelMsgLenOverLimit; len: {}, limit: {}",
629                                        len,
630                                        chan_id.max_msg_size()
631                                    ));
632                                }
633                                len
634                            }
635                        } else {
636                            *len as usize
637                        };
638                        let bytes_left = len - buf.len();
639
640                        if bytes_left > msg.len() {
641                            buf.extend_from_slice(msg);
642                            *msg = &[];
643                            return Ok(None);
644                        }
645
646                        buf.extend_from_slice(&msg[..bytes_left]);
647                        *msg = &msg[bytes_left..];
648                        let msg = ChannelMsg::decode(&mut &buf[..], chan_id)
649                            .map_err(|err| err.to_string())?;
650                        buf.clear();
651                        Ok(Some(msg))
652                    }
653
654                    let mut len = 0;
655                    let mut buf = Vec::new();
656                    let event_sender_clone = event_sender.clone();
657
658                    chan.on_message(move |mut data| {
659                        while !data.is_empty() {
660                            let res = match process_msg(chan_id, &mut buf, &mut len, &mut data) {
661                                Ok(None) => continue,
662                                Ok(Some(msg)) => Ok(Box::new(msg)),
663                                Err(err) => Err(err),
664                            };
665                            let _ =
666                                event_sender_clone(P2pChannelEvent::Received(peer_id, res).into());
667                        }
668                        #[cfg(not(all(not(target_arch = "wasm32"), feature = "p2p-webrtc-cpp")))]
669                        std::future::ready(())
670                    });
671
672                    let event_sender = event_sender.clone();
673                    let fut = async move {
674                        // Add a delay for sending messages after channel
675                        // was opened. Some initial messages get lost otherwise.
676                        // TODO(binier): find deeper cause and fix it.
677                        sleep(Duration::from_secs(3)).await;
678
679                        while let Some((msg_id, encoded, _tracker)) = sender_rx.recv().await {
680                            let encoded = bytes::Bytes::from(encoded);
681                            let mut chunks =
682                                encoded.chunks(CHUNK_SIZE).map(|b| encoded.slice_ref(b));
683                            let result = loop {
684                                let Some(chunk) = chunks.next() else {
685                                    break Ok(());
686                                };
687                                if let Err(err) = chan
688                                    .send(&chunk)
689                                    .await
690                                    .map_err(|e| format!("{e:?}"))
691                                    .and_then(|n| match n == chunk.len() {
692                                        false => Err("NotAllBytesWritten".to_owned()),
693                                        true => Ok(()),
694                                    })
695                                {
696                                    break Err(err);
697                                }
698                            };
699
700                            let _ = event_sender(
701                                P2pChannelEvent::Sent(peer_id, chan_id, msg_id, result).into(),
702                            );
703                        }
704                    };
705
706                    let mut aborted = aborted.clone();
707                    spawn_local(async move {
708                        tokio::select! {
709                            _ = aborted.wait() => {}
710                            _ = fut => {}
711                        }
712                    });
713                }
714
715                let _ = event_sender(P2pChannelEvent::Opened(peer_id, chan_id, res).into());
716            }
717            PeerCmdAll::Internal(PeerCmdInternal::ChannelClosed(id)) => {
718                channels.remove(id);
719                let _ = event_sender(P2pChannelEvent::Closed(peer_id, id).into());
720            }
721        }
722    }
723}
724
725pub trait P2pServiceWebrtc: redux::Service {
726    type Event: From<P2pEvent> + Send + Sync + 'static;
727
728    fn random_pick(
729        &mut self,
730        list: &[P2pConnectionOutgoingInitOpts],
731    ) -> Option<P2pConnectionOutgoingInitOpts>;
732
733    fn event_sender(&self) -> &mpsc::UnboundedSender<Self::Event>;
734
735    fn cmd_sender(&self) -> &mpsc::TrackedUnboundedSender<Cmd>;
736
737    fn peers(&mut self) -> &mut BTreeMap<PeerId, PeerState>;
738
739    fn init<S: TaskSpawner>(
740        secret_key: SecretKey,
741        spawner: S,
742        rng_seed: [u8; 32],
743    ) -> P2pServiceCtx {
744        const MAX_PEERS: usize = 500;
745        let (cmd_sender, mut cmd_receiver) = mpsc::tracked_unbounded_channel();
746
747        let certificate = certificate_from_pem_key(secret_key.to_pem().as_str());
748
749        spawner.spawn_main("webrtc", async move {
750            #[allow(clippy::all)]
751            let api = build_api();
752            let conn_permits = Arc::new(Semaphore::const_new(MAX_PEERS));
753            while let Some(cmd) = cmd_receiver.recv().await {
754                match cmd.0 {
755                    Cmd::PeerAdd { args, aborted } => {
756                        #[allow(clippy::all)]
757                        let api = api.clone();
758                        let conn_permits = conn_permits.clone();
759                        let peer_id = args.peer_id;
760                        let event_sender = args.event_sender.clone();
761                        let certificate = certificate.clone();
762                        spawn_local(async move {
763                            let Ok(_permit) = conn_permits.try_acquire() else {
764                                // state machine shouldn't allow this to happen.
765                                bug_condition!("P2P WebRTC Semaphore acquisition failed!");
766                                return;
767                            };
768                            let (closed_tx, mut closed) = mpsc::channel(1);
769                            let event_sender_clone = event_sender.clone();
770                            spawn_local(async move {
771                                // to avoid sending closed multiple times
772                                let _ = closed.recv().await;
773                                event_sender_clone(P2pConnectionEvent::Closed(peer_id).into());
774                            });
775                            tokio::select! {
776                                _ = peer_start(api, args, aborted.clone(), closed_tx.clone(), certificate, rng_seed) => {}
777                                _ = aborted.wait() => {
778                                }
779                            }
780
781                            // delay dropping permit to give some time for cleanup.
782                            sleep(Duration::from_millis(100)).await;
783                            let _ = closed_tx.send(()).await;
784                        });
785                    }
786                }
787            }
788        });
789
790        P2pServiceCtx {
791            cmd_sender,
792            peers: Default::default(),
793        }
794    }
795
796    fn outgoing_init(&mut self, peer_id: PeerId) {
797        let (peer_cmd_sender, peer_cmd_receiver) = mpsc::tracked_unbounded_channel();
798        let aborter = Aborter::default();
799        let aborted = aborter.aborted();
800
801        self.peers().insert(
802            peer_id,
803            PeerState {
804                cmd_sender: peer_cmd_sender,
805                abort: aborter,
806            },
807        );
808        let event_sender = self.event_sender().clone();
809        let event_sender =
810            Arc::new(move |p2p_event: P2pEvent| event_sender.send(p2p_event.into()).ok());
811        let _ = self.cmd_sender().tracked_send(Cmd::PeerAdd {
812            args: PeerAddArgs {
813                peer_id,
814                kind: PeerConnectionKind::Outgoing,
815                event_sender,
816                cmd_receiver: peer_cmd_receiver,
817            },
818            aborted,
819        });
820    }
821
822    fn incoming_init(&mut self, peer_id: PeerId, offer: webrtc::Offer) {
823        let (peer_cmd_sender, peer_cmd_receiver) = mpsc::tracked_unbounded_channel();
824        let aborter = Aborter::default();
825        let aborted = aborter.aborted();
826
827        self.peers().insert(
828            peer_id,
829            PeerState {
830                cmd_sender: peer_cmd_sender,
831                abort: aborter,
832            },
833        );
834        let event_sender = self.event_sender().clone();
835        let event_sender =
836            Arc::new(move |p2p_event: P2pEvent| event_sender.send(p2p_event.into()).ok());
837        let _ = self.cmd_sender().tracked_send(Cmd::PeerAdd {
838            args: PeerAddArgs {
839                peer_id,
840                kind: PeerConnectionKind::Incoming(Box::new(offer)),
841                event_sender,
842                cmd_receiver: peer_cmd_receiver,
843            },
844            aborted,
845        });
846    }
847
848    fn set_answer(&mut self, peer_id: PeerId, answer: webrtc::Answer) {
849        if let Some(peer) = self.peers().get(&peer_id) {
850            let _ = peer.cmd_sender.tracked_send(PeerCmd::AnswerSet(answer));
851        }
852    }
853
854    fn http_signaling_request(&mut self, url: String, offer: webrtc::Offer) {
855        if let Some(peer) = self.peers().get(&offer.target_peer_id) {
856            let _ = peer
857                .cmd_sender
858                .tracked_send(PeerCmd::PeerHttpOfferSend(url, offer));
859        }
860    }
861
862    fn disconnect(&mut self, peer_id: PeerId) -> bool {
863        // TODO(binier): improve
864        // By removing the peer, `abort` gets dropped which will
865        // cause `peer_loop` to end.
866        if let Some(_peer) = self.peers().remove(&peer_id) {
867            // if peer.abort.receiver_count() > 0 {
868            //     // peer disconnection not yet finished
869            //     return false;
870            // }
871        } else {
872            mina_core::error!(mina_core::log::system_time(); "`disconnect` shouldn't be used for libp2p peers");
873        }
874        true
875    }
876
877    fn channel_open(&mut self, peer_id: PeerId, id: ChannelId) {
878        if let Some(peer) = self.peers().get(&peer_id) {
879            let _ = peer.cmd_sender.tracked_send(PeerCmd::ChannelOpen(id));
880        }
881    }
882
883    fn channel_send(&mut self, peer_id: PeerId, msg_id: MsgId, msg: ChannelMsg) {
884        if let Some(peer) = self.peers().get(&peer_id) {
885            let _ = peer
886                .cmd_sender
887                .tracked_send(PeerCmd::ChannelSend(msg_id, msg));
888        }
889    }
890
891    fn encrypt<T: EncryptableType>(
892        &mut self,
893        other_pk: &PublicKey,
894        message: &T,
895    ) -> Result<T::Encrypted, Box<dyn std::error::Error>>;
896
897    fn decrypt<T: EncryptableType>(
898        &mut self,
899        other_pub_key: &PublicKey,
900        encrypted: &T::Encrypted,
901    ) -> Result<T, Box<dyn std::error::Error>>;
902
903    fn auth_send(
904        &mut self,
905        peer_id: PeerId,
906        _other_pub_key: &PublicKey,
907        auth: Option<ConnectionAuthEncrypted>,
908    ) {
909        if let Some(peer) = self.peers().get(&peer_id) {
910            let _ = peer
911                .cmd_sender
912                .tracked_send(PeerCmd::ConnectionAuthorizationSend(auth));
913        }
914    }
915
916    fn auth_encrypt_and_send(
917        &mut self,
918        peer_id: PeerId,
919        other_pub_key: &PublicKey,
920        auth: ConnectionAuth,
921    );
922
923    fn auth_decrypt(
924        &mut self,
925        other_pub_key: &PublicKey,
926        auth: ConnectionAuthEncrypted,
927    ) -> Option<ConnectionAuth>;
928}
929
930impl P2pServiceCtx {
931    pub fn pending_cmds(&self) -> usize {
932        self.peers
933            .iter()
934            .fold(self.cmd_sender.len(), |acc, (_, peer)| {
935                acc + peer.cmd_sender.len()
936            })
937    }
938}