openmina_core/
channels.rs1pub use tokio::sync::oneshot;
2
3pub mod mpsc {
4 use std::sync::{Arc, Weak};
5
6 pub use flume::{SendError, TryRecvError, TrySendError};
7
8 pub type RecvStream<T> = flume::r#async::RecvStream<'static, T>;
9
10 pub struct Sender<T>(flume::Sender<T>);
11 pub struct Receiver<T>(flume::Receiver<T>);
12
13 pub struct UnboundedSender<T>(flume::Sender<T>, Arc<()>);
14 pub struct UnboundedReceiver<T>(flume::Receiver<T>);
15
16 pub type TrackedUnboundedSender<T> = UnboundedSender<Tracked<T>>;
17 pub type TrackedUnboundedReceiver<T> = UnboundedReceiver<Tracked<T>>;
18
19 #[allow(dead_code)]
20 pub struct Tracked<T>(pub T, pub Tracker);
21 #[allow(dead_code)]
22 pub struct Tracker(Weak<()>);
23
24 impl<T> std::fmt::Debug for UnboundedSender<T> {
25 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26 write!(f, "{:?} (len: {})", self.0, self.len())
27 }
28 }
29
30 impl<T> std::fmt::Debug for UnboundedReceiver<T> {
31 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32 write!(f, "{:?} (len: {})", self.0, self.len())
33 }
34 }
35
36 impl<T> Clone for Sender<T> {
37 fn clone(&self) -> Self {
38 Self(self.0.clone())
39 }
40 }
41
42 impl<T> Clone for UnboundedSender<T> {
43 fn clone(&self) -> Self {
44 Self(self.0.clone(), self.1.clone())
45 }
46 }
47
48 impl<T> std::ops::Deref for Tracked<T> {
49 type Target = T;
50
51 fn deref(&self) -> &Self::Target {
52 &self.0
53 }
54 }
55
56 impl<T> std::ops::DerefMut for Tracked<T> {
57 fn deref_mut(&mut self) -> &mut Self::Target {
58 &mut self.0
59 }
60 }
61
62 impl<T> Sender<T> {
63 pub async fn send(&self, message: T) -> Result<(), SendError<T>> {
64 self.0.send_async(message).await
65 }
66
67 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
68 self.0.try_send(message)
69 }
70 }
71
72 impl<T> Receiver<T> {
73 pub fn is_empty(&self) -> bool {
74 self.0.is_empty()
75 }
76
77 pub fn len(&self) -> usize {
78 self.0.len()
79 }
80
81 pub async fn recv(&mut self) -> Option<T> {
82 self.0.recv_async().await.ok()
83 }
84
85 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
86 self.0.try_recv()
87 }
88 }
89
90 impl<T> UnboundedSender<T> {
91 pub fn is_empty(&self) -> bool {
92 self.len() == 0
93 }
94
95 pub fn len(&self) -> usize {
96 Arc::weak_count(&self.1)
97 }
98
99 pub fn send(&self, message: T) -> Result<(), SendError<T>> {
100 self.0.send(message)
101 }
102 }
103
104 impl<T> TrackedUnboundedSender<T> {
105 pub fn tracked_send(&self, message: T) -> Result<(), SendError<T>> {
106 let msg = Tracked(message, Tracker(Arc::downgrade(&self.1)));
107 self.send(msg).map_err(|err| SendError(err.0 .0))
108 }
109 }
110
111 impl<T> UnboundedReceiver<T> {
112 pub fn is_empty(&self) -> bool {
113 self.0.is_empty()
114 }
115
116 pub fn len(&self) -> usize {
117 self.0.len()
118 }
119
120 pub async fn recv(&mut self) -> Option<T> {
121 self.0.recv_async().await.ok()
122 }
123
124 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
125 self.0.try_recv()
126 }
127
128 pub fn stream(&self) -> RecvStream<T> {
129 self.0.clone().into_stream()
130 }
131
132 pub fn blocking_recv(&mut self) -> Option<T> {
133 self.0.recv().ok()
134 }
135 }
136
137 pub fn channel<T>(bound: usize) -> (Sender<T>, Receiver<T>) {
138 let (tx, rx) = flume::bounded(bound);
139
140 (Sender(tx), Receiver(rx))
141 }
142
143 pub fn unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
144 let (tx, rx) = flume::unbounded();
145
146 (UnboundedSender(tx, Arc::new(())), UnboundedReceiver(rx))
147 }
148
149 pub fn tracked_unbounded_channel<T>(
150 ) -> (UnboundedSender<Tracked<T>>, UnboundedReceiver<Tracked<T>>) {
151 let (tx, rx) = flume::unbounded();
152
153 (UnboundedSender(tx, Arc::new(())), UnboundedReceiver(rx))
154 }
155}
156
157pub mod broadcast {
158 pub use tokio::sync::broadcast::*;
159
160 #[deprecated(note = "don't use across threads as it can cause panic in WASM")]
161 #[inline(always)]
162 pub fn channel<T: Clone>(capacity: usize) -> (Sender<T>, Receiver<T>) {
163 tokio::sync::broadcast::channel(capacity)
164 }
165}
166
167pub mod watch {
168 pub use tokio::sync::watch::*;
169
170 #[deprecated(note = "don't use across threads as it can cause panic in WASM")]
171 #[inline(always)]
172 pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) {
173 tokio::sync::watch::channel(init)
174 }
175}
176
177#[allow(dead_code)]
178pub struct Aborter(flume::Receiver<()>, flume::Sender<()>);
179
180#[derive(Clone)]
181pub struct Aborted(flume::Sender<()>);
182
183impl Default for Aborter {
184 fn default() -> Self {
185 let (tx, rx) = flume::bounded(0);
186 Self(rx, tx)
187 }
188}
189
190impl Aborter {
191 pub fn listener_count(&self) -> usize {
192 self.0.sender_count().saturating_sub(1)
193 }
194
195 pub fn abort_mut(&mut self) {
198 std::mem::take(self);
199 }
200
201 pub fn aborted(&self) -> Aborted {
202 Aborted(self.1.clone())
203 }
204}
205
206impl Aborted {
207 pub async fn wait(&self) {
208 while self.0.send_async(()).await.is_ok() {}
210 }
211}