openmina_node_common/node/
node.rs1use std::time::Duration;
2
3use node::{Effects, EventSourceAction, Service, State, Store};
4
5use crate::{
6 rpc::{RpcReceiver, RpcSender},
7 EventReceiver, NodeService,
8};
9
10pub struct Node<Serv> {
11 store: Store<Serv>,
12}
13
14impl<Serv: Service + AsMut<NodeService>> Node<Serv> {
15 pub fn new(
16 rng_seed: [u8; 32],
17 initial_state: State,
18 mut service: Serv,
19 override_effects: Option<Effects<Serv>>,
20 ) -> Self {
21 let p2p_sec_key = service.as_mut().p2p.sec_key.clone();
22 service
23 .recorder()
24 .initial_state(rng_seed, p2p_sec_key, &initial_state);
25
26 let time_since_epoch = initial_state
27 .time()
28 .checked_sub(redux::Timestamp::ZERO)
29 .unwrap();
30 let store = Store::new(
31 node::reducer,
32 override_effects.unwrap_or(node::effects),
33 service,
34 redux::SystemTime::UNIX_EPOCH + time_since_epoch,
35 initial_state,
36 );
37
38 Self { store }
39 }
40
41 pub fn store(&self) -> &Store<Serv> {
42 &self.store
43 }
44
45 pub fn store_mut(&mut self) -> &mut Store<Serv> {
46 &mut self.store
47 }
48
49 pub fn state(&self) -> &State {
50 self.store().state.get()
51 }
52
53 fn service_mut(&mut self) -> &mut Serv {
54 &mut self.store.service
55 }
56
57 fn service_common_mut(&mut self) -> &mut NodeService {
58 self.service_mut().as_mut()
59 }
60
61 fn event_receiver_with_rpc_receiver(&mut self) -> (&mut EventReceiver, &mut RpcReceiver) {
62 self.service_common_mut().event_receiver_with_rpc_receiver()
63 }
64
65 fn event_receiver(&mut self) -> &mut EventReceiver {
66 &mut self.service_common_mut().event_receiver
67 }
68
69 pub async fn run_forever(&mut self) {
70 loop {
71 self.store_mut().dispatch(EventSourceAction::WaitForEvents);
72
73 let (event_receiver, rpc_receiver) = self.event_receiver_with_rpc_receiver();
74 let wait_for_events = event_receiver.wait_for_events();
75 let rpc_req_fut = async {
76 match rpc_receiver.recv().await {
78 Some(v) => v,
79 None => std::future::pending().await,
80 }
81 };
82
83 let timeout = Duration::from_millis(100);
84
85 #[cfg(not(target_arch = "wasm32"))]
86 let timeout = tokio::time::sleep(timeout);
87 #[cfg(target_arch = "wasm32")]
88 let timeout = gloo_timers::future::TimeoutFuture::new(timeout.as_millis() as u32);
89
90 tokio::select! {
91 _ = wait_for_events => {
92 while self.event_receiver().has_next() {
93 self.store_mut().dispatch(EventSourceAction::ProcessEvents);
94 }
95 }
96 req = rpc_req_fut => {
97 self.service_common_mut().process_rpc_request(req);
98 }
99 _ = timeout => {
100 self.store_mut().dispatch(EventSourceAction::WaitTimeout);
101 }
102 }
103 }
104 }
105
106 pub fn rpc(&mut self) -> RpcSender {
107 self.service_common_mut().rpc_sender()
108 }
109}
110
111impl<Serv> Clone for Node<Serv>
112where
113 Serv: Clone,
114{
115 fn clone(&self) -> Self {
116 Self {
117 store: self.store.clone(),
118 }
119 }
120}