openmina_node_common/node/
node.rs

1use std::time::Duration;
2
3use node::{Effects, EventSourceAction, Service, State, Store};
4
5use crate::{
6    rpc::{RpcReceiver, RpcSender},
7    EventReceiver, NodeService,
8};
9
10pub struct Node<Serv> {
11    store: Store<Serv>,
12}
13
14impl<Serv: Service + AsMut<NodeService>> Node<Serv> {
15    pub fn new(
16        rng_seed: [u8; 32],
17        initial_state: State,
18        mut service: Serv,
19        override_effects: Option<Effects<Serv>>,
20    ) -> Self {
21        let p2p_sec_key = service.as_mut().p2p.sec_key.clone();
22        service
23            .recorder()
24            .initial_state(rng_seed, p2p_sec_key, &initial_state);
25
26        let time_since_epoch = initial_state
27            .time()
28            .checked_sub(redux::Timestamp::ZERO)
29            .unwrap();
30        let store = Store::new(
31            node::reducer,
32            override_effects.unwrap_or(node::effects),
33            service,
34            redux::SystemTime::UNIX_EPOCH + time_since_epoch,
35            initial_state,
36        );
37
38        Self { store }
39    }
40
41    pub fn store(&self) -> &Store<Serv> {
42        &self.store
43    }
44
45    pub fn store_mut(&mut self) -> &mut Store<Serv> {
46        &mut self.store
47    }
48
49    pub fn state(&self) -> &State {
50        self.store().state.get()
51    }
52
53    fn service_mut(&mut self) -> &mut Serv {
54        &mut self.store.service
55    }
56
57    fn service_common_mut(&mut self) -> &mut NodeService {
58        self.service_mut().as_mut()
59    }
60
61    fn event_receiver_with_rpc_receiver(&mut self) -> (&mut EventReceiver, &mut RpcReceiver) {
62        self.service_common_mut().event_receiver_with_rpc_receiver()
63    }
64
65    fn event_receiver(&mut self) -> &mut EventReceiver {
66        &mut self.service_common_mut().event_receiver
67    }
68
69    pub async fn run_forever(&mut self) {
70        loop {
71            self.store_mut().dispatch(EventSourceAction::WaitForEvents);
72
73            let (event_receiver, rpc_receiver) = self.event_receiver_with_rpc_receiver();
74            let wait_for_events = event_receiver.wait_for_events();
75            let rpc_req_fut = async {
76                // TODO(binier): optimize maybe to not check it all the time.
77                match rpc_receiver.recv().await {
78                    Some(v) => v,
79                    None => std::future::pending().await,
80                }
81            };
82
83            let timeout = Duration::from_millis(100);
84
85            #[cfg(not(target_arch = "wasm32"))]
86            let timeout = tokio::time::sleep(timeout);
87            #[cfg(target_arch = "wasm32")]
88            let timeout = gloo_timers::future::TimeoutFuture::new(timeout.as_millis() as u32);
89
90            tokio::select! {
91                _ = wait_for_events => {
92                    while self.event_receiver().has_next() {
93                        self.store_mut().dispatch(EventSourceAction::ProcessEvents);
94                    }
95                }
96                req = rpc_req_fut => {
97                    self.service_common_mut().process_rpc_request(req);
98                }
99                _ = timeout => {
100                    self.store_mut().dispatch(EventSourceAction::WaitTimeout);
101                }
102            }
103        }
104    }
105
106    pub fn rpc(&mut self) -> RpcSender {
107        self.service_common_mut().rpc_sender()
108    }
109}
110
111impl<Serv> Clone for Node<Serv>
112where
113    Serv: Clone,
114{
115    fn clone(&self) -> Self {
116        Self {
117            store: self.store.clone(),
118        }
119    }
120}