p2p/service_impl/webrtc/
mod.rs

1#[cfg(target_arch = "wasm32")]
2mod web;
3#[cfg(all(not(target_arch = "wasm32"), feature = "p2p-webrtc-cpp"))]
4mod webrtc_cpp;
5#[cfg(all(not(target_arch = "wasm32"), feature = "p2p-webrtc-rs"))]
6mod webrtc_rs;
7
8use std::{collections::BTreeMap, future::Future, pin::Pin, sync::Arc, time::Duration};
9
10use openmina_core::bug_condition;
11use serde::Serialize;
12use tokio::sync::Semaphore;
13
14#[cfg(not(target_arch = "wasm32"))]
15use tokio::task::spawn_local;
16#[cfg(target_arch = "wasm32")]
17use wasm_bindgen_futures::spawn_local;
18
19use openmina_core::channels::{mpsc, oneshot, Aborted, Aborter};
20
21use crate::{
22    channels::{ChannelId, ChannelMsg, MsgId},
23    connection::outgoing::P2pConnectionOutgoingInitOpts,
24    identity::{EncryptableType, PublicKey, SecretKey},
25    webrtc,
26    webrtc::{ConnectionAuth, ConnectionAuthEncrypted},
27    P2pChannelEvent, P2pConnectionEvent, P2pEvent, PeerId,
28};
29
30#[cfg(all(not(target_arch = "wasm32"), feature = "p2p-webrtc-rs"))]
31mod imports {
32    pub use super::webrtc_rs::{
33        build_api, certificate_from_pem_key, webrtc_signal_send, Api, RTCCertificate, RTCChannel,
34        RTCConnection, RTCConnectionState, RTCSignalingError,
35    };
36}
37#[cfg(all(not(target_arch = "wasm32"), feature = "p2p-webrtc-cpp"))]
38mod imports {
39    pub use super::webrtc_cpp::{
40        build_api, certificate_from_pem_key, webrtc_signal_send, Api, RTCCertificate, RTCChannel,
41        RTCConnection, RTCConnectionState, RTCSignalingError,
42    };
43}
44#[cfg(target_arch = "wasm32")]
45mod imports {
46    pub use super::web::{
47        build_api, certificate_from_pem_key, webrtc_signal_send, Api, RTCCertificate, RTCChannel,
48        RTCConnection, RTCConnectionState, RTCSignalingError,
49    };
50}
51
52use imports::*;
53pub use imports::{webrtc_signal_send, RTCSignalingError};
54
55use super::TaskSpawner;
56
57/// 16KB.
58const CHUNK_SIZE: usize = 16 * 1024;
59
60pub enum Cmd {
61    PeerAdd { args: PeerAddArgs, aborted: Aborted },
62}
63
64#[derive(Debug)]
65pub enum PeerCmd {
66    PeerHttpOfferSend(String, webrtc::Offer),
67    AnswerSet(webrtc::Answer),
68    ConnectionAuthorizationSend(Option<ConnectionAuthEncrypted>),
69    ChannelOpen(ChannelId),
70    ChannelSend(MsgId, ChannelMsg),
71}
72
73enum PeerCmdInternal {
74    ChannelOpened(ChannelId, Result<RTCChannel, Error>),
75    ChannelClosed(ChannelId),
76}
77
78enum PeerCmdAll {
79    External(PeerCmd),
80    Internal(PeerCmdInternal),
81}
82
83pub struct P2pServiceCtx {
84    pub cmd_sender: mpsc::TrackedUnboundedSender<Cmd>,
85    pub peers: BTreeMap<PeerId, PeerState>,
86}
87
88pub struct PeerAddArgs {
89    peer_id: PeerId,
90    kind: PeerConnectionKind,
91    event_sender: Arc<dyn Fn(P2pEvent) -> Option<()> + Send + Sync + 'static>,
92    cmd_receiver: mpsc::TrackedUnboundedReceiver<PeerCmd>,
93}
94
95pub enum PeerConnectionKind {
96    Outgoing,
97    Incoming(Box<webrtc::Offer>),
98}
99
100pub struct PeerState {
101    pub cmd_sender: mpsc::TrackedUnboundedSender<PeerCmd>,
102    pub abort: Aborter,
103}
104
105#[derive(thiserror::Error, derive_more::From, Debug)]
106pub(super) enum Error {
107    #[cfg(all(not(target_arch = "wasm32"), feature = "p2p-webrtc-rs"))]
108    #[error("{0}")]
109    Rtc(::webrtc::Error),
110    #[cfg(all(not(target_arch = "wasm32"), feature = "p2p-webrtc-cpp"))]
111    #[error("{0}")]
112    Rtc(::datachannel::Error),
113    #[cfg(target_arch = "wasm32")]
114    #[error("js error: {0:?}")]
115    RtcJs(String),
116    #[error("signaling error: {0}")]
117    Signaling(RTCSignalingError),
118    #[error("unexpected cmd received")]
119    UnexpectedCmd,
120    #[from(ignore)]
121    #[error("channel closed")]
122    ChannelClosed,
123}
124
125#[cfg(target_arch = "wasm32")]
126impl From<wasm_bindgen::JsValue> for Error {
127    fn from(value: wasm_bindgen::JsValue) -> Self {
128        Error::RtcJs(format!("{value:?}"))
129    }
130}
131
132pub type OnConnectionStateChangeHdlrFn = Box<
133    dyn (FnMut(RTCConnectionState) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>)
134        + Send
135        + Sync,
136>;
137
138pub struct RTCConfig {
139    pub ice_servers: RTCConfigIceServers,
140    pub certificate: RTCCertificate,
141    pub seed: [u8; 32],
142}
143
144#[derive(Serialize)]
145pub struct RTCConfigIceServers(Vec<RTCConfigIceServer>);
146#[derive(Serialize)]
147pub struct RTCConfigIceServer {
148    pub urls: Vec<String>,
149    pub username: Option<String>,
150    pub credential: Option<String>,
151}
152
153#[derive(Serialize)]
154pub struct RTCChannelConfig {
155    pub label: &'static str,
156    pub negotiated: Option<u16>,
157}
158
159impl Default for RTCConfigIceServers {
160    fn default() -> Self {
161        Self(vec![
162            RTCConfigIceServer {
163                urls: vec!["stun:65.109.110.75:3478".to_owned()],
164                username: Some("openmina".to_owned()),
165                credential: Some("webrtc".to_owned()),
166            },
167            RTCConfigIceServer {
168                urls: vec!["stun:176.9.147.28:3478".to_owned()],
169                username: None,
170                credential: None,
171            },
172            RTCConfigIceServer {
173                urls: vec![
174                    "stun:stun.l.google.com:19302".to_owned(),
175                    "stun:stun1.l.google.com:19302".to_owned(),
176                    "stun:stun2.l.google.com:19302".to_owned(),
177                    "stun:stun3.l.google.com:19302".to_owned(),
178                    "stun:stun4.l.google.com:19302".to_owned(),
179                ],
180                username: None,
181                credential: None,
182            },
183        ])
184    }
185}
186
187impl std::ops::Deref for RTCConfigIceServers {
188    type Target = Vec<RTCConfigIceServer>;
189
190    fn deref(&self) -> &Self::Target {
191        &self.0
192    }
193}
194
195#[cfg(not(all(not(target_arch = "wasm32"), feature = "p2p-webrtc-cpp")))]
196impl Drop for RTCConnection {
197    fn drop(&mut self) {
198        if self.is_main() {
199            let cloned = self.clone();
200            spawn_local(async move {
201                let _ = cloned.close().await;
202            });
203        }
204    }
205}
206
207async fn sleep(dur: Duration) {
208    #[cfg(not(target_arch = "wasm32"))]
209    let fut = tokio::time::sleep(dur);
210    #[cfg(target_arch = "wasm32")]
211    let fut = gloo_timers::future::TimeoutFuture::new(dur.as_millis() as u32);
212    fut.await
213}
214
215async fn wait_for_ice_gathering_complete(pc: &mut RTCConnection) {
216    let timeout = sleep(Duration::from_secs(3));
217
218    tokio::select! {
219        _ = timeout => {}
220        _ = pc.wait_for_ice_gathering_complete() => {}
221    }
222}
223
224async fn peer_start(
225    api: Api,
226    args: PeerAddArgs,
227    abort: Aborted,
228    closed: mpsc::Sender<()>,
229    certificate: RTCCertificate,
230    rng_seed: [u8; 32],
231) {
232    let PeerAddArgs {
233        peer_id,
234        kind,
235        event_sender,
236        mut cmd_receiver,
237    } = args;
238    let is_outgoing = matches!(kind, PeerConnectionKind::Outgoing);
239
240    let config = RTCConfig {
241        ice_servers: Default::default(),
242        certificate,
243        seed: rng_seed,
244    };
245    let fut = async {
246        let mut pc = RTCConnection::create(&api, config).await?;
247        let main_channel = pc
248            .channel_create(RTCChannelConfig {
249                label: "",
250                negotiated: Some(0),
251            })
252            .await?;
253
254        let offer = match kind {
255            PeerConnectionKind::Incoming(offer) => (*offer).try_into()?,
256            PeerConnectionKind::Outgoing => pc.offer_create().await?,
257        };
258
259        if is_outgoing {
260            pc.local_desc_set(offer).await?;
261            wait_for_ice_gathering_complete(&mut pc).await;
262        } else {
263            pc.remote_desc_set(offer).await?;
264        }
265
266        Result::<_, Error>::Ok((pc, main_channel))
267    };
268
269    #[allow(unused_mut)]
270    let (mut pc, mut main_channel) = match fut.await {
271        Ok(v) => v,
272        Err(err) => {
273            event_sender(P2pConnectionEvent::OfferSdpReady(peer_id, Err(err.to_string())).into());
274            return;
275        }
276    };
277
278    let (main_channel_open_tx, main_channel_open) = oneshot::channel::<()>();
279    let mut main_channel_open_tx = Some(main_channel_open_tx);
280    main_channel.on_open(move || {
281        if let Some(tx) = main_channel_open_tx.take() {
282            let _ = tx.send(());
283        }
284        std::future::ready(())
285    });
286
287    let answer = if is_outgoing {
288        let answer_fut = async {
289            let sdp = pc.local_sdp().await.unwrap();
290            event_sender(P2pConnectionEvent::OfferSdpReady(peer_id, Ok(sdp)).into())
291                .ok_or(Error::ChannelClosed)?;
292            match cmd_receiver.recv().await.ok_or(Error::ChannelClosed)?.0 {
293                PeerCmd::PeerHttpOfferSend(url, offer) => {
294                    let answer = webrtc_signal_send(&url, offer).await?;
295                    event_sender(P2pConnectionEvent::AnswerReceived(peer_id, answer).into())
296                        .ok_or(Error::ChannelClosed)?;
297
298                    if let PeerCmd::AnswerSet(v) =
299                        cmd_receiver.recv().await.ok_or(Error::ChannelClosed)?.0
300                    {
301                        return Ok(v);
302                    }
303                }
304                PeerCmd::AnswerSet(v) => return Ok(v),
305                _cmd => {
306                    return Err(Error::UnexpectedCmd);
307                }
308            }
309            Err(Error::ChannelClosed)
310        };
311        answer_fut.await.and_then(|v| Ok(v.try_into()?))
312    } else {
313        pc.answer_create().await.map_err(Error::from)
314    };
315    let Ok(answer) = answer else {
316        return;
317    };
318
319    if is_outgoing {
320        if let Err(err) = pc.remote_desc_set(answer).await {
321            let err = Error::from(err).to_string();
322            let _ = event_sender(P2pConnectionEvent::Finalized(peer_id, Err(err)).into());
323        }
324    } else {
325        let fut = async {
326            pc.local_desc_set(answer).await?;
327            wait_for_ice_gathering_complete(&mut pc).await;
328            Ok(pc.local_sdp().await.unwrap())
329        };
330        let res = fut.await.map_err(|err: Error| err.to_string());
331        let is_err = res.is_err();
332        let is_err = is_err
333            || event_sender(P2pConnectionEvent::AnswerSdpReady(peer_id, res).into()).is_none();
334        if is_err {
335            return;
336        }
337    }
338
339    let (connected_tx, connected) = oneshot::channel();
340    if matches!(pc.connection_state(), RTCConnectionState::Connected) {
341        connected_tx.send(Ok(())).unwrap();
342    } else {
343        let mut connected_tx = Some(connected_tx);
344        pc.on_connection_state_change(Box::new(move |state| {
345            match state {
346                RTCConnectionState::Connected => {
347                    if let Some(connected_tx) = connected_tx.take() {
348                        let _ = connected_tx.send(Ok(()));
349                    }
350                }
351                RTCConnectionState::Disconnected | RTCConnectionState::Closed => {
352                    if let Some(connected_tx) = connected_tx.take() {
353                        let _ = connected_tx.send(Err("disconnected"));
354                    } else {
355                        let _ = closed.try_send(());
356                    }
357                }
358                _ => {}
359            }
360            Box::pin(std::future::ready(()))
361        }));
362    }
363    match connected
364        .await
365        .map_err(|_| Error::ChannelClosed.to_string())
366        .and_then(|res| res.map_err(|v| v.to_string()))
367    {
368        Ok(_) => {}
369        Err(err) => {
370            let _ = event_sender(P2pConnectionEvent::Finalized(peer_id, Err(err)).into());
371            return;
372        }
373    }
374
375    // Exchange encrypted connection authorization messsages. Makes sure
376    // there is a link between peer identity and connection.
377    let (remote_auth_tx, remote_auth_rx) = oneshot::channel::<ConnectionAuthEncrypted>();
378    let mut remote_auth_tx = Some(remote_auth_tx);
379    main_channel.on_message(move |data| {
380        if let Some(tx) = remote_auth_tx.take() {
381            if let Ok(auth) = data.try_into() {
382                let _ = tx.send(auth);
383            }
384        }
385        #[cfg(not(all(not(target_arch = "wasm32"), feature = "p2p-webrtc-cpp")))]
386        std::future::ready(())
387    });
388    let msg = match cmd_receiver.recv().await {
389        None => return,
390        Some(msg) => msg,
391    };
392    match msg.0 {
393        PeerCmd::ConnectionAuthorizationSend(None) => {
394            // eprintln!("PeerCmd::ConnectionAuthorizationSend(None)");
395            return;
396        }
397        PeerCmd::ConnectionAuthorizationSend(Some(auth)) => {
398            let _ = main_channel_open.await;
399
400            // Add a delay for sending messages after channel
401            // was opened. Some initial messages get lost otherwise.
402            // TODO(binier): find deeper cause and fix it.
403            sleep(Duration::from_secs(1)).await;
404            let _ = main_channel
405                .send(&bytes::Bytes::copy_from_slice(auth.as_ref()))
406                .await;
407
408            let res = match remote_auth_rx.await {
409                Err(_) => Err("didn't receive connection authentication message".to_owned()),
410                Ok(remote_auth) => Ok(remote_auth),
411            };
412            let is_err = res.is_err();
413            let _ = event_sender(P2pConnectionEvent::Finalized(peer_id, res).into());
414            if is_err {
415                return;
416            }
417        }
418        cmd => {
419            bug_condition!("unexpected peer cmd! Expected `PeerCmd::ConnectionAuthorizationSend`. received: {cmd:?}");
420            return;
421        }
422    }
423
424    let _ = main_channel.close().await;
425
426    peer_loop(peer_id, event_sender, cmd_receiver, pc, abort).await
427}
428
429struct Channel {
430    id: ChannelId,
431    msg_sender: ChannelMsgSender,
432}
433
434type ChannelMsgSender = mpsc::UnboundedSender<(MsgId, Vec<u8>, Option<mpsc::Tracker>)>;
435
436struct MsgBuffer {
437    buf: Vec<u8>,
438}
439
440impl MsgBuffer {
441    fn new(capacity: usize) -> Self {
442        Self {
443            buf: Vec::with_capacity(capacity),
444        }
445    }
446
447    fn encode(&mut self, msg: &ChannelMsg) -> Result<Vec<u8>, std::io::Error> {
448        msg.encode(&mut self.buf)?;
449        let len_encoded = (self.buf.len() as u32).to_be_bytes();
450        let encoded = len_encoded
451            .into_iter()
452            .chain(self.buf.iter().cloned())
453            .collect();
454        self.buf.clear();
455        Ok(encoded)
456    }
457}
458
459struct Channels {
460    list: Vec<Channel>,
461}
462
463impl Channels {
464    fn new() -> Self {
465        Self {
466            list: Vec::with_capacity(32),
467        }
468    }
469
470    fn get_msg_sender(&self, id: ChannelId) -> Option<&ChannelMsgSender> {
471        self.list.iter().find(|c| c.id == id).map(|c| &c.msg_sender)
472    }
473
474    fn add(&mut self, id: ChannelId, msg_sender: ChannelMsgSender) {
475        self.list.push(Channel { id, msg_sender });
476    }
477
478    fn remove(&mut self, id: ChannelId) -> bool {
479        match self.list.iter().position(|c| c.id == id) {
480            None => false,
481            Some(index) => {
482                self.list.remove(index);
483                true
484            }
485        }
486    }
487}
488
489// TODO(binier): remove unwraps
490#[allow(unused_mut)]
491async fn peer_loop(
492    peer_id: PeerId,
493    event_sender: Arc<dyn Fn(P2pEvent) -> Option<()> + Send + Sync + 'static>,
494    mut cmd_receiver: mpsc::TrackedUnboundedReceiver<PeerCmd>,
495    mut pc: RTCConnection,
496    aborted: Aborted,
497) {
498    // TODO(binier): maybe use small_vec (stack allocated) or something like that.
499    let mut channels = Channels::new();
500    let mut msg_buf = MsgBuffer::new(64 * 1024);
501
502    let (internal_cmd_sender, mut internal_cmd_receiver) =
503        mpsc::unbounded_channel::<PeerCmdInternal>();
504
505    while matches!(pc.connection_state(), RTCConnectionState::Connected) {
506        let (cmd, _tracker) = tokio::select! {
507            cmd = cmd_receiver.recv() => match cmd {
508                None => return,
509                Some(cmd) => (PeerCmdAll::External(cmd.0), Some(cmd.1)),
510            },
511            cmd = internal_cmd_receiver.recv() => match cmd {
512                None => return,
513                Some(cmd) => (PeerCmdAll::Internal(cmd), None),
514            },
515        };
516        match cmd {
517            PeerCmdAll::External(
518                PeerCmd::PeerHttpOfferSend(..)
519                | PeerCmd::AnswerSet(_)
520                | PeerCmd::ConnectionAuthorizationSend(_),
521            ) => {
522                bug_condition!("unexpected peer cmd");
523            }
524            PeerCmdAll::External(PeerCmd::ChannelOpen(id)) => {
525                let chan = pc
526                    .channel_create(RTCChannelConfig {
527                        label: id.name(),
528                        negotiated: Some(id.to_u16()),
529                    })
530                    .await;
531                let internal_cmd_sender = internal_cmd_sender.clone();
532                let fut = async move {
533                    let internal_cmd_sender_clone = internal_cmd_sender.clone();
534                    let result = async move {
535                        let chan = chan?;
536
537                        let (done_tx, mut done_rx) = mpsc::channel::<Result<(), Error>>(1);
538
539                        let done_tx_clone = done_tx.clone();
540                        chan.on_open(move || {
541                            let _ = done_tx_clone.try_send(Ok(()));
542                            std::future::ready(())
543                        });
544
545                        let done_tx_clone = done_tx.clone();
546                        let internal_cmd_sender = internal_cmd_sender_clone.clone();
547                        chan.on_error(move |err| {
548                            if done_tx_clone.try_send(Err(err.into())).is_err() {
549                                let _ =
550                                    internal_cmd_sender.send(PeerCmdInternal::ChannelClosed(id));
551                            }
552                            std::future::ready(())
553                        });
554
555                        let done_tx_clone = done_tx.clone();
556                        let internal_cmd_sender = internal_cmd_sender_clone.clone();
557                        chan.on_close(move || {
558                            if done_tx_clone.try_send(Err(Error::ChannelClosed)).is_err() {
559                                let _ =
560                                    internal_cmd_sender.send(PeerCmdInternal::ChannelClosed(id));
561                            }
562                            std::future::ready(())
563                        });
564
565                        done_rx.recv().await.ok_or(Error::ChannelClosed)??;
566
567                        Ok(chan)
568                    };
569
570                    let _ =
571                        internal_cmd_sender.send(PeerCmdInternal::ChannelOpened(id, result.await));
572                };
573                let mut aborted = aborted.clone();
574                spawn_local(async move {
575                    tokio::select! {
576                        _ = aborted.wait() => {}
577                        _ = fut => {}
578                    }
579                });
580            }
581            PeerCmdAll::External(PeerCmd::ChannelSend(msg_id, msg)) => {
582                let id = msg.channel_id();
583                let err = match channels.get_msg_sender(id) {
584                    Some(msg_sender) => match msg_buf.encode(&msg) {
585                        Ok(encoded) => match msg_sender.send((msg_id, encoded, _tracker)) {
586                            Ok(_) => None,
587                            Err(_) => Some("ChannelMsgMpscSendFailed".to_owned()),
588                        },
589                        Err(err) => Some(err.to_string()),
590                    },
591                    None => Some("ChannelNotOpen".to_owned()),
592                };
593                if let Some(err) = err {
594                    let _ =
595                        event_sender(P2pChannelEvent::Sent(peer_id, id, msg_id, Err(err)).into());
596                }
597            }
598            PeerCmdAll::Internal(PeerCmdInternal::ChannelOpened(chan_id, result)) => {
599                let (sender_tx, mut sender_rx) = mpsc::unbounded_channel();
600                let (chan, res) = match result {
601                    Ok(chan) => {
602                        channels.add(chan_id, sender_tx);
603                        (Some(chan), Ok(()))
604                    }
605                    Err(err) => (None, Err(err.to_string())),
606                };
607
608                #[allow(unused_mut)]
609                if let Some(mut chan) = chan {
610                    fn process_msg(
611                        chan_id: ChannelId,
612                        buf: &mut Vec<u8>,
613                        len: &mut u32,
614                        msg: &mut &[u8],
615                    ) -> Result<Option<ChannelMsg>, String> {
616                        let len = if buf.is_empty() {
617                            if msg.len() < 4 {
618                                return Err("WebRTCMessageTooSmall".to_owned());
619                            } else {
620                                *len = u32::from_be_bytes(
621                                    msg[..4].try_into().expect("Size checked above"),
622                                );
623                                *msg = &msg[4..];
624                                let len = *len as usize;
625                                if len > chan_id.max_msg_size() {
626                                    return Err(format!(
627                                        "ChannelMsgLenOverLimit; len: {}, limit: {}",
628                                        len,
629                                        chan_id.max_msg_size()
630                                    ));
631                                }
632                                len
633                            }
634                        } else {
635                            *len as usize
636                        };
637                        let bytes_left = len - buf.len();
638
639                        if bytes_left > msg.len() {
640                            buf.extend_from_slice(msg);
641                            *msg = &[];
642                            return Ok(None);
643                        }
644
645                        buf.extend_from_slice(&msg[..bytes_left]);
646                        *msg = &msg[bytes_left..];
647                        let msg = ChannelMsg::decode(&mut &buf[..], chan_id)
648                            .map_err(|err| err.to_string())?;
649                        buf.clear();
650                        Ok(Some(msg))
651                    }
652
653                    let mut len = 0;
654                    let mut buf = Vec::new();
655                    let event_sender_clone = event_sender.clone();
656
657                    chan.on_message(move |mut data| {
658                        while !data.is_empty() {
659                            let res = match process_msg(chan_id, &mut buf, &mut len, &mut data) {
660                                Ok(None) => continue,
661                                Ok(Some(msg)) => Ok(msg),
662                                Err(err) => Err(err),
663                            };
664                            let _ =
665                                event_sender_clone(P2pChannelEvent::Received(peer_id, res).into());
666                        }
667                        #[cfg(not(all(not(target_arch = "wasm32"), feature = "p2p-webrtc-cpp")))]
668                        std::future::ready(())
669                    });
670
671                    let event_sender = event_sender.clone();
672                    let fut = async move {
673                        // Add a delay for sending messages after channel
674                        // was opened. Some initial messages get lost otherwise.
675                        // TODO(binier): find deeper cause and fix it.
676                        sleep(Duration::from_secs(3)).await;
677
678                        while let Some((msg_id, encoded, _tracker)) = sender_rx.recv().await {
679                            let encoded = bytes::Bytes::from(encoded);
680                            let mut chunks =
681                                encoded.chunks(CHUNK_SIZE).map(|b| encoded.slice_ref(b));
682                            let result = loop {
683                                let Some(chunk) = chunks.next() else {
684                                    break Ok(());
685                                };
686                                if let Err(err) = chan
687                                    .send(&chunk)
688                                    .await
689                                    .map_err(|e| format!("{e:?}"))
690                                    .and_then(|n| match n == chunk.len() {
691                                        false => Err("NotAllBytesWritten".to_owned()),
692                                        true => Ok(()),
693                                    })
694                                {
695                                    break Err(err);
696                                }
697                            };
698
699                            let _ = event_sender(
700                                P2pChannelEvent::Sent(peer_id, chan_id, msg_id, result).into(),
701                            );
702                        }
703                    };
704
705                    let mut aborted = aborted.clone();
706                    spawn_local(async move {
707                        tokio::select! {
708                            _ = aborted.wait() => {}
709                            _ = fut => {}
710                        }
711                    });
712                }
713
714                let _ = event_sender(P2pChannelEvent::Opened(peer_id, chan_id, res).into());
715            }
716            PeerCmdAll::Internal(PeerCmdInternal::ChannelClosed(id)) => {
717                channels.remove(id);
718                let _ = event_sender(P2pChannelEvent::Closed(peer_id, id).into());
719            }
720        }
721    }
722}
723
724pub trait P2pServiceWebrtc: redux::Service {
725    type Event: From<P2pEvent> + Send + Sync + 'static;
726
727    fn random_pick(
728        &mut self,
729        list: &[P2pConnectionOutgoingInitOpts],
730    ) -> Option<P2pConnectionOutgoingInitOpts>;
731
732    fn event_sender(&self) -> &mpsc::UnboundedSender<Self::Event>;
733
734    fn cmd_sender(&self) -> &mpsc::TrackedUnboundedSender<Cmd>;
735
736    fn peers(&mut self) -> &mut BTreeMap<PeerId, PeerState>;
737
738    fn init<S: TaskSpawner>(
739        secret_key: SecretKey,
740        spawner: S,
741        rng_seed: [u8; 32],
742    ) -> P2pServiceCtx {
743        const MAX_PEERS: usize = 500;
744        let (cmd_sender, mut cmd_receiver) = mpsc::tracked_unbounded_channel();
745
746        let certificate = certificate_from_pem_key(secret_key.to_pem().as_str());
747
748        spawner.spawn_main("webrtc", async move {
749            #[allow(clippy::all)]
750            let api = build_api();
751            let conn_permits = Arc::new(Semaphore::const_new(MAX_PEERS));
752            while let Some(cmd) = cmd_receiver.recv().await {
753                match cmd.0 {
754                    Cmd::PeerAdd { args, aborted } => {
755                        #[allow(clippy::all)]
756                        let api = api.clone();
757                        let conn_permits = conn_permits.clone();
758                        let peer_id = args.peer_id;
759                        let event_sender = args.event_sender.clone();
760                        let certificate = certificate.clone();
761                        spawn_local(async move {
762                            let Ok(_permit) = conn_permits.try_acquire() else {
763                                // state machine shouldn't allow this to happen.
764                                bug_condition!("P2P WebRTC Semaphore acquisition failed!");
765                                return;
766                            };
767                            let (closed_tx, mut closed) = mpsc::channel(1);
768                            let event_sender_clone = event_sender.clone();
769                            spawn_local(async move {
770                                // to avoid sending closed multiple times
771                                let _ = closed.recv().await;
772                                event_sender_clone(P2pConnectionEvent::Closed(peer_id).into());
773                            });
774                            tokio::select! {
775                                _ = peer_start(api, args, aborted.clone(), closed_tx.clone(), certificate, rng_seed) => {}
776                                _ = aborted.wait() => {
777                                }
778                            }
779
780                            // delay dropping permit to give some time for cleanup.
781                            sleep(Duration::from_millis(100)).await;
782                            let _ = closed_tx.send(()).await;
783                        });
784                    }
785                }
786            }
787        });
788
789        P2pServiceCtx {
790            cmd_sender,
791            peers: Default::default(),
792        }
793    }
794
795    fn outgoing_init(&mut self, peer_id: PeerId) {
796        let (peer_cmd_sender, peer_cmd_receiver) = mpsc::tracked_unbounded_channel();
797        let aborter = Aborter::default();
798        let aborted = aborter.aborted();
799
800        self.peers().insert(
801            peer_id,
802            PeerState {
803                cmd_sender: peer_cmd_sender,
804                abort: aborter,
805            },
806        );
807        let event_sender = self.event_sender().clone();
808        let event_sender =
809            Arc::new(move |p2p_event: P2pEvent| event_sender.send(p2p_event.into()).ok());
810        let _ = self.cmd_sender().tracked_send(Cmd::PeerAdd {
811            args: PeerAddArgs {
812                peer_id,
813                kind: PeerConnectionKind::Outgoing,
814                event_sender,
815                cmd_receiver: peer_cmd_receiver,
816            },
817            aborted,
818        });
819    }
820
821    fn incoming_init(&mut self, peer_id: PeerId, offer: webrtc::Offer) {
822        let (peer_cmd_sender, peer_cmd_receiver) = mpsc::tracked_unbounded_channel();
823        let aborter = Aborter::default();
824        let aborted = aborter.aborted();
825
826        self.peers().insert(
827            peer_id,
828            PeerState {
829                cmd_sender: peer_cmd_sender,
830                abort: aborter,
831            },
832        );
833        let event_sender = self.event_sender().clone();
834        let event_sender =
835            Arc::new(move |p2p_event: P2pEvent| event_sender.send(p2p_event.into()).ok());
836        let _ = self.cmd_sender().tracked_send(Cmd::PeerAdd {
837            args: PeerAddArgs {
838                peer_id,
839                kind: PeerConnectionKind::Incoming(Box::new(offer)),
840                event_sender,
841                cmd_receiver: peer_cmd_receiver,
842            },
843            aborted,
844        });
845    }
846
847    fn set_answer(&mut self, peer_id: PeerId, answer: webrtc::Answer) {
848        if let Some(peer) = self.peers().get(&peer_id) {
849            let _ = peer.cmd_sender.tracked_send(PeerCmd::AnswerSet(answer));
850        }
851    }
852
853    fn http_signaling_request(&mut self, url: String, offer: webrtc::Offer) {
854        if let Some(peer) = self.peers().get(&offer.target_peer_id) {
855            let _ = peer
856                .cmd_sender
857                .tracked_send(PeerCmd::PeerHttpOfferSend(url, offer));
858        }
859    }
860
861    fn disconnect(&mut self, peer_id: PeerId) -> bool {
862        // TODO(binier): improve
863        // By removing the peer, `abort` gets dropped which will
864        // cause `peer_loop` to end.
865        if let Some(_peer) = self.peers().remove(&peer_id) {
866            // if peer.abort.receiver_count() > 0 {
867            //     // peer disconnection not yet finished
868            //     return false;
869            // }
870        } else {
871            openmina_core::error!(openmina_core::log::system_time(); "`disconnect` shouldn't be used for libp2p peers");
872        }
873        true
874    }
875
876    fn channel_open(&mut self, peer_id: PeerId, id: ChannelId) {
877        if let Some(peer) = self.peers().get(&peer_id) {
878            let _ = peer.cmd_sender.tracked_send(PeerCmd::ChannelOpen(id));
879        }
880    }
881
882    fn channel_send(&mut self, peer_id: PeerId, msg_id: MsgId, msg: ChannelMsg) {
883        if let Some(peer) = self.peers().get(&peer_id) {
884            let _ = peer
885                .cmd_sender
886                .tracked_send(PeerCmd::ChannelSend(msg_id, msg));
887        }
888    }
889
890    fn encrypt<T: EncryptableType>(
891        &mut self,
892        other_pk: &PublicKey,
893        message: &T,
894    ) -> Result<T::Encrypted, Box<dyn std::error::Error>>;
895
896    fn decrypt<T: EncryptableType>(
897        &mut self,
898        other_pub_key: &PublicKey,
899        encrypted: &T::Encrypted,
900    ) -> Result<T, Box<dyn std::error::Error>>;
901
902    fn auth_send(
903        &mut self,
904        peer_id: PeerId,
905        _other_pub_key: &PublicKey,
906        auth: Option<ConnectionAuthEncrypted>,
907    ) {
908        if let Some(peer) = self.peers().get(&peer_id) {
909            let _ = peer
910                .cmd_sender
911                .tracked_send(PeerCmd::ConnectionAuthorizationSend(auth));
912        }
913    }
914
915    fn auth_encrypt_and_send(
916        &mut self,
917        peer_id: PeerId,
918        other_pub_key: &PublicKey,
919        auth: ConnectionAuth,
920    );
921
922    fn auth_decrypt(
923        &mut self,
924        other_pub_key: &PublicKey,
925        auth: ConnectionAuthEncrypted,
926    ) -> Option<ConnectionAuth>;
927}
928
929impl P2pServiceCtx {
930    pub fn pending_cmds(&self) -> usize {
931        self.peers
932            .iter()
933            .fold(self.cmd_sender.len(), |acc, (_, peer)| {
934                acc + peer.cmd_sender.len()
935            })
936    }
937}