openmina_node_common/service/
service.rs1use std::sync::Arc;
2
3use node::{
4 core::{channels::mpsc, invariants::InvariantsState},
5 event_source::Event,
6 ledger::LedgerManager,
7 p2p::identity::SecretKey as P2pSecretKey,
8 service::Recorder,
9 stats::Stats,
10 transition_frontier::genesis::GenesisConfig,
11};
12use rand::{rngs::StdRng, SeedableRng};
13use sha3::{
14 digest::{core_api::XofReaderCoreWrapper, ExtendableOutput, Update},
15 Shake256, Shake256ReaderCore,
16};
17
18use crate::rpc::RpcReceiver;
19
20use super::{
21 archive::ArchiveService,
22 block_producer::BlockProducerService,
23 p2p::webrtc_with_libp2p::P2pServiceCtx,
24 replay::ReplayerState,
25 rpc::{RpcSender, RpcService},
26 snark_worker::SnarkWorker,
27 snarks::SnarkBlockVerifyArgs,
28 EventReceiver, EventSender,
29};
30
31pub struct NodeService {
32 pub rng_seed: [u8; 32],
33 pub rng_ephemeral: XofReaderCoreWrapper<Shake256ReaderCore>,
34 pub rng_static: XofReaderCoreWrapper<Shake256ReaderCore>,
35 pub rng: StdRng,
36
37 pub event_sender: EventSender,
40 pub event_receiver: EventReceiver,
41
42 pub snark_block_proof_verify: mpsc::TrackedUnboundedSender<SnarkBlockVerifyArgs>,
43
44 pub ledger_manager: LedgerManager,
45 pub snark_worker: Option<SnarkWorker>,
46 pub block_producer: Option<BlockProducerService>,
47 pub archive: Option<ArchiveService>,
48 pub p2p: P2pServiceCtx,
49
50 pub stats: Option<Stats>,
51 pub rpc: RpcService,
52 pub recorder: Recorder,
53 pub replayer: Option<ReplayerState>,
54 pub invariants_state: InvariantsState,
55}
56
57impl NodeService {
58 pub fn event_sender(&self) -> &EventSender {
59 &self.event_sender
60 }
61
62 pub fn rpc_sender(&self) -> RpcSender {
63 self.rpc.req_sender()
64 }
65
66 pub fn event_receiver_with_rpc_receiver(&mut self) -> (&mut EventReceiver, &mut RpcReceiver) {
67 (&mut self.event_receiver, self.rpc.req_receiver())
68 }
69
70 pub fn event_receiver(&mut self) -> &mut EventReceiver {
71 &mut self.event_receiver
72 }
73
74 pub fn rpc_receiver(&mut self) -> &mut RpcReceiver {
75 self.rpc.req_receiver()
76 }
77
78 pub fn ledger_manager(&self) -> &LedgerManager {
79 &self.ledger_manager
80 }
81
82 pub fn block_producer(&self) -> Option<&BlockProducerService> {
83 self.block_producer.as_ref()
84 }
85
86 pub fn archive(&self) -> Option<&ArchiveService> {
87 self.archive.as_ref()
88 }
89
90 pub fn stats(&mut self) -> Option<&mut Stats> {
91 self.stats.as_mut()
92 }
93
94 pub fn replayer(&mut self) -> Option<&mut ReplayerState> {
95 self.replayer.as_mut()
96 }
97}
98
99impl NodeService {
100 pub fn for_replay(
101 rng_seed: [u8; 32],
102 initial_time: redux::Timestamp,
103 p2p_sec_key: P2pSecretKey,
104 dynamic_effects_lib: Option<String>,
105 ) -> Self {
106 Self {
107 rng_seed,
108 rng_ephemeral: Shake256::default()
109 .chain(rng_seed)
110 .chain(b"ephemeral")
111 .finalize_xof(),
112 rng_static: Shake256::default()
113 .chain(rng_seed)
114 .chain(b"static")
115 .finalize_xof(),
116 rng: StdRng::from_seed(rng_seed),
117 event_sender: mpsc::unbounded_channel().0,
118 event_receiver: mpsc::unbounded_channel().1.into(),
119 snark_block_proof_verify: mpsc::unbounded_channel().0,
120 ledger_manager: LedgerManager::spawn(Default::default()),
121 snark_worker: None,
122 block_producer: None,
123 archive: None,
124 p2p: P2pServiceCtx::mocked(p2p_sec_key),
125 stats: Some(Stats::new()),
126 rpc: RpcService::new(),
127 recorder: Recorder::None,
128 replayer: Some(ReplayerState {
129 initial_monotonic: redux::Instant::now(),
130 initial_time,
131 expected_actions: Default::default(),
132 replay_dynamic_effects_lib: dynamic_effects_lib.unwrap_or_default(),
133 }),
134 invariants_state: Default::default(),
135 }
136 }
137}
138
139impl AsMut<NodeService> for NodeService {
140 fn as_mut(&mut self) -> &mut NodeService {
141 self
142 }
143}
144
145impl redux::Service for NodeService {}
146
147impl node::Service for NodeService {
148 fn queues(&mut self) -> node::service::Queues {
149 node::service::Queues {
150 events: self.event_receiver.len(),
151 snark_block_verify: self.snark_block_proof_verify.len(),
152 ledger: self.ledger_manager.pending_calls(),
153 vrf_evaluator: self
154 .block_producer
155 .as_ref()
156 .map(|v| v.vrf_pending_requests()),
157 block_prover: self
158 .block_producer
159 .as_ref()
160 .map(|v| v.prove_pending_requests()),
161 p2p_webrtc: self.p2p.webrtc.pending_cmds(),
162 #[cfg(feature = "p2p-libp2p")]
163 p2p_libp2p: self.p2p.mio.pending_cmds(),
164 rpc: self.rpc.req_receiver().len(),
165 }
166 }
167
168 fn stats(&mut self) -> Option<&mut Stats> {
169 self.stats()
170 }
171
172 fn recorder(&mut self) -> &mut Recorder {
173 &mut self.recorder
174 }
175
176 fn is_replay(&self) -> bool {
177 self.replayer.is_some()
178 }
179}
180
181impl redux::TimeService for NodeService {
182 fn monotonic_time(&mut self) -> redux::Instant {
183 self.replayer
184 .as_ref()
185 .map(|v| v.next_monotonic_time())
186 .unwrap_or_else(redux::Instant::now)
187 }
188}
189
190impl node::service::EventSourceService for NodeService {
191 fn next_event(&mut self) -> Option<Event> {
192 self.event_receiver.try_next()
193 }
194}
195
196impl node::service::LedgerService for NodeService {
197 fn ledger_manager(&self) -> &LedgerManager {
198 &self.ledger_manager
199 }
200
201 fn force_sync_calls(&self) -> bool {
202 self.replayer.is_some()
203 }
204}
205
206impl node::service::TransitionFrontierGenesisService for NodeService {
207 fn load_genesis(&mut self, config: Arc<GenesisConfig>) {
208 let res = match config.load() {
209 Err(err) => Err(err.to_string()),
210 Ok((masks, data)) => {
211 let is_archive = self.archive().is_some();
212 masks.into_iter().for_each(|mut mask| {
213 if !is_archive {
214 mask.unset_token_owners();
216 }
217 self.ledger_manager.insert_genesis_ledger(mask);
218 });
219 Ok(data)
220 }
221 };
222 let _ = self.event_sender.send(Event::GenesisLoad(res));
223 }
224}
225
226impl node::core::invariants::InvariantService for NodeService {
227 type ClusterInvariantsState<'a> = std::cell::RefMut<'a, InvariantsState>;
228
229 fn invariants_state(&mut self) -> &mut InvariantsState {
230 &mut self.invariants_state
231 }
232}