1#[cfg(target_arch = "wasm32")]
2mod web;
3#[cfg(all(not(target_arch = "wasm32"), feature = "p2p-webrtc-cpp"))]
4mod webrtc_cpp;
5#[cfg(all(not(target_arch = "wasm32"), feature = "p2p-webrtc-rs"))]
6mod webrtc_rs;
7
8use std::{collections::BTreeMap, future::Future, pin::Pin, sync::Arc, time::Duration};
9
10use mina_core::bug_condition;
11use serde::Serialize;
12use tokio::sync::Semaphore;
13
14#[cfg(not(target_arch = "wasm32"))]
15use tokio::task::spawn_local;
16#[cfg(target_arch = "wasm32")]
17use wasm_bindgen_futures::spawn_local;
18
19use mina_core::channels::{mpsc, oneshot, Aborted, Aborter};
20
21use crate::{
22 channels::{ChannelId, ChannelMsg, MsgId},
23 connection::outgoing::P2pConnectionOutgoingInitOpts,
24 identity::{EncryptableType, PublicKey, SecretKey},
25 webrtc,
26 webrtc::{ConnectionAuth, ConnectionAuthEncrypted},
27 P2pChannelEvent, P2pConnectionEvent, P2pEvent, PeerId,
28};
29
// Backend selection. Each `imports` module below re-exports the same set of
// names from one platform-specific backend, so the rest of this file can stay
// backend-agnostic. The `cfg` conditions are expected to be mutually
// exclusive (enabling both native features would define `imports` twice).

// Native build backed by the `webrtc_rs` module (`p2p-webrtc-rs` feature).
#[cfg(all(not(target_arch = "wasm32"), feature = "p2p-webrtc-rs"))]
mod imports {
    pub use super::webrtc_rs::{
        build_api, certificate_from_pem_key, webrtc_signal_send, Api, RTCCertificate, RTCChannel,
        RTCConnection, RTCConnectionState, RTCSignalingError,
    };
}
// Native build backed by the `webrtc_cpp` module (`p2p-webrtc-cpp` feature).
#[cfg(all(not(target_arch = "wasm32"), feature = "p2p-webrtc-cpp"))]
mod imports {
    pub use super::webrtc_cpp::{
        build_api, certificate_from_pem_key, webrtc_signal_send, Api, RTCCertificate, RTCChannel,
        RTCConnection, RTCConnectionState, RTCSignalingError,
    };
}
// `wasm32` build backed by the `web` module.
#[cfg(target_arch = "wasm32")]
mod imports {
    pub use super::web::{
        build_api, certificate_from_pem_key, webrtc_signal_send, Api, RTCCertificate, RTCChannel,
        RTCConnection, RTCConnectionState, RTCSignalingError,
    };
}
51
52use imports::*;
53pub use imports::{webrtc_signal_send, RTCSignalingError};
54
55use super::TaskSpawner;
56
/// Maximum size in bytes (16 KiB) of a single piece passed to a data
/// channel's `send`; encoded messages larger than this are split into
/// consecutive chunks of at most this size.
const CHUNK_SIZE: usize = 16 * 1024;
59
/// Commands consumed by the main webrtc task spawned in
/// `P2pServiceWebrtc::init`.
pub enum Cmd {
    /// Start a connection task for a new peer. `aborted` is raced against the
    /// connection task so that it can be stopped from the outside.
    PeerAdd { args: PeerAddArgs, aborted: Aborted },
}
63
/// Commands sent from the service to the task driving a single peer.
#[derive(Debug)]
pub enum PeerCmd {
    /// Send the offer to the given URL through `webrtc_signal_send`.
    PeerHttpOfferSend(String, webrtc::Offer),
    /// Answer to apply as the remote description of an outgoing connection.
    AnswerSet(webrtc::Answer),
    /// Encrypted connection authentication to send over the main channel.
    /// `None` makes the peer task stop without sending anything.
    ConnectionAuthorizationSend(Option<ConnectionAuthEncrypted>),
    /// Open the data channel identified by the given id.
    ChannelOpen(ChannelId),
    /// Send a message on the (already opened) channel it belongs to.
    ChannelSend(MsgId, ChannelMsg),
}
72
/// Commands the peer loop sends to itself from spawned tasks and channel
/// callbacks.
enum PeerCmdInternal {
    /// Outcome of an attempt to open a data channel.
    ChannelOpened(ChannelId, Result<RTCChannel, Error>),
    /// A data channel reported an error or was closed after its open attempt
    /// had already been resolved.
    ChannelClosed(ChannelId),
}
77
/// Union of the two command sources handled by the peer loop.
enum PeerCmdAll {
    /// Command received from the service (boxed to keep this enum small).
    External(Box<PeerCmd>),
    /// Command produced by the peer loop's own tasks/callbacks.
    Internal(PeerCmdInternal),
}
82
/// State returned by `P2pServiceWebrtc::init`.
pub struct P2pServiceCtx {
    /// Sender for commands addressed to the main webrtc task.
    pub cmd_sender: mpsc::TrackedUnboundedSender<Cmd>,
    /// Per-peer handles, keyed by peer id.
    pub peers: BTreeMap<PeerId, PeerState>,
}
87
/// Everything a peer task needs to get started.
pub struct PeerAddArgs {
    /// Id of the remote peer.
    peer_id: PeerId,
    /// Whether we initiate the connection or answer a received offer.
    kind: PeerConnectionKind,
    /// Callback used to report events; returns `None` when the event could
    /// not be delivered.
    event_sender: Arc<dyn Fn(P2pEvent) -> Option<()> + Send + Sync + 'static>,
    /// Receiving end of the per-peer command channel.
    cmd_receiver: mpsc::TrackedUnboundedReceiver<PeerCmd>,
}
94
/// Direction of a peer connection.
pub enum PeerConnectionKind {
    /// We create the offer ourselves.
    Outgoing,
    /// The remote peer sent us this offer (boxed to keep the enum small).
    Incoming(Box<webrtc::Offer>),
}
99
/// Handle the service keeps for every peer it has started a task for.
pub struct PeerState {
    /// Sender for commands addressed to the peer's task.
    pub cmd_sender: mpsc::TrackedUnboundedSender<PeerCmd>,
    /// Owner side of the abort signal whose `Aborted` counterpart was handed
    /// to the peer's task.
    pub abort: Aborter,
}
104
/// Errors produced while establishing or using a webrtc connection.
///
/// `From` conversions are derived for the payload-carrying variants, so `?`
/// can be used on backend and signaling results.
#[derive(thiserror::Error, derive_more::From, Debug)]
pub(super) enum Error {
    /// Error reported by the `webrtc` crate backend.
    #[cfg(all(not(target_arch = "wasm32"), feature = "p2p-webrtc-rs"))]
    #[error("{0}")]
    Rtc(::webrtc::Error),
    /// Error reported by the `datachannel` crate backend.
    #[cfg(all(not(target_arch = "wasm32"), feature = "p2p-webrtc-cpp"))]
    #[error("{0}")]
    Rtc(::datachannel::Error),
    /// Error coming from JavaScript, stored as its `Debug` rendering.
    #[cfg(target_arch = "wasm32")]
    #[error("js error: {0:?}")]
    RtcJs(String),
    /// Signaling (offer/answer exchange) failed.
    #[error("signaling error: {0}")]
    Signaling(RTCSignalingError),
    /// A command arrived that is not valid in the current connection phase.
    #[error("unexpected cmd received")]
    UnexpectedCmd,
    /// A channel this code depends on was closed. Excluded from the derived
    /// `From` impls because it carries no payload.
    #[from(ignore)]
    #[error("channel closed")]
    ChannelClosed,
}
124
/// Wraps a raw JS value into [`Error::RtcJs`], keeping only its `Debug`
/// rendering so that the error does not hold on to the JS object.
#[cfg(target_arch = "wasm32")]
impl From<wasm_bindgen::JsValue> for Error {
    fn from(value: wasm_bindgen::JsValue) -> Self {
        let rendered = format!("{:?}", value);
        Self::RtcJs(rendered)
    }
}
131
/// Boxed handler that is called with a connection state and returns a boxed
/// future to be awaited by the caller.
// NOTE(review): presumably this is the handler type accepted by
// `RTCConnection::on_connection_state_change` — confirm in the backends.
pub type OnConnectionStateChangeHdlrFn = Box<
    dyn (FnMut(RTCConnectionState) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>)
        + Send
        + Sync,
>;
137
/// Configuration passed to `RTCConnection::create`.
pub struct RTCConfig {
    /// ICE servers the connection may use.
    pub ice_servers: RTCConfigIceServers,
    /// Certificate used for the connection.
    pub certificate: RTCCertificate,
    /// 32-byte seed forwarded to the backend.
    // NOTE(review): how the backend uses the seed is not visible here.
    pub seed: [u8; 32],
}
143
/// Serializable list of ICE server entries; derefs to the inner `Vec`.
#[derive(Serialize)]
pub struct RTCConfigIceServers(Vec<RTCConfigIceServer>);
/// A single ICE server entry.
#[derive(Serialize)]
pub struct RTCConfigIceServer {
    /// Server URLs (e.g. `stun:host:port`).
    pub urls: Vec<String>,
    /// Optional user name for the server.
    pub username: Option<String>,
    /// Optional credential for the server.
    pub credential: Option<String>,
}
152
/// Parameters used when creating a data channel.
#[derive(Serialize)]
pub struct RTCChannelConfig {
    /// Channel label.
    pub label: &'static str,
    /// Numeric id of a pre-negotiated channel, if any.
    // NOTE(review): presumably `Some(id)` maps to WebRTC's `negotiated: true`
    // with that id — confirm in the backends.
    pub negotiated: Option<u16>,
}
158
159impl Default for RTCConfigIceServers {
160 fn default() -> Self {
161 Self(vec![
162 RTCConfigIceServer {
163 urls: vec!["stun:65.109.110.75:3478".to_owned()],
164 username: Some("mina".to_owned()),
165 credential: Some("webrtc".to_owned()),
166 },
167 RTCConfigIceServer {
168 urls: vec!["stun:176.9.147.28:3478".to_owned()],
169 username: None,
170 credential: None,
171 },
172 RTCConfigIceServer {
173 urls: vec![
174 "stun:stun.l.google.com:19302".to_owned(),
175 "stun:stun1.l.google.com:19302".to_owned(),
176 "stun:stun2.l.google.com:19302".to_owned(),
177 "stun:stun3.l.google.com:19302".to_owned(),
178 "stun:stun4.l.google.com:19302".to_owned(),
179 ],
180 username: None,
181 credential: None,
182 },
183 ])
184 }
185}
186
187impl std::ops::Deref for RTCConfigIceServers {
188 type Target = Vec<RTCConfigIceServer>;
189
190 fn deref(&self) -> &Self::Target {
191 &self.0
192 }
193}
194
195#[cfg(not(all(not(target_arch = "wasm32"), feature = "p2p-webrtc-cpp")))]
196impl Drop for RTCConnection {
197 fn drop(&mut self) {
198 if self.is_main() {
199 let cloned = self.clone();
200 spawn_local(async move {
201 let _ = cloned.close().await;
202 });
203 }
204 }
205}
206
207async fn sleep(dur: Duration) {
208 #[cfg(not(target_arch = "wasm32"))]
209 let fut = tokio::time::sleep(dur);
210 #[cfg(target_arch = "wasm32")]
211 let fut = gloo_timers::future::TimeoutFuture::new(dur.as_millis() as u32);
212 fut.await
213}
214
215async fn wait_for_ice_gathering_complete(pc: &mut RTCConnection) {
216 let timeout = sleep(Duration::from_secs(3));
217
218 tokio::select! {
219 _ = timeout => {}
220 _ = pc.wait_for_ice_gathering_complete() => {}
221 }
222}
223
224async fn peer_start(
225 api: Api,
226 args: PeerAddArgs,
227 abort: Aborted,
228 closed: mpsc::Sender<()>,
229 certificate: RTCCertificate,
230 rng_seed: [u8; 32],
231) {
232 let PeerAddArgs {
233 peer_id,
234 kind,
235 event_sender,
236 mut cmd_receiver,
237 } = args;
238 let is_outgoing = matches!(kind, PeerConnectionKind::Outgoing);
239
240 let config = RTCConfig {
241 ice_servers: Default::default(),
242 certificate,
243 seed: rng_seed,
244 };
245 let fut = async {
246 let mut pc = RTCConnection::create(&api, config).await?;
247 let main_channel = pc
248 .channel_create(RTCChannelConfig {
249 label: "",
250 negotiated: Some(0),
251 })
252 .await?;
253
254 let offer = match kind {
255 PeerConnectionKind::Incoming(offer) => (*offer).try_into()?,
256 PeerConnectionKind::Outgoing => pc.offer_create().await?,
257 };
258
259 if is_outgoing {
260 pc.local_desc_set(offer).await?;
261 wait_for_ice_gathering_complete(&mut pc).await;
262 } else {
263 pc.remote_desc_set(offer).await?;
264 }
265
266 Result::<_, Error>::Ok((pc, main_channel))
267 };
268
269 #[allow(unused_mut)]
270 let (mut pc, mut main_channel) = match fut.await {
271 Ok(v) => v,
272 Err(err) => {
273 event_sender(P2pConnectionEvent::OfferSdpReady(peer_id, Err(err.to_string())).into());
274 return;
275 }
276 };
277
278 let (main_channel_open_tx, main_channel_open) = oneshot::channel::<()>();
279 let mut main_channel_open_tx = Some(main_channel_open_tx);
280 main_channel.on_open(move || {
281 if let Some(tx) = main_channel_open_tx.take() {
282 let _ = tx.send(());
283 }
284 std::future::ready(())
285 });
286
287 let answer = if is_outgoing {
288 let answer_fut = async {
289 let sdp = pc.local_sdp().await.unwrap();
290 event_sender(P2pConnectionEvent::OfferSdpReady(peer_id, Ok(sdp)).into())
291 .ok_or(Error::ChannelClosed)?;
292 match cmd_receiver.recv().await.ok_or(Error::ChannelClosed)?.0 {
293 PeerCmd::PeerHttpOfferSend(url, offer) => {
294 let answer = webrtc_signal_send(&url, offer).await?;
295 event_sender(P2pConnectionEvent::AnswerReceived(peer_id, answer).into())
296 .ok_or(Error::ChannelClosed)?;
297
298 if let PeerCmd::AnswerSet(v) =
299 cmd_receiver.recv().await.ok_or(Error::ChannelClosed)?.0
300 {
301 return Ok(v);
302 }
303 }
304 PeerCmd::AnswerSet(v) => return Ok(v),
305 _cmd => {
306 return Err(Error::UnexpectedCmd);
307 }
308 }
309 Err(Error::ChannelClosed)
310 };
311 answer_fut.await.and_then(|v| Ok(v.try_into()?))
312 } else {
313 pc.answer_create().await.map_err(Error::from)
314 };
315 let Ok(answer) = answer else {
316 return;
317 };
318
319 if is_outgoing {
320 if let Err(err) = pc.remote_desc_set(answer).await {
321 let err = Error::from(err).to_string();
322 let _ = event_sender(P2pConnectionEvent::Finalized(peer_id, Err(err)).into());
323 }
324 } else {
325 let fut = async {
326 pc.local_desc_set(answer).await?;
327 wait_for_ice_gathering_complete(&mut pc).await;
328 Ok(pc.local_sdp().await.unwrap())
329 };
330 let res = fut.await.map_err(|err: Error| err.to_string());
331 let is_err = res.is_err();
332 let is_err = is_err
333 || event_sender(P2pConnectionEvent::AnswerSdpReady(peer_id, res).into()).is_none();
334 if is_err {
335 return;
336 }
337 }
338
339 let (connected_tx, connected) = oneshot::channel();
340 if matches!(pc.connection_state(), RTCConnectionState::Connected) {
341 connected_tx.send(Ok(())).unwrap();
342 } else {
343 let mut connected_tx = Some(connected_tx);
344 pc.on_connection_state_change(Box::new(move |state| {
345 match state {
346 RTCConnectionState::Connected => {
347 if let Some(connected_tx) = connected_tx.take() {
348 let _ = connected_tx.send(Ok(()));
349 }
350 }
351 RTCConnectionState::Disconnected | RTCConnectionState::Closed => {
352 if let Some(connected_tx) = connected_tx.take() {
353 let _ = connected_tx.send(Err("disconnected"));
354 } else {
355 let _ = closed.try_send(());
356 }
357 }
358 _ => {}
359 }
360 Box::pin(std::future::ready(()))
361 }));
362 }
363 match connected
364 .await
365 .map_err(|_| Error::ChannelClosed.to_string())
366 .and_then(|res| res.map_err(|v| v.to_string()))
367 {
368 Ok(_) => {}
369 Err(err) => {
370 let _ = event_sender(P2pConnectionEvent::Finalized(peer_id, Err(err)).into());
371 return;
372 }
373 }
374
375 let (remote_auth_tx, remote_auth_rx) = oneshot::channel::<ConnectionAuthEncrypted>();
378 let mut remote_auth_tx = Some(remote_auth_tx);
379 main_channel.on_message(move |data| {
380 if let Some(tx) = remote_auth_tx.take() {
381 if let Ok(auth) = data.try_into() {
382 let _ = tx.send(auth);
383 }
384 }
385 #[cfg(not(all(not(target_arch = "wasm32"), feature = "p2p-webrtc-cpp")))]
386 std::future::ready(())
387 });
388 let msg = match cmd_receiver.recv().await {
389 None => return,
390 Some(msg) => msg,
391 };
392 match msg.0 {
393 PeerCmd::ConnectionAuthorizationSend(None) => {
394 return;
396 }
397 PeerCmd::ConnectionAuthorizationSend(Some(auth)) => {
398 let _ = main_channel_open.await;
399
400 sleep(Duration::from_secs(1)).await;
404 let _ = main_channel
405 .send(&bytes::Bytes::copy_from_slice(auth.as_ref()))
406 .await;
407
408 let res = match remote_auth_rx.await {
409 Err(_) => Err("didn't receive connection authentication message".to_owned()),
410 Ok(remote_auth) => Ok(remote_auth),
411 };
412 let is_err = res.is_err();
413 let _ = event_sender(P2pConnectionEvent::Finalized(peer_id, res).into());
414 if is_err {
415 return;
416 }
417 }
418 cmd => {
419 bug_condition!("unexpected peer cmd! Expected `PeerCmd::ConnectionAuthorizationSend`. received: {cmd:?}");
420 return;
421 }
422 }
423
424 let _ = main_channel.close().await;
425
426 peer_loop(peer_id, event_sender, cmd_receiver, pc, abort).await
427}
428
/// An opened data channel as tracked by the peer loop.
struct Channel {
    /// Identifier of the channel.
    id: ChannelId,
    /// Queue feeding the task that writes messages to the channel.
    msg_sender: ChannelMsgSender,
}
433
/// Sender of `(message id, encoded bytes, optional tracker)` tuples to a
/// channel's sending task.
type ChannelMsgSender = mpsc::UnboundedSender<(MsgId, Vec<u8>, Option<mpsc::Tracker>)>;
435
436struct MsgBuffer {
437 buf: Vec<u8>,
438}
439
440impl MsgBuffer {
441 fn new(capacity: usize) -> Self {
442 Self {
443 buf: Vec::with_capacity(capacity),
444 }
445 }
446
447 fn encode(&mut self, msg: &ChannelMsg) -> Result<Vec<u8>, std::io::Error> {
448 msg.encode(&mut self.buf)?;
449 let len_encoded = (self.buf.len() as u32).to_be_bytes();
450 let encoded = len_encoded
451 .into_iter()
452 .chain(self.buf.iter().cloned())
453 .collect();
454 self.buf.clear();
455 Ok(encoded)
456 }
457}
458
459struct Channels {
460 list: Vec<Channel>,
461}
462
463impl Channels {
464 fn new() -> Self {
465 Self {
466 list: Vec::with_capacity(32),
467 }
468 }
469
470 fn get_msg_sender(&self, id: ChannelId) -> Option<&ChannelMsgSender> {
471 self.list.iter().find(|c| c.id == id).map(|c| &c.msg_sender)
472 }
473
474 fn add(&mut self, id: ChannelId, msg_sender: ChannelMsgSender) {
475 self.list.push(Channel { id, msg_sender });
476 }
477
478 fn remove(&mut self, id: ChannelId) -> bool {
479 match self.list.iter().position(|c| c.id == id) {
480 None => false,
481 Some(index) => {
482 self.list.remove(index);
483 true
484 }
485 }
486 }
487}
488
489#[allow(unused_mut)]
491async fn peer_loop(
492 peer_id: PeerId,
493 event_sender: Arc<dyn Fn(P2pEvent) -> Option<()> + Send + Sync + 'static>,
494 mut cmd_receiver: mpsc::TrackedUnboundedReceiver<PeerCmd>,
495 mut pc: RTCConnection,
496 aborted: Aborted,
497) {
498 let mut channels = Channels::new();
500 let mut msg_buf = MsgBuffer::new(64 * 1024);
501
502 let (internal_cmd_sender, mut internal_cmd_receiver) =
503 mpsc::unbounded_channel::<PeerCmdInternal>();
504
505 while matches!(pc.connection_state(), RTCConnectionState::Connected) {
506 let (cmd, _tracker) = tokio::select! {
507 cmd = cmd_receiver.recv() => match cmd {
508 None => return,
509 Some(cmd) => (PeerCmdAll::External(Box::new(cmd.0)), Some(cmd.1)),
510 },
511 cmd = internal_cmd_receiver.recv() => match cmd {
512 None => return,
513 Some(cmd) => (PeerCmdAll::Internal(cmd), None),
514 },
515 };
516 match cmd {
517 PeerCmdAll::External(cmd) => match *cmd {
518 PeerCmd::PeerHttpOfferSend(..)
519 | PeerCmd::AnswerSet(_)
520 | PeerCmd::ConnectionAuthorizationSend(_) => {
521 bug_condition!("unexpected peer cmd");
522 }
523 PeerCmd::ChannelOpen(id) => {
524 let chan = pc
525 .channel_create(RTCChannelConfig {
526 label: id.name(),
527 negotiated: Some(id.to_u16()),
528 })
529 .await;
530 let internal_cmd_sender = internal_cmd_sender.clone();
531 let fut = async move {
532 let internal_cmd_sender_clone = internal_cmd_sender.clone();
533 let result = async move {
534 let chan = chan?;
535
536 let (done_tx, mut done_rx) = mpsc::channel::<Result<(), Error>>(1);
537
538 let done_tx_clone = done_tx.clone();
539 chan.on_open(move || {
540 let _ = done_tx_clone.try_send(Ok(()));
541 std::future::ready(())
542 });
543
544 let done_tx_clone = done_tx.clone();
545 let internal_cmd_sender = internal_cmd_sender_clone.clone();
546 chan.on_error(move |err| {
547 if done_tx_clone.try_send(Err(err.into())).is_err() {
548 let _ = internal_cmd_sender
549 .send(PeerCmdInternal::ChannelClosed(id));
550 }
551 std::future::ready(())
552 });
553
554 let done_tx_clone = done_tx.clone();
555 let internal_cmd_sender = internal_cmd_sender_clone.clone();
556 chan.on_close(move || {
557 if done_tx_clone.try_send(Err(Error::ChannelClosed)).is_err() {
558 let _ = internal_cmd_sender
559 .send(PeerCmdInternal::ChannelClosed(id));
560 }
561 std::future::ready(())
562 });
563
564 done_rx.recv().await.ok_or(Error::ChannelClosed)??;
565
566 Ok(chan)
567 };
568
569 let _ = internal_cmd_sender
570 .send(PeerCmdInternal::ChannelOpened(id, result.await));
571 };
572 let mut aborted = aborted.clone();
573 spawn_local(async move {
574 tokio::select! {
575 _ = aborted.wait() => {}
576 _ = fut => {}
577 }
578 });
579 }
580 PeerCmd::ChannelSend(msg_id, msg) => {
581 let id = msg.channel_id();
582 let err = match channels.get_msg_sender(id) {
583 Some(msg_sender) => match msg_buf.encode(&msg) {
584 Ok(encoded) => match msg_sender.send((msg_id, encoded, _tracker)) {
585 Ok(_) => None,
586 Err(_) => Some("ChannelMsgMpscSendFailed".to_owned()),
587 },
588 Err(err) => Some(err.to_string()),
589 },
590 None => Some("ChannelNotOpen".to_owned()),
591 };
592 if let Some(err) = err {
593 let _ = event_sender(
594 P2pChannelEvent::Sent(peer_id, id, msg_id, Err(err)).into(),
595 );
596 }
597 }
598 },
599 PeerCmdAll::Internal(PeerCmdInternal::ChannelOpened(chan_id, result)) => {
600 let (sender_tx, mut sender_rx) = mpsc::unbounded_channel();
601 let (chan, res) = match result {
602 Ok(chan) => {
603 channels.add(chan_id, sender_tx);
604 (Some(chan), Ok(()))
605 }
606 Err(err) => (None, Err(err.to_string())),
607 };
608
609 #[allow(unused_mut)]
610 if let Some(mut chan) = chan {
611 fn process_msg(
612 chan_id: ChannelId,
613 buf: &mut Vec<u8>,
614 len: &mut u32,
615 msg: &mut &[u8],
616 ) -> Result<Option<ChannelMsg>, String> {
617 let len = if buf.is_empty() {
618 if msg.len() < 4 {
619 return Err("WebRTCMessageTooSmall".to_owned());
620 } else {
621 *len = u32::from_be_bytes(
622 msg[..4].try_into().expect("Size checked above"),
623 );
624 *msg = &msg[4..];
625 let len = *len as usize;
626 if len > chan_id.max_msg_size() {
627 return Err(format!(
628 "ChannelMsgLenOverLimit; len: {}, limit: {}",
629 len,
630 chan_id.max_msg_size()
631 ));
632 }
633 len
634 }
635 } else {
636 *len as usize
637 };
638 let bytes_left = len - buf.len();
639
640 if bytes_left > msg.len() {
641 buf.extend_from_slice(msg);
642 *msg = &[];
643 return Ok(None);
644 }
645
646 buf.extend_from_slice(&msg[..bytes_left]);
647 *msg = &msg[bytes_left..];
648 let msg = ChannelMsg::decode(&mut &buf[..], chan_id)
649 .map_err(|err| err.to_string())?;
650 buf.clear();
651 Ok(Some(msg))
652 }
653
654 let mut len = 0;
655 let mut buf = Vec::new();
656 let event_sender_clone = event_sender.clone();
657
658 chan.on_message(move |mut data| {
659 while !data.is_empty() {
660 let res = match process_msg(chan_id, &mut buf, &mut len, &mut data) {
661 Ok(None) => continue,
662 Ok(Some(msg)) => Ok(Box::new(msg)),
663 Err(err) => Err(err),
664 };
665 let _ =
666 event_sender_clone(P2pChannelEvent::Received(peer_id, res).into());
667 }
668 #[cfg(not(all(not(target_arch = "wasm32"), feature = "p2p-webrtc-cpp")))]
669 std::future::ready(())
670 });
671
672 let event_sender = event_sender.clone();
673 let fut = async move {
674 sleep(Duration::from_secs(3)).await;
678
679 while let Some((msg_id, encoded, _tracker)) = sender_rx.recv().await {
680 let encoded = bytes::Bytes::from(encoded);
681 let mut chunks =
682 encoded.chunks(CHUNK_SIZE).map(|b| encoded.slice_ref(b));
683 let result = loop {
684 let Some(chunk) = chunks.next() else {
685 break Ok(());
686 };
687 if let Err(err) = chan
688 .send(&chunk)
689 .await
690 .map_err(|e| format!("{e:?}"))
691 .and_then(|n| match n == chunk.len() {
692 false => Err("NotAllBytesWritten".to_owned()),
693 true => Ok(()),
694 })
695 {
696 break Err(err);
697 }
698 };
699
700 let _ = event_sender(
701 P2pChannelEvent::Sent(peer_id, chan_id, msg_id, result).into(),
702 );
703 }
704 };
705
706 let mut aborted = aborted.clone();
707 spawn_local(async move {
708 tokio::select! {
709 _ = aborted.wait() => {}
710 _ = fut => {}
711 }
712 });
713 }
714
715 let _ = event_sender(P2pChannelEvent::Opened(peer_id, chan_id, res).into());
716 }
717 PeerCmdAll::Internal(PeerCmdInternal::ChannelClosed(id)) => {
718 channels.remove(id);
719 let _ = event_sender(P2pChannelEvent::Closed(peer_id, id).into());
720 }
721 }
722 }
723}
724
725pub trait P2pServiceWebrtc: redux::Service {
726 type Event: From<P2pEvent> + Send + Sync + 'static;
727
728 fn random_pick(
729 &mut self,
730 list: &[P2pConnectionOutgoingInitOpts],
731 ) -> Option<P2pConnectionOutgoingInitOpts>;
732
733 fn event_sender(&self) -> &mpsc::UnboundedSender<Self::Event>;
734
735 fn cmd_sender(&self) -> &mpsc::TrackedUnboundedSender<Cmd>;
736
737 fn peers(&mut self) -> &mut BTreeMap<PeerId, PeerState>;
738
739 fn init<S: TaskSpawner>(
740 secret_key: SecretKey,
741 spawner: S,
742 rng_seed: [u8; 32],
743 ) -> P2pServiceCtx {
744 const MAX_PEERS: usize = 500;
745 let (cmd_sender, mut cmd_receiver) = mpsc::tracked_unbounded_channel();
746
747 let certificate = certificate_from_pem_key(secret_key.to_pem().as_str());
748
749 spawner.spawn_main("webrtc", async move {
750 #[allow(clippy::all)]
751 let api = build_api();
752 let conn_permits = Arc::new(Semaphore::const_new(MAX_PEERS));
753 while let Some(cmd) = cmd_receiver.recv().await {
754 match cmd.0 {
755 Cmd::PeerAdd { args, aborted } => {
756 #[allow(clippy::all)]
757 let api = api.clone();
758 let conn_permits = conn_permits.clone();
759 let peer_id = args.peer_id;
760 let event_sender = args.event_sender.clone();
761 let certificate = certificate.clone();
762 spawn_local(async move {
763 let Ok(_permit) = conn_permits.try_acquire() else {
764 bug_condition!("P2P WebRTC Semaphore acquisition failed!");
766 return;
767 };
768 let (closed_tx, mut closed) = mpsc::channel(1);
769 let event_sender_clone = event_sender.clone();
770 spawn_local(async move {
771 let _ = closed.recv().await;
773 event_sender_clone(P2pConnectionEvent::Closed(peer_id).into());
774 });
775 tokio::select! {
776 _ = peer_start(api, args, aborted.clone(), closed_tx.clone(), certificate, rng_seed) => {}
777 _ = aborted.wait() => {
778 }
779 }
780
781 sleep(Duration::from_millis(100)).await;
783 let _ = closed_tx.send(()).await;
784 });
785 }
786 }
787 }
788 });
789
790 P2pServiceCtx {
791 cmd_sender,
792 peers: Default::default(),
793 }
794 }
795
796 fn outgoing_init(&mut self, peer_id: PeerId) {
797 let (peer_cmd_sender, peer_cmd_receiver) = mpsc::tracked_unbounded_channel();
798 let aborter = Aborter::default();
799 let aborted = aborter.aborted();
800
801 self.peers().insert(
802 peer_id,
803 PeerState {
804 cmd_sender: peer_cmd_sender,
805 abort: aborter,
806 },
807 );
808 let event_sender = self.event_sender().clone();
809 let event_sender =
810 Arc::new(move |p2p_event: P2pEvent| event_sender.send(p2p_event.into()).ok());
811 let _ = self.cmd_sender().tracked_send(Cmd::PeerAdd {
812 args: PeerAddArgs {
813 peer_id,
814 kind: PeerConnectionKind::Outgoing,
815 event_sender,
816 cmd_receiver: peer_cmd_receiver,
817 },
818 aborted,
819 });
820 }
821
822 fn incoming_init(&mut self, peer_id: PeerId, offer: webrtc::Offer) {
823 let (peer_cmd_sender, peer_cmd_receiver) = mpsc::tracked_unbounded_channel();
824 let aborter = Aborter::default();
825 let aborted = aborter.aborted();
826
827 self.peers().insert(
828 peer_id,
829 PeerState {
830 cmd_sender: peer_cmd_sender,
831 abort: aborter,
832 },
833 );
834 let event_sender = self.event_sender().clone();
835 let event_sender =
836 Arc::new(move |p2p_event: P2pEvent| event_sender.send(p2p_event.into()).ok());
837 let _ = self.cmd_sender().tracked_send(Cmd::PeerAdd {
838 args: PeerAddArgs {
839 peer_id,
840 kind: PeerConnectionKind::Incoming(Box::new(offer)),
841 event_sender,
842 cmd_receiver: peer_cmd_receiver,
843 },
844 aborted,
845 });
846 }
847
848 fn set_answer(&mut self, peer_id: PeerId, answer: webrtc::Answer) {
849 if let Some(peer) = self.peers().get(&peer_id) {
850 let _ = peer.cmd_sender.tracked_send(PeerCmd::AnswerSet(answer));
851 }
852 }
853
854 fn http_signaling_request(&mut self, url: String, offer: webrtc::Offer) {
855 if let Some(peer) = self.peers().get(&offer.target_peer_id) {
856 let _ = peer
857 .cmd_sender
858 .tracked_send(PeerCmd::PeerHttpOfferSend(url, offer));
859 }
860 }
861
862 fn disconnect(&mut self, peer_id: PeerId) -> bool {
863 if let Some(_peer) = self.peers().remove(&peer_id) {
867 } else {
872 mina_core::error!(mina_core::log::system_time(); "`disconnect` shouldn't be used for libp2p peers");
873 }
874 true
875 }
876
877 fn channel_open(&mut self, peer_id: PeerId, id: ChannelId) {
878 if let Some(peer) = self.peers().get(&peer_id) {
879 let _ = peer.cmd_sender.tracked_send(PeerCmd::ChannelOpen(id));
880 }
881 }
882
883 fn channel_send(&mut self, peer_id: PeerId, msg_id: MsgId, msg: ChannelMsg) {
884 if let Some(peer) = self.peers().get(&peer_id) {
885 let _ = peer
886 .cmd_sender
887 .tracked_send(PeerCmd::ChannelSend(msg_id, msg));
888 }
889 }
890
891 fn encrypt<T: EncryptableType>(
892 &mut self,
893 other_pk: &PublicKey,
894 message: &T,
895 ) -> Result<T::Encrypted, Box<dyn std::error::Error>>;
896
897 fn decrypt<T: EncryptableType>(
898 &mut self,
899 other_pub_key: &PublicKey,
900 encrypted: &T::Encrypted,
901 ) -> Result<T, Box<dyn std::error::Error>>;
902
903 fn auth_send(
904 &mut self,
905 peer_id: PeerId,
906 _other_pub_key: &PublicKey,
907 auth: Option<ConnectionAuthEncrypted>,
908 ) {
909 if let Some(peer) = self.peers().get(&peer_id) {
910 let _ = peer
911 .cmd_sender
912 .tracked_send(PeerCmd::ConnectionAuthorizationSend(auth));
913 }
914 }
915
916 fn auth_encrypt_and_send(
917 &mut self,
918 peer_id: PeerId,
919 other_pub_key: &PublicKey,
920 auth: ConnectionAuth,
921 );
922
923 fn auth_decrypt(
924 &mut self,
925 other_pub_key: &PublicKey,
926 auth: ConnectionAuthEncrypted,
927 ) -> Option<ConnectionAuth>;
928}
929
930impl P2pServiceCtx {
931 pub fn pending_cmds(&self) -> usize {
932 self.peers
933 .iter()
934 .fold(self.cmd_sender.len(), |acc, (_, peer)| {
935 acc + peer.cmd_sender.len()
936 })
937 }
938}