openmina_node_common/service/
event_receiver.rs

1use node::{core::channels::mpsc, event_source::Event};
2
3pub type EventSender = mpsc::UnboundedSender<Event>;
4
5pub struct EventReceiver {
6    rx: mpsc::UnboundedReceiver<Event>,
7    queue: Vec<Event>,
8}
9
10impl EventReceiver {
11    pub fn is_empty(&self) -> bool {
12        !self.has_next()
13    }
14
15    pub fn len(&self) -> usize {
16        self.rx.len() + self.queue.len()
17    }
18
19    /// If `Err(())`, `mpsc::Sender` for this channel was dropped.
20    pub async fn wait_for_events(&mut self) -> Result<(), ()> {
21        if !self.queue.is_empty() {
22            return Ok(());
23        }
24        let next = self.rx.recv().await.ok_or(())?;
25        self.queue.push(next);
26        Ok(())
27    }
28
29    pub fn has_next(&self) -> bool {
30        !self.queue.is_empty() || !self.rx.is_empty()
31    }
32
33    pub fn try_next(&mut self) -> Option<Event> {
34        if !self.queue.is_empty() {
35            Some(self.queue.remove(0))
36        } else {
37            self.rx.try_recv().ok()
38        }
39    }
40}
41
42impl From<mpsc::UnboundedReceiver<Event>> for EventReceiver {
43    fn from(rx: mpsc::UnboundedReceiver<Event>) -> Self {
44        Self {
45            rx,
46            queue: Vec::with_capacity(1),
47        }
48    }
49}