// openmina_node_common/service/event_receiver.rs
use node::{core::channels::mpsc, event_source::Event};
2
/// Sending half of an unbounded channel carrying [`Event`]s.
///
/// The matching receiver type, `mpsc::UnboundedReceiver<Event>`, is what
/// [`EventReceiver`] wraps.
pub type EventSender = mpsc::UnboundedSender<Event>;
4
5pub struct EventReceiver {
6 rx: mpsc::UnboundedReceiver<Event>,
7 queue: Vec<Event>,
8}
9
10impl EventReceiver {
11 pub fn is_empty(&self) -> bool {
12 !self.has_next()
13 }
14
15 pub fn len(&self) -> usize {
16 self.rx.len() + self.queue.len()
17 }
18
19 pub async fn wait_for_events(&mut self) -> Result<(), ()> {
21 if !self.queue.is_empty() {
22 return Ok(());
23 }
24 let next = self.rx.recv().await.ok_or(())?;
25 self.queue.push(next);
26 Ok(())
27 }
28
29 pub fn has_next(&self) -> bool {
30 !self.queue.is_empty() || !self.rx.is_empty()
31 }
32
33 pub fn try_next(&mut self) -> Option<Event> {
34 if !self.queue.is_empty() {
35 Some(self.queue.remove(0))
36 } else {
37 self.rx.try_recv().ok()
38 }
39 }
40}
41
42impl From<mpsc::UnboundedReceiver<Event>> for EventReceiver {
43 fn from(rx: mpsc::UnboundedReceiver<Event>) -> Self {
44 Self {
45 rx,
46 queue: Vec::with_capacity(1),
47 }
48 }
49}