mina_p2p/service_impl/webrtc/
webrtc_rs.rs1use std::{future::Future, sync::Arc};
2
3use webrtc::{
4 api::APIBuilder,
5 data_channel::{data_channel_init::RTCDataChannelInit, RTCDataChannel},
6 ice_transport::{
7 ice_credential_type::RTCIceCredentialType, ice_gatherer_state::RTCIceGathererState,
8 ice_gathering_state::RTCIceGatheringState, ice_server::RTCIceServer,
9 },
10 peer_connection::{
11 configuration::RTCConfiguration, peer_connection_state::RTCPeerConnectionState,
12 policy::ice_transport_policy::RTCIceTransportPolicy,
13 sdp::session_description::RTCSessionDescription, RTCPeerConnection,
14 },
15};
16
17use crate::{
18 connection::P2pConnectionResponse,
19 webrtc::{Answer, Offer},
20};
21
22use super::{OnConnectionStateChangeHdlrFn, RTCChannelConfig, RTCConfig};
23
24pub type Result<T> = std::result::Result<T, webrtc::Error>;
25
26pub type RTCConnectionState = RTCPeerConnectionState;
27
28pub type Api = Arc<webrtc::api::API>;
29
30pub type RTCCertificate = webrtc::peer_connection::certificate::RTCCertificate;
31
32pub fn certificate_from_pem_key(pem_str: &str) -> RTCCertificate {
33 let keypair = rcgen::KeyPair::from_pem(pem_str).expect("valid pem");
34 RTCCertificate::from_key_pair(keypair).expect("keypair is compatible")
35}
36
37pub fn build_api() -> Api {
38 APIBuilder::new().build().into()
39}
40
41pub struct RTCConnection(Arc<RTCPeerConnection>, bool);
42
43#[derive(Clone)]
44pub struct RTCChannel(Arc<RTCDataChannel>);
45
46#[derive(thiserror::Error, derive_more::From, Debug)]
47pub enum RTCSignalingError {
48 #[error("http request failed: {0}")]
49 Http(reqwest::Error),
50}
51
52impl RTCConnection {
53 pub async fn create(api: &Api, config: RTCConfig) -> Result<Self> {
54 let mut configuration = RTCConfiguration::from(config);
55 configuration.certificates.clear();
57 api.new_peer_connection(configuration)
58 .await
59 .map(|v| Self(v.into(), true))
60 }
61
62 pub fn is_main(&self) -> bool {
63 self.1
64 }
65
66 pub async fn channel_create(&self, config: RTCChannelConfig) -> Result<RTCChannel> {
67 self.0
68 .create_data_channel(
69 config.label,
70 Some(RTCDataChannelInit {
71 ordered: Some(true),
72 max_packet_life_time: None,
73 max_retransmits: None,
74 negotiated: config.negotiated,
75 ..Default::default()
76 }),
77 )
78 .await
79 .map(RTCChannel)
80 }
81
82 pub async fn offer_create(&self) -> Result<RTCSessionDescription> {
83 self.0.create_offer(None).await
84 }
85
86 pub async fn answer_create(&self) -> Result<RTCSessionDescription> {
87 self.0.create_answer(None).await
88 }
89
90 pub async fn local_desc_set(&self, desc: RTCSessionDescription) -> Result<()> {
91 self.0.set_local_description(desc).await
92 }
93
94 pub async fn remote_desc_set(&self, desc: RTCSessionDescription) -> Result<()> {
95 self.0.set_remote_description(desc).await
96 }
97
98 pub async fn local_sdp(&self) -> Option<String> {
99 self.0.local_description().await.map(|v| v.sdp)
100 }
101
102 pub fn connection_state(&self) -> RTCConnectionState {
103 self.0.connection_state()
104 }
105
106 pub async fn wait_for_ice_gathering_complete(&self) {
107 if !matches!(self.0.ice_gathering_state(), RTCIceGatheringState::Complete) {
108 let (tx, rx) = tokio::sync::oneshot::channel::<()>();
109 let mut tx = Some(tx);
110 self.0.on_ice_gathering_state_change(Box::new(move |state| {
111 if matches!(state, RTCIceGathererState::Complete) {
112 if let Some(tx) = tx.take() {
113 let _ = tx.send(());
114 }
115 }
116 Box::pin(std::future::ready(()))
117 }));
118 let _ = rx.await;
119 }
120 }
121
122 pub fn on_connection_state_change(&self, handler: OnConnectionStateChangeHdlrFn) {
123 self.0.on_peer_connection_state_change(handler)
124 }
125
126 pub async fn close(self) {
127 if let Err(error) = self.0.close().await {
128 mina_core::warn!(
129 mina_core::log::system_time();
130 summary = "CONNECTION LEAK: Failure when closing RTCConnection",
131 error = error.to_string(),
132 )
133 }
134 }
135}
136
137impl RTCChannel {
138 pub fn on_open<Fut>(&self, f: impl FnOnce() -> Fut + Send + Sync + 'static)
139 where
140 Fut: Future<Output = ()> + Send + 'static,
141 {
142 self.0.on_open(Box::new(move || Box::pin(f())))
143 }
144
145 pub fn on_message<Fut>(&self, mut f: impl FnMut(&[u8]) -> Fut + Send + Sync + 'static)
146 where
147 Fut: Future<Output = ()> + Send + 'static,
148 {
149 self.0
150 .on_message(Box::new(move |msg| Box::pin(f(msg.data.as_ref()))));
151 }
152
153 pub fn on_error<Fut>(&self, mut f: impl FnMut(webrtc::Error) -> Fut + Send + Sync + 'static)
154 where
155 Fut: Future<Output = ()> + Send + 'static,
156 {
157 self.0.on_error(Box::new(move |err| Box::pin(f(err))))
158 }
159
160 pub fn on_close<Fut>(&self, mut f: impl FnMut() -> Fut + Send + Sync + 'static)
161 where
162 Fut: Future<Output = ()> + Send + 'static,
163 {
164 self.0.on_close(Box::new(move || Box::pin(f())))
165 }
166
167 pub async fn send(&self, data: &bytes::Bytes) -> Result<usize> {
168 self.0.send(data).await
169 }
170
171 pub async fn close(&self) {
172 let _ = self.0.close().await;
173 }
174}
175
176pub async fn webrtc_signal_send(
177 url: &str,
178 offer: Offer,
179) -> std::result::Result<P2pConnectionResponse, RTCSignalingError> {
180 let client = reqwest::Client::new();
181 let res = client
182 .post(url)
183 .json(&offer) .send()
185 .await?
186 .json()
187 .await?;
188 Ok(res)
189}
190
191impl Clone for RTCConnection {
192 fn clone(&self) -> Self {
193 Self(self.0.clone(), false)
194 }
195}
196
197impl From<RTCConfig> for RTCConfiguration {
198 fn from(value: RTCConfig) -> Self {
199 RTCConfiguration {
200 ice_servers: value.ice_servers.0.into_iter().map(Into::into).collect(),
201 ice_transport_policy: RTCIceTransportPolicy::All,
202 certificates: vec![value.certificate],
203 seed: Some(value.seed.to_vec()),
204 ..Default::default()
205 }
206 }
207}
208
209impl From<super::RTCConfigIceServer> for RTCIceServer {
210 fn from(value: super::RTCConfigIceServer) -> Self {
211 let credential_type = match value.credential.is_some() {
212 false => RTCIceCredentialType::Unspecified,
213 true => RTCIceCredentialType::Password,
214 };
215 RTCIceServer {
216 urls: value.urls,
217 username: value.username.unwrap_or_default(),
218 credential: value.credential.unwrap_or_default(),
219 credential_type,
220 }
221 }
222}
223
224impl TryFrom<Offer> for RTCSessionDescription {
225 type Error = webrtc::Error;
226
227 fn try_from(value: Offer) -> Result<Self> {
228 Self::offer(value.sdp)
229 }
230}
231
232impl TryFrom<Answer> for RTCSessionDescription {
233 type Error = webrtc::Error;
234
235 fn try_from(value: Answer) -> Result<Self> {
236 Self::answer(value.sdp)
237 }
238}