Skip to main content

mina_p2p/service_impl/webrtc/
webrtc_rs.rs

1use std::{future::Future, sync::Arc};
2
3use webrtc::{
4    api::APIBuilder,
5    data_channel::{data_channel_init::RTCDataChannelInit, RTCDataChannel},
6    ice_transport::{
7        ice_credential_type::RTCIceCredentialType, ice_gatherer_state::RTCIceGathererState,
8        ice_gathering_state::RTCIceGatheringState, ice_server::RTCIceServer,
9    },
10    peer_connection::{
11        configuration::RTCConfiguration, peer_connection_state::RTCPeerConnectionState,
12        policy::ice_transport_policy::RTCIceTransportPolicy,
13        sdp::session_description::RTCSessionDescription, RTCPeerConnection,
14    },
15};
16
17use crate::{
18    connection::P2pConnectionResponse,
19    webrtc::{Answer, Offer},
20};
21
22use super::{OnConnectionStateChangeHdlrFn, RTCChannelConfig, RTCConfig};
23
24pub type Result<T> = std::result::Result<T, webrtc::Error>;
25
26pub type RTCConnectionState = RTCPeerConnectionState;
27
28pub type Api = Arc<webrtc::api::API>;
29
30pub type RTCCertificate = webrtc::peer_connection::certificate::RTCCertificate;
31
32pub fn certificate_from_pem_key(pem_str: &str) -> RTCCertificate {
33    let keypair = rcgen::KeyPair::from_pem(pem_str).expect("valid pem");
34    RTCCertificate::from_key_pair(keypair).expect("keypair is compatible")
35}
36
37pub fn build_api() -> Api {
38    APIBuilder::new().build().into()
39}
40
41pub struct RTCConnection(Arc<RTCPeerConnection>, bool);
42
43#[derive(Clone)]
44pub struct RTCChannel(Arc<RTCDataChannel>);
45
46#[derive(thiserror::Error, derive_more::From, Debug)]
47pub enum RTCSignalingError {
48    #[error("http request failed: {0}")]
49    Http(reqwest::Error),
50}
51
52impl RTCConnection {
53    pub async fn create(api: &Api, config: RTCConfig) -> Result<Self> {
54        let mut configuration = RTCConfiguration::from(config);
55        // try default certificate, TODO(vlad): do it right
56        configuration.certificates.clear();
57        api.new_peer_connection(configuration)
58            .await
59            .map(|v| Self(v.into(), true))
60    }
61
62    pub fn is_main(&self) -> bool {
63        self.1
64    }
65
66    pub async fn channel_create(&self, config: RTCChannelConfig) -> Result<RTCChannel> {
67        self.0
68            .create_data_channel(
69                config.label,
70                Some(RTCDataChannelInit {
71                    ordered: Some(true),
72                    max_packet_life_time: None,
73                    max_retransmits: None,
74                    negotiated: config.negotiated,
75                    ..Default::default()
76                }),
77            )
78            .await
79            .map(RTCChannel)
80    }
81
82    pub async fn offer_create(&self) -> Result<RTCSessionDescription> {
83        self.0.create_offer(None).await
84    }
85
86    pub async fn answer_create(&self) -> Result<RTCSessionDescription> {
87        self.0.create_answer(None).await
88    }
89
90    pub async fn local_desc_set(&self, desc: RTCSessionDescription) -> Result<()> {
91        self.0.set_local_description(desc).await
92    }
93
94    pub async fn remote_desc_set(&self, desc: RTCSessionDescription) -> Result<()> {
95        self.0.set_remote_description(desc).await
96    }
97
98    pub async fn local_sdp(&self) -> Option<String> {
99        self.0.local_description().await.map(|v| v.sdp)
100    }
101
102    pub fn connection_state(&self) -> RTCConnectionState {
103        self.0.connection_state()
104    }
105
106    pub async fn wait_for_ice_gathering_complete(&self) {
107        if !matches!(self.0.ice_gathering_state(), RTCIceGatheringState::Complete) {
108            let (tx, rx) = tokio::sync::oneshot::channel::<()>();
109            let mut tx = Some(tx);
110            self.0.on_ice_gathering_state_change(Box::new(move |state| {
111                if matches!(state, RTCIceGathererState::Complete) {
112                    if let Some(tx) = tx.take() {
113                        let _ = tx.send(());
114                    }
115                }
116                Box::pin(std::future::ready(()))
117            }));
118            let _ = rx.await;
119        }
120    }
121
122    pub fn on_connection_state_change(&self, handler: OnConnectionStateChangeHdlrFn) {
123        self.0.on_peer_connection_state_change(handler)
124    }
125
126    pub async fn close(self) {
127        if let Err(error) = self.0.close().await {
128            mina_core::warn!(
129                mina_core::log::system_time();
130                summary = "CONNECTION LEAK: Failure when closing RTCConnection",
131                error = error.to_string(),
132            )
133        }
134    }
135}
136
137impl RTCChannel {
138    pub fn on_open<Fut>(&self, f: impl FnOnce() -> Fut + Send + Sync + 'static)
139    where
140        Fut: Future<Output = ()> + Send + 'static,
141    {
142        self.0.on_open(Box::new(move || Box::pin(f())))
143    }
144
145    pub fn on_message<Fut>(&self, mut f: impl FnMut(&[u8]) -> Fut + Send + Sync + 'static)
146    where
147        Fut: Future<Output = ()> + Send + 'static,
148    {
149        self.0
150            .on_message(Box::new(move |msg| Box::pin(f(msg.data.as_ref()))));
151    }
152
153    pub fn on_error<Fut>(&self, mut f: impl FnMut(webrtc::Error) -> Fut + Send + Sync + 'static)
154    where
155        Fut: Future<Output = ()> + Send + 'static,
156    {
157        self.0.on_error(Box::new(move |err| Box::pin(f(err))))
158    }
159
160    pub fn on_close<Fut>(&self, mut f: impl FnMut() -> Fut + Send + Sync + 'static)
161    where
162        Fut: Future<Output = ()> + Send + 'static,
163    {
164        self.0.on_close(Box::new(move || Box::pin(f())))
165    }
166
167    pub async fn send(&self, data: &bytes::Bytes) -> Result<usize> {
168        self.0.send(data).await
169    }
170
171    pub async fn close(&self) {
172        let _ = self.0.close().await;
173    }
174}
175
176pub async fn webrtc_signal_send(
177    url: &str,
178    offer: Offer,
179) -> std::result::Result<P2pConnectionResponse, RTCSignalingError> {
180    let client = reqwest::Client::new();
181    let res = client
182        .post(url)
183        .json(&offer) // Sets Content-Type: application/json automatically
184        .send()
185        .await?
186        .json()
187        .await?;
188    Ok(res)
189}
190
191impl Clone for RTCConnection {
192    fn clone(&self) -> Self {
193        Self(self.0.clone(), false)
194    }
195}
196
197impl From<RTCConfig> for RTCConfiguration {
198    fn from(value: RTCConfig) -> Self {
199        RTCConfiguration {
200            ice_servers: value.ice_servers.0.into_iter().map(Into::into).collect(),
201            ice_transport_policy: RTCIceTransportPolicy::All,
202            certificates: vec![value.certificate],
203            seed: Some(value.seed.to_vec()),
204            ..Default::default()
205        }
206    }
207}
208
209impl From<super::RTCConfigIceServer> for RTCIceServer {
210    fn from(value: super::RTCConfigIceServer) -> Self {
211        let credential_type = match value.credential.is_some() {
212            false => RTCIceCredentialType::Unspecified,
213            true => RTCIceCredentialType::Password,
214        };
215        RTCIceServer {
216            urls: value.urls,
217            username: value.username.unwrap_or_default(),
218            credential: value.credential.unwrap_or_default(),
219            credential_type,
220        }
221    }
222}
223
224impl TryFrom<Offer> for RTCSessionDescription {
225    type Error = webrtc::Error;
226
227    fn try_from(value: Offer) -> Result<Self> {
228        Self::offer(value.sdp)
229    }
230}
231
232impl TryFrom<Answer> for RTCSessionDescription {
233    type Error = webrtc::Error;
234
235    fn try_from(value: Answer) -> Result<Self> {
236        Self::answer(value.sdp)
237    }
238}