openmina_core/
channels.rs

1pub use tokio::sync::oneshot;
2
3pub mod mpsc {
4    use std::sync::{Arc, Weak};
5
6    pub use flume::{SendError, TryRecvError, TrySendError};
7
8    pub type RecvStream<T> = flume::r#async::RecvStream<'static, T>;
9
10    pub struct Sender<T>(flume::Sender<T>);
11    pub struct Receiver<T>(flume::Receiver<T>);
12
13    pub struct UnboundedSender<T>(flume::Sender<T>, Arc<()>);
14    pub struct UnboundedReceiver<T>(flume::Receiver<T>);
15
16    pub type TrackedUnboundedSender<T> = UnboundedSender<Tracked<T>>;
17    pub type TrackedUnboundedReceiver<T> = UnboundedReceiver<Tracked<T>>;
18
19    #[allow(dead_code)]
20    pub struct Tracked<T>(pub T, pub Tracker);
21    #[allow(dead_code)]
22    pub struct Tracker(Weak<()>);
23
24    impl<T> std::fmt::Debug for UnboundedSender<T> {
25        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26            write!(f, "{:?} (len: {})", self.0, self.len())
27        }
28    }
29
30    impl<T> std::fmt::Debug for UnboundedReceiver<T> {
31        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32            write!(f, "{:?} (len: {})", self.0, self.len())
33        }
34    }
35
36    impl<T> Clone for Sender<T> {
37        fn clone(&self) -> Self {
38            Self(self.0.clone())
39        }
40    }
41
42    impl<T> Clone for UnboundedSender<T> {
43        fn clone(&self) -> Self {
44            Self(self.0.clone(), self.1.clone())
45        }
46    }
47
48    impl<T> std::ops::Deref for Tracked<T> {
49        type Target = T;
50
51        fn deref(&self) -> &Self::Target {
52            &self.0
53        }
54    }
55
56    impl<T> std::ops::DerefMut for Tracked<T> {
57        fn deref_mut(&mut self) -> &mut Self::Target {
58            &mut self.0
59        }
60    }
61
62    impl<T> Sender<T> {
63        pub async fn send(&self, message: T) -> Result<(), SendError<T>> {
64            self.0.send_async(message).await
65        }
66
67        pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
68            self.0.try_send(message)
69        }
70    }
71
72    impl<T> Receiver<T> {
73        pub fn is_empty(&self) -> bool {
74            self.0.is_empty()
75        }
76
77        pub fn len(&self) -> usize {
78            self.0.len()
79        }
80
81        pub async fn recv(&mut self) -> Option<T> {
82            self.0.recv_async().await.ok()
83        }
84
85        pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
86            self.0.try_recv()
87        }
88    }
89
90    impl<T> UnboundedSender<T> {
91        pub fn is_empty(&self) -> bool {
92            self.len() == 0
93        }
94
95        pub fn len(&self) -> usize {
96            Arc::weak_count(&self.1)
97        }
98
99        pub fn send(&self, message: T) -> Result<(), SendError<T>> {
100            self.0.send(message)
101        }
102    }
103
104    impl<T> TrackedUnboundedSender<T> {
105        pub fn tracked_send(&self, message: T) -> Result<(), SendError<T>> {
106            let msg = Tracked(message, Tracker(Arc::downgrade(&self.1)));
107            self.send(msg).map_err(|err| SendError(err.0 .0))
108        }
109    }
110
111    impl<T> UnboundedReceiver<T> {
112        pub fn is_empty(&self) -> bool {
113            self.0.is_empty()
114        }
115
116        pub fn len(&self) -> usize {
117            self.0.len()
118        }
119
120        pub async fn recv(&mut self) -> Option<T> {
121            self.0.recv_async().await.ok()
122        }
123
124        pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
125            self.0.try_recv()
126        }
127
128        pub fn stream(&self) -> RecvStream<T> {
129            self.0.clone().into_stream()
130        }
131
132        pub fn blocking_recv(&mut self) -> Option<T> {
133            self.0.recv().ok()
134        }
135    }
136
137    pub fn channel<T>(bound: usize) -> (Sender<T>, Receiver<T>) {
138        let (tx, rx) = flume::bounded(bound);
139
140        (Sender(tx), Receiver(rx))
141    }
142
143    pub fn unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
144        let (tx, rx) = flume::unbounded();
145
146        (UnboundedSender(tx, Arc::new(())), UnboundedReceiver(rx))
147    }
148
149    pub fn tracked_unbounded_channel<T>(
150    ) -> (UnboundedSender<Tracked<T>>, UnboundedReceiver<Tracked<T>>) {
151        let (tx, rx) = flume::unbounded();
152
153        (UnboundedSender(tx, Arc::new(())), UnboundedReceiver(rx))
154    }
155}
156
157pub mod broadcast {
158    pub use tokio::sync::broadcast::*;
159
160    #[deprecated(note = "don't use across threads as it can cause panic in WASM")]
161    #[inline(always)]
162    pub fn channel<T: Clone>(capacity: usize) -> (Sender<T>, Receiver<T>) {
163        tokio::sync::broadcast::channel(capacity)
164    }
165}
166
167pub mod watch {
168    pub use tokio::sync::watch::*;
169
170    #[deprecated(note = "don't use across threads as it can cause panic in WASM")]
171    #[inline(always)]
172    pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) {
173        tokio::sync::watch::channel(init)
174    }
175}
176
177#[allow(dead_code)]
178pub struct Aborter(flume::Receiver<()>, flume::Sender<()>);
179
180#[derive(Clone)]
181pub struct Aborted(flume::Sender<()>);
182
183impl Default for Aborter {
184    fn default() -> Self {
185        let (tx, rx) = flume::bounded(0);
186        Self(rx, tx)
187    }
188}
189
190impl Aborter {
191    pub fn listener_count(&self) -> usize {
192        self.0.sender_count().saturating_sub(1)
193    }
194
195    /// Simply drops the object. No need to call manually, unless you
196    /// temporarily have to retain object for some reason.
197    pub fn abort_mut(&mut self) {
198        std::mem::take(self);
199    }
200
201    pub fn aborted(&self) -> Aborted {
202        Aborted(self.1.clone())
203    }
204}
205
206impl Aborted {
207    pub async fn wait(&self) {
208        // it returning an error means receiver was dropped
209        while self.0.send_async(()).await.is_ok() {}
210    }
211}