1#[cfg(target_arch = "wasm32")]
2mod web;
3#[cfg(all(not(target_arch = "wasm32"), feature = "p2p-webrtc-cpp"))]
4mod webrtc_cpp;
5#[cfg(all(not(target_arch = "wasm32"), feature = "p2p-webrtc-rs"))]
6mod webrtc_rs;
7
8use std::{collections::BTreeMap, future::Future, pin::Pin, sync::Arc, time::Duration};
9
10use openmina_core::bug_condition;
11use serde::Serialize;
12use tokio::sync::Semaphore;
13
14#[cfg(not(target_arch = "wasm32"))]
15use tokio::task::spawn_local;
16#[cfg(target_arch = "wasm32")]
17use wasm_bindgen_futures::spawn_local;
18
19use openmina_core::channels::{mpsc, oneshot, Aborted, Aborter};
20
21use crate::{
22 channels::{ChannelId, ChannelMsg, MsgId},
23 connection::outgoing::P2pConnectionOutgoingInitOpts,
24 identity::{EncryptableType, PublicKey, SecretKey},
25 webrtc,
26 webrtc::{ConnectionAuth, ConnectionAuthEncrypted},
27 P2pChannelEvent, P2pConnectionEvent, P2pEvent, PeerId,
28};
29
30#[cfg(all(not(target_arch = "wasm32"), feature = "p2p-webrtc-rs"))]
31mod imports {
32 pub use super::webrtc_rs::{
33 build_api, certificate_from_pem_key, webrtc_signal_send, Api, RTCCertificate, RTCChannel,
34 RTCConnection, RTCConnectionState, RTCSignalingError,
35 };
36}
37#[cfg(all(not(target_arch = "wasm32"), feature = "p2p-webrtc-cpp"))]
38mod imports {
39 pub use super::webrtc_cpp::{
40 build_api, certificate_from_pem_key, webrtc_signal_send, Api, RTCCertificate, RTCChannel,
41 RTCConnection, RTCConnectionState, RTCSignalingError,
42 };
43}
44#[cfg(target_arch = "wasm32")]
45mod imports {
46 pub use super::web::{
47 build_api, certificate_from_pem_key, webrtc_signal_send, Api, RTCCertificate, RTCChannel,
48 RTCConnection, RTCConnectionState, RTCSignalingError,
49 };
50}
51
52use imports::*;
53pub use imports::{webrtc_signal_send, RTCSignalingError};
54
55use super::TaskSpawner;
56
/// Size in bytes of the pieces an encoded message is split into before each
/// piece is passed to the channel's `send`.
const CHUNK_SIZE: usize = 16 * 1024;
59
60pub enum Cmd {
61 PeerAdd { args: PeerAddArgs, aborted: Aborted },
62}
63
64#[derive(Debug)]
65pub enum PeerCmd {
66 PeerHttpOfferSend(String, webrtc::Offer),
67 AnswerSet(webrtc::Answer),
68 ConnectionAuthorizationSend(Option<ConnectionAuthEncrypted>),
69 ChannelOpen(ChannelId),
70 ChannelSend(MsgId, ChannelMsg),
71}
72
73enum PeerCmdInternal {
74 ChannelOpened(ChannelId, Result<RTCChannel, Error>),
75 ChannelClosed(ChannelId),
76}
77
78enum PeerCmdAll {
79 External(PeerCmd),
80 Internal(PeerCmdInternal),
81}
82
83pub struct P2pServiceCtx {
84 pub cmd_sender: mpsc::TrackedUnboundedSender<Cmd>,
85 pub peers: BTreeMap<PeerId, PeerState>,
86}
87
88pub struct PeerAddArgs {
89 peer_id: PeerId,
90 kind: PeerConnectionKind,
91 event_sender: Arc<dyn Fn(P2pEvent) -> Option<()> + Send + Sync + 'static>,
92 cmd_receiver: mpsc::TrackedUnboundedReceiver<PeerCmd>,
93}
94
95pub enum PeerConnectionKind {
96 Outgoing,
97 Incoming(Box<webrtc::Offer>),
98}
99
100pub struct PeerState {
101 pub cmd_sender: mpsc::TrackedUnboundedSender<PeerCmd>,
102 pub abort: Aborter,
103}
104
105#[derive(thiserror::Error, derive_more::From, Debug)]
106pub(super) enum Error {
107 #[cfg(all(not(target_arch = "wasm32"), feature = "p2p-webrtc-rs"))]
108 #[error("{0}")]
109 Rtc(::webrtc::Error),
110 #[cfg(all(not(target_arch = "wasm32"), feature = "p2p-webrtc-cpp"))]
111 #[error("{0}")]
112 Rtc(::datachannel::Error),
113 #[cfg(target_arch = "wasm32")]
114 #[error("js error: {0:?}")]
115 RtcJs(String),
116 #[error("signaling error: {0}")]
117 Signaling(RTCSignalingError),
118 #[error("unexpected cmd received")]
119 UnexpectedCmd,
120 #[from(ignore)]
121 #[error("channel closed")]
122 ChannelClosed,
123}
124
/// Stores the `Debug` rendering of the JavaScript value in [`Error::RtcJs`].
#[cfg(target_arch = "wasm32")]
impl From<wasm_bindgen::JsValue> for Error {
    fn from(value: wasm_bindgen::JsValue) -> Self {
        let rendered = format!("{value:?}");
        Self::RtcJs(rendered)
    }
}
131
132pub type OnConnectionStateChangeHdlrFn = Box<
133 dyn (FnMut(RTCConnectionState) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>)
134 + Send
135 + Sync,
136>;
137
138pub struct RTCConfig {
139 pub ice_servers: RTCConfigIceServers,
140 pub certificate: RTCCertificate,
141 pub seed: [u8; 32],
142}
143
144#[derive(Serialize)]
145pub struct RTCConfigIceServers(Vec<RTCConfigIceServer>);
146#[derive(Serialize)]
147pub struct RTCConfigIceServer {
148 pub urls: Vec<String>,
149 pub username: Option<String>,
150 pub credential: Option<String>,
151}
152
153#[derive(Serialize)]
154pub struct RTCChannelConfig {
155 pub label: &'static str,
156 pub negotiated: Option<u16>,
157}
158
159impl Default for RTCConfigIceServers {
160 fn default() -> Self {
161 Self(vec![
162 RTCConfigIceServer {
163 urls: vec!["stun:65.109.110.75:3478".to_owned()],
164 username: Some("openmina".to_owned()),
165 credential: Some("webrtc".to_owned()),
166 },
167 RTCConfigIceServer {
168 urls: vec!["stun:176.9.147.28:3478".to_owned()],
169 username: None,
170 credential: None,
171 },
172 RTCConfigIceServer {
173 urls: vec![
174 "stun:stun.l.google.com:19302".to_owned(),
175 "stun:stun1.l.google.com:19302".to_owned(),
176 "stun:stun2.l.google.com:19302".to_owned(),
177 "stun:stun3.l.google.com:19302".to_owned(),
178 "stun:stun4.l.google.com:19302".to_owned(),
179 ],
180 username: None,
181 credential: None,
182 },
183 ])
184 }
185}
186
187impl std::ops::Deref for RTCConfigIceServers {
188 type Target = Vec<RTCConfigIceServer>;
189
190 fn deref(&self) -> &Self::Target {
191 &self.0
192 }
193}
194
195#[cfg(not(all(not(target_arch = "wasm32"), feature = "p2p-webrtc-cpp")))]
196impl Drop for RTCConnection {
197 fn drop(&mut self) {
198 if self.is_main() {
199 let cloned = self.clone();
200 spawn_local(async move {
201 let _ = cloned.close().await;
202 });
203 }
204 }
205}
206
207async fn sleep(dur: Duration) {
208 #[cfg(not(target_arch = "wasm32"))]
209 let fut = tokio::time::sleep(dur);
210 #[cfg(target_arch = "wasm32")]
211 let fut = gloo_timers::future::TimeoutFuture::new(dur.as_millis() as u32);
212 fut.await
213}
214
215async fn wait_for_ice_gathering_complete(pc: &mut RTCConnection) {
216 let timeout = sleep(Duration::from_secs(3));
217
218 tokio::select! {
219 _ = timeout => {}
220 _ = pc.wait_for_ice_gathering_complete() => {}
221 }
222}
223
224async fn peer_start(
225 api: Api,
226 args: PeerAddArgs,
227 abort: Aborted,
228 closed: mpsc::Sender<()>,
229 certificate: RTCCertificate,
230 rng_seed: [u8; 32],
231) {
232 let PeerAddArgs {
233 peer_id,
234 kind,
235 event_sender,
236 mut cmd_receiver,
237 } = args;
238 let is_outgoing = matches!(kind, PeerConnectionKind::Outgoing);
239
240 let config = RTCConfig {
241 ice_servers: Default::default(),
242 certificate,
243 seed: rng_seed,
244 };
245 let fut = async {
246 let mut pc = RTCConnection::create(&api, config).await?;
247 let main_channel = pc
248 .channel_create(RTCChannelConfig {
249 label: "",
250 negotiated: Some(0),
251 })
252 .await?;
253
254 let offer = match kind {
255 PeerConnectionKind::Incoming(offer) => (*offer).try_into()?,
256 PeerConnectionKind::Outgoing => pc.offer_create().await?,
257 };
258
259 if is_outgoing {
260 pc.local_desc_set(offer).await?;
261 wait_for_ice_gathering_complete(&mut pc).await;
262 } else {
263 pc.remote_desc_set(offer).await?;
264 }
265
266 Result::<_, Error>::Ok((pc, main_channel))
267 };
268
269 #[allow(unused_mut)]
270 let (mut pc, mut main_channel) = match fut.await {
271 Ok(v) => v,
272 Err(err) => {
273 event_sender(P2pConnectionEvent::OfferSdpReady(peer_id, Err(err.to_string())).into());
274 return;
275 }
276 };
277
278 let (main_channel_open_tx, main_channel_open) = oneshot::channel::<()>();
279 let mut main_channel_open_tx = Some(main_channel_open_tx);
280 main_channel.on_open(move || {
281 if let Some(tx) = main_channel_open_tx.take() {
282 let _ = tx.send(());
283 }
284 std::future::ready(())
285 });
286
287 let answer = if is_outgoing {
288 let answer_fut = async {
289 let sdp = pc.local_sdp().await.unwrap();
290 event_sender(P2pConnectionEvent::OfferSdpReady(peer_id, Ok(sdp)).into())
291 .ok_or(Error::ChannelClosed)?;
292 match cmd_receiver.recv().await.ok_or(Error::ChannelClosed)?.0 {
293 PeerCmd::PeerHttpOfferSend(url, offer) => {
294 let answer = webrtc_signal_send(&url, offer).await?;
295 event_sender(P2pConnectionEvent::AnswerReceived(peer_id, answer).into())
296 .ok_or(Error::ChannelClosed)?;
297
298 if let PeerCmd::AnswerSet(v) =
299 cmd_receiver.recv().await.ok_or(Error::ChannelClosed)?.0
300 {
301 return Ok(v);
302 }
303 }
304 PeerCmd::AnswerSet(v) => return Ok(v),
305 _cmd => {
306 return Err(Error::UnexpectedCmd);
307 }
308 }
309 Err(Error::ChannelClosed)
310 };
311 answer_fut.await.and_then(|v| Ok(v.try_into()?))
312 } else {
313 pc.answer_create().await.map_err(Error::from)
314 };
315 let Ok(answer) = answer else {
316 return;
317 };
318
319 if is_outgoing {
320 if let Err(err) = pc.remote_desc_set(answer).await {
321 let err = Error::from(err).to_string();
322 let _ = event_sender(P2pConnectionEvent::Finalized(peer_id, Err(err)).into());
323 }
324 } else {
325 let fut = async {
326 pc.local_desc_set(answer).await?;
327 wait_for_ice_gathering_complete(&mut pc).await;
328 Ok(pc.local_sdp().await.unwrap())
329 };
330 let res = fut.await.map_err(|err: Error| err.to_string());
331 let is_err = res.is_err();
332 let is_err = is_err
333 || event_sender(P2pConnectionEvent::AnswerSdpReady(peer_id, res).into()).is_none();
334 if is_err {
335 return;
336 }
337 }
338
339 let (connected_tx, connected) = oneshot::channel();
340 if matches!(pc.connection_state(), RTCConnectionState::Connected) {
341 connected_tx.send(Ok(())).unwrap();
342 } else {
343 let mut connected_tx = Some(connected_tx);
344 pc.on_connection_state_change(Box::new(move |state| {
345 match state {
346 RTCConnectionState::Connected => {
347 if let Some(connected_tx) = connected_tx.take() {
348 let _ = connected_tx.send(Ok(()));
349 }
350 }
351 RTCConnectionState::Disconnected | RTCConnectionState::Closed => {
352 if let Some(connected_tx) = connected_tx.take() {
353 let _ = connected_tx.send(Err("disconnected"));
354 } else {
355 let _ = closed.try_send(());
356 }
357 }
358 _ => {}
359 }
360 Box::pin(std::future::ready(()))
361 }));
362 }
363 match connected
364 .await
365 .map_err(|_| Error::ChannelClosed.to_string())
366 .and_then(|res| res.map_err(|v| v.to_string()))
367 {
368 Ok(_) => {}
369 Err(err) => {
370 let _ = event_sender(P2pConnectionEvent::Finalized(peer_id, Err(err)).into());
371 return;
372 }
373 }
374
375 let (remote_auth_tx, remote_auth_rx) = oneshot::channel::<ConnectionAuthEncrypted>();
378 let mut remote_auth_tx = Some(remote_auth_tx);
379 main_channel.on_message(move |data| {
380 if let Some(tx) = remote_auth_tx.take() {
381 if let Ok(auth) = data.try_into() {
382 let _ = tx.send(auth);
383 }
384 }
385 #[cfg(not(all(not(target_arch = "wasm32"), feature = "p2p-webrtc-cpp")))]
386 std::future::ready(())
387 });
388 let msg = match cmd_receiver.recv().await {
389 None => return,
390 Some(msg) => msg,
391 };
392 match msg.0 {
393 PeerCmd::ConnectionAuthorizationSend(None) => {
394 return;
396 }
397 PeerCmd::ConnectionAuthorizationSend(Some(auth)) => {
398 let _ = main_channel_open.await;
399
400 sleep(Duration::from_secs(1)).await;
404 let _ = main_channel
405 .send(&bytes::Bytes::copy_from_slice(auth.as_ref()))
406 .await;
407
408 let res = match remote_auth_rx.await {
409 Err(_) => Err("didn't receive connection authentication message".to_owned()),
410 Ok(remote_auth) => Ok(remote_auth),
411 };
412 let is_err = res.is_err();
413 let _ = event_sender(P2pConnectionEvent::Finalized(peer_id, res).into());
414 if is_err {
415 return;
416 }
417 }
418 cmd => {
419 bug_condition!("unexpected peer cmd! Expected `PeerCmd::ConnectionAuthorizationSend`. received: {cmd:?}");
420 return;
421 }
422 }
423
424 let _ = main_channel.close().await;
425
426 peer_loop(peer_id, event_sender, cmd_receiver, pc, abort).await
427}
428
429struct Channel {
430 id: ChannelId,
431 msg_sender: ChannelMsgSender,
432}
433
434type ChannelMsgSender = mpsc::UnboundedSender<(MsgId, Vec<u8>, Option<mpsc::Tracker>)>;
435
436struct MsgBuffer {
437 buf: Vec<u8>,
438}
439
440impl MsgBuffer {
441 fn new(capacity: usize) -> Self {
442 Self {
443 buf: Vec::with_capacity(capacity),
444 }
445 }
446
447 fn encode(&mut self, msg: &ChannelMsg) -> Result<Vec<u8>, std::io::Error> {
448 msg.encode(&mut self.buf)?;
449 let len_encoded = (self.buf.len() as u32).to_be_bytes();
450 let encoded = len_encoded
451 .into_iter()
452 .chain(self.buf.iter().cloned())
453 .collect();
454 self.buf.clear();
455 Ok(encoded)
456 }
457}
458
459struct Channels {
460 list: Vec<Channel>,
461}
462
463impl Channels {
464 fn new() -> Self {
465 Self {
466 list: Vec::with_capacity(32),
467 }
468 }
469
470 fn get_msg_sender(&self, id: ChannelId) -> Option<&ChannelMsgSender> {
471 self.list.iter().find(|c| c.id == id).map(|c| &c.msg_sender)
472 }
473
474 fn add(&mut self, id: ChannelId, msg_sender: ChannelMsgSender) {
475 self.list.push(Channel { id, msg_sender });
476 }
477
478 fn remove(&mut self, id: ChannelId) -> bool {
479 match self.list.iter().position(|c| c.id == id) {
480 None => false,
481 Some(index) => {
482 self.list.remove(index);
483 true
484 }
485 }
486 }
487}
488
489#[allow(unused_mut)]
491async fn peer_loop(
492 peer_id: PeerId,
493 event_sender: Arc<dyn Fn(P2pEvent) -> Option<()> + Send + Sync + 'static>,
494 mut cmd_receiver: mpsc::TrackedUnboundedReceiver<PeerCmd>,
495 mut pc: RTCConnection,
496 aborted: Aborted,
497) {
498 let mut channels = Channels::new();
500 let mut msg_buf = MsgBuffer::new(64 * 1024);
501
502 let (internal_cmd_sender, mut internal_cmd_receiver) =
503 mpsc::unbounded_channel::<PeerCmdInternal>();
504
505 while matches!(pc.connection_state(), RTCConnectionState::Connected) {
506 let (cmd, _tracker) = tokio::select! {
507 cmd = cmd_receiver.recv() => match cmd {
508 None => return,
509 Some(cmd) => (PeerCmdAll::External(cmd.0), Some(cmd.1)),
510 },
511 cmd = internal_cmd_receiver.recv() => match cmd {
512 None => return,
513 Some(cmd) => (PeerCmdAll::Internal(cmd), None),
514 },
515 };
516 match cmd {
517 PeerCmdAll::External(
518 PeerCmd::PeerHttpOfferSend(..)
519 | PeerCmd::AnswerSet(_)
520 | PeerCmd::ConnectionAuthorizationSend(_),
521 ) => {
522 bug_condition!("unexpected peer cmd");
523 }
524 PeerCmdAll::External(PeerCmd::ChannelOpen(id)) => {
525 let chan = pc
526 .channel_create(RTCChannelConfig {
527 label: id.name(),
528 negotiated: Some(id.to_u16()),
529 })
530 .await;
531 let internal_cmd_sender = internal_cmd_sender.clone();
532 let fut = async move {
533 let internal_cmd_sender_clone = internal_cmd_sender.clone();
534 let result = async move {
535 let chan = chan?;
536
537 let (done_tx, mut done_rx) = mpsc::channel::<Result<(), Error>>(1);
538
539 let done_tx_clone = done_tx.clone();
540 chan.on_open(move || {
541 let _ = done_tx_clone.try_send(Ok(()));
542 std::future::ready(())
543 });
544
545 let done_tx_clone = done_tx.clone();
546 let internal_cmd_sender = internal_cmd_sender_clone.clone();
547 chan.on_error(move |err| {
548 if done_tx_clone.try_send(Err(err.into())).is_err() {
549 let _ =
550 internal_cmd_sender.send(PeerCmdInternal::ChannelClosed(id));
551 }
552 std::future::ready(())
553 });
554
555 let done_tx_clone = done_tx.clone();
556 let internal_cmd_sender = internal_cmd_sender_clone.clone();
557 chan.on_close(move || {
558 if done_tx_clone.try_send(Err(Error::ChannelClosed)).is_err() {
559 let _ =
560 internal_cmd_sender.send(PeerCmdInternal::ChannelClosed(id));
561 }
562 std::future::ready(())
563 });
564
565 done_rx.recv().await.ok_or(Error::ChannelClosed)??;
566
567 Ok(chan)
568 };
569
570 let _ =
571 internal_cmd_sender.send(PeerCmdInternal::ChannelOpened(id, result.await));
572 };
573 let mut aborted = aborted.clone();
574 spawn_local(async move {
575 tokio::select! {
576 _ = aborted.wait() => {}
577 _ = fut => {}
578 }
579 });
580 }
581 PeerCmdAll::External(PeerCmd::ChannelSend(msg_id, msg)) => {
582 let id = msg.channel_id();
583 let err = match channels.get_msg_sender(id) {
584 Some(msg_sender) => match msg_buf.encode(&msg) {
585 Ok(encoded) => match msg_sender.send((msg_id, encoded, _tracker)) {
586 Ok(_) => None,
587 Err(_) => Some("ChannelMsgMpscSendFailed".to_owned()),
588 },
589 Err(err) => Some(err.to_string()),
590 },
591 None => Some("ChannelNotOpen".to_owned()),
592 };
593 if let Some(err) = err {
594 let _ =
595 event_sender(P2pChannelEvent::Sent(peer_id, id, msg_id, Err(err)).into());
596 }
597 }
598 PeerCmdAll::Internal(PeerCmdInternal::ChannelOpened(chan_id, result)) => {
599 let (sender_tx, mut sender_rx) = mpsc::unbounded_channel();
600 let (chan, res) = match result {
601 Ok(chan) => {
602 channels.add(chan_id, sender_tx);
603 (Some(chan), Ok(()))
604 }
605 Err(err) => (None, Err(err.to_string())),
606 };
607
608 #[allow(unused_mut)]
609 if let Some(mut chan) = chan {
610 fn process_msg(
611 chan_id: ChannelId,
612 buf: &mut Vec<u8>,
613 len: &mut u32,
614 msg: &mut &[u8],
615 ) -> Result<Option<ChannelMsg>, String> {
616 let len = if buf.is_empty() {
617 if msg.len() < 4 {
618 return Err("WebRTCMessageTooSmall".to_owned());
619 } else {
620 *len = u32::from_be_bytes(
621 msg[..4].try_into().expect("Size checked above"),
622 );
623 *msg = &msg[4..];
624 let len = *len as usize;
625 if len > chan_id.max_msg_size() {
626 return Err(format!(
627 "ChannelMsgLenOverLimit; len: {}, limit: {}",
628 len,
629 chan_id.max_msg_size()
630 ));
631 }
632 len
633 }
634 } else {
635 *len as usize
636 };
637 let bytes_left = len - buf.len();
638
639 if bytes_left > msg.len() {
640 buf.extend_from_slice(msg);
641 *msg = &[];
642 return Ok(None);
643 }
644
645 buf.extend_from_slice(&msg[..bytes_left]);
646 *msg = &msg[bytes_left..];
647 let msg = ChannelMsg::decode(&mut &buf[..], chan_id)
648 .map_err(|err| err.to_string())?;
649 buf.clear();
650 Ok(Some(msg))
651 }
652
653 let mut len = 0;
654 let mut buf = Vec::new();
655 let event_sender_clone = event_sender.clone();
656
657 chan.on_message(move |mut data| {
658 while !data.is_empty() {
659 let res = match process_msg(chan_id, &mut buf, &mut len, &mut data) {
660 Ok(None) => continue,
661 Ok(Some(msg)) => Ok(msg),
662 Err(err) => Err(err),
663 };
664 let _ =
665 event_sender_clone(P2pChannelEvent::Received(peer_id, res).into());
666 }
667 #[cfg(not(all(not(target_arch = "wasm32"), feature = "p2p-webrtc-cpp")))]
668 std::future::ready(())
669 });
670
671 let event_sender = event_sender.clone();
672 let fut = async move {
673 sleep(Duration::from_secs(3)).await;
677
678 while let Some((msg_id, encoded, _tracker)) = sender_rx.recv().await {
679 let encoded = bytes::Bytes::from(encoded);
680 let mut chunks =
681 encoded.chunks(CHUNK_SIZE).map(|b| encoded.slice_ref(b));
682 let result = loop {
683 let Some(chunk) = chunks.next() else {
684 break Ok(());
685 };
686 if let Err(err) = chan
687 .send(&chunk)
688 .await
689 .map_err(|e| format!("{e:?}"))
690 .and_then(|n| match n == chunk.len() {
691 false => Err("NotAllBytesWritten".to_owned()),
692 true => Ok(()),
693 })
694 {
695 break Err(err);
696 }
697 };
698
699 let _ = event_sender(
700 P2pChannelEvent::Sent(peer_id, chan_id, msg_id, result).into(),
701 );
702 }
703 };
704
705 let mut aborted = aborted.clone();
706 spawn_local(async move {
707 tokio::select! {
708 _ = aborted.wait() => {}
709 _ = fut => {}
710 }
711 });
712 }
713
714 let _ = event_sender(P2pChannelEvent::Opened(peer_id, chan_id, res).into());
715 }
716 PeerCmdAll::Internal(PeerCmdInternal::ChannelClosed(id)) => {
717 channels.remove(id);
718 let _ = event_sender(P2pChannelEvent::Closed(peer_id, id).into());
719 }
720 }
721 }
722}
723
724pub trait P2pServiceWebrtc: redux::Service {
725 type Event: From<P2pEvent> + Send + Sync + 'static;
726
727 fn random_pick(
728 &mut self,
729 list: &[P2pConnectionOutgoingInitOpts],
730 ) -> Option<P2pConnectionOutgoingInitOpts>;
731
732 fn event_sender(&self) -> &mpsc::UnboundedSender<Self::Event>;
733
734 fn cmd_sender(&self) -> &mpsc::TrackedUnboundedSender<Cmd>;
735
736 fn peers(&mut self) -> &mut BTreeMap<PeerId, PeerState>;
737
738 fn init<S: TaskSpawner>(
739 secret_key: SecretKey,
740 spawner: S,
741 rng_seed: [u8; 32],
742 ) -> P2pServiceCtx {
743 const MAX_PEERS: usize = 500;
744 let (cmd_sender, mut cmd_receiver) = mpsc::tracked_unbounded_channel();
745
746 let certificate = certificate_from_pem_key(secret_key.to_pem().as_str());
747
748 spawner.spawn_main("webrtc", async move {
749 #[allow(clippy::all)]
750 let api = build_api();
751 let conn_permits = Arc::new(Semaphore::const_new(MAX_PEERS));
752 while let Some(cmd) = cmd_receiver.recv().await {
753 match cmd.0 {
754 Cmd::PeerAdd { args, aborted } => {
755 #[allow(clippy::all)]
756 let api = api.clone();
757 let conn_permits = conn_permits.clone();
758 let peer_id = args.peer_id;
759 let event_sender = args.event_sender.clone();
760 let certificate = certificate.clone();
761 spawn_local(async move {
762 let Ok(_permit) = conn_permits.try_acquire() else {
763 bug_condition!("P2P WebRTC Semaphore acquisition failed!");
765 return;
766 };
767 let (closed_tx, mut closed) = mpsc::channel(1);
768 let event_sender_clone = event_sender.clone();
769 spawn_local(async move {
770 let _ = closed.recv().await;
772 event_sender_clone(P2pConnectionEvent::Closed(peer_id).into());
773 });
774 tokio::select! {
775 _ = peer_start(api, args, aborted.clone(), closed_tx.clone(), certificate, rng_seed) => {}
776 _ = aborted.wait() => {
777 }
778 }
779
780 sleep(Duration::from_millis(100)).await;
782 let _ = closed_tx.send(()).await;
783 });
784 }
785 }
786 }
787 });
788
789 P2pServiceCtx {
790 cmd_sender,
791 peers: Default::default(),
792 }
793 }
794
795 fn outgoing_init(&mut self, peer_id: PeerId) {
796 let (peer_cmd_sender, peer_cmd_receiver) = mpsc::tracked_unbounded_channel();
797 let aborter = Aborter::default();
798 let aborted = aborter.aborted();
799
800 self.peers().insert(
801 peer_id,
802 PeerState {
803 cmd_sender: peer_cmd_sender,
804 abort: aborter,
805 },
806 );
807 let event_sender = self.event_sender().clone();
808 let event_sender =
809 Arc::new(move |p2p_event: P2pEvent| event_sender.send(p2p_event.into()).ok());
810 let _ = self.cmd_sender().tracked_send(Cmd::PeerAdd {
811 args: PeerAddArgs {
812 peer_id,
813 kind: PeerConnectionKind::Outgoing,
814 event_sender,
815 cmd_receiver: peer_cmd_receiver,
816 },
817 aborted,
818 });
819 }
820
821 fn incoming_init(&mut self, peer_id: PeerId, offer: webrtc::Offer) {
822 let (peer_cmd_sender, peer_cmd_receiver) = mpsc::tracked_unbounded_channel();
823 let aborter = Aborter::default();
824 let aborted = aborter.aborted();
825
826 self.peers().insert(
827 peer_id,
828 PeerState {
829 cmd_sender: peer_cmd_sender,
830 abort: aborter,
831 },
832 );
833 let event_sender = self.event_sender().clone();
834 let event_sender =
835 Arc::new(move |p2p_event: P2pEvent| event_sender.send(p2p_event.into()).ok());
836 let _ = self.cmd_sender().tracked_send(Cmd::PeerAdd {
837 args: PeerAddArgs {
838 peer_id,
839 kind: PeerConnectionKind::Incoming(Box::new(offer)),
840 event_sender,
841 cmd_receiver: peer_cmd_receiver,
842 },
843 aborted,
844 });
845 }
846
847 fn set_answer(&mut self, peer_id: PeerId, answer: webrtc::Answer) {
848 if let Some(peer) = self.peers().get(&peer_id) {
849 let _ = peer.cmd_sender.tracked_send(PeerCmd::AnswerSet(answer));
850 }
851 }
852
853 fn http_signaling_request(&mut self, url: String, offer: webrtc::Offer) {
854 if let Some(peer) = self.peers().get(&offer.target_peer_id) {
855 let _ = peer
856 .cmd_sender
857 .tracked_send(PeerCmd::PeerHttpOfferSend(url, offer));
858 }
859 }
860
861 fn disconnect(&mut self, peer_id: PeerId) -> bool {
862 if let Some(_peer) = self.peers().remove(&peer_id) {
866 } else {
871 openmina_core::error!(openmina_core::log::system_time(); "`disconnect` shouldn't be used for libp2p peers");
872 }
873 true
874 }
875
876 fn channel_open(&mut self, peer_id: PeerId, id: ChannelId) {
877 if let Some(peer) = self.peers().get(&peer_id) {
878 let _ = peer.cmd_sender.tracked_send(PeerCmd::ChannelOpen(id));
879 }
880 }
881
882 fn channel_send(&mut self, peer_id: PeerId, msg_id: MsgId, msg: ChannelMsg) {
883 if let Some(peer) = self.peers().get(&peer_id) {
884 let _ = peer
885 .cmd_sender
886 .tracked_send(PeerCmd::ChannelSend(msg_id, msg));
887 }
888 }
889
890 fn encrypt<T: EncryptableType>(
891 &mut self,
892 other_pk: &PublicKey,
893 message: &T,
894 ) -> Result<T::Encrypted, Box<dyn std::error::Error>>;
895
896 fn decrypt<T: EncryptableType>(
897 &mut self,
898 other_pub_key: &PublicKey,
899 encrypted: &T::Encrypted,
900 ) -> Result<T, Box<dyn std::error::Error>>;
901
902 fn auth_send(
903 &mut self,
904 peer_id: PeerId,
905 _other_pub_key: &PublicKey,
906 auth: Option<ConnectionAuthEncrypted>,
907 ) {
908 if let Some(peer) = self.peers().get(&peer_id) {
909 let _ = peer
910 .cmd_sender
911 .tracked_send(PeerCmd::ConnectionAuthorizationSend(auth));
912 }
913 }
914
915 fn auth_encrypt_and_send(
916 &mut self,
917 peer_id: PeerId,
918 other_pub_key: &PublicKey,
919 auth: ConnectionAuth,
920 );
921
922 fn auth_decrypt(
923 &mut self,
924 other_pub_key: &PublicKey,
925 auth: ConnectionAuthEncrypted,
926 ) -> Option<ConnectionAuth>;
927}
928
929impl P2pServiceCtx {
930 pub fn pending_cmds(&self) -> usize {
931 self.peers
932 .iter()
933 .fold(self.cmd_sender.len(), |acc, (_, peer)| {
934 acc + peer.cmd_sender.len()
935 })
936 }
937}