mina_node_common/service/
service.rs1use super::{
2 archive::ArchiveService,
3 block_producer::BlockProducerService,
4 p2p::webrtc_with_libp2p::P2pServiceCtx,
5 replay::ReplayerState,
6 rpc::{RpcSender, RpcService},
7 snark_worker::SnarkWorker,
8 snarks::SnarkBlockVerifyArgs,
9 EventReceiver, EventSender,
10};
11use crate::rpc::RpcReceiver;
12use node::{
13 core::{channels::mpsc, invariants::InvariantsState},
14 event_source::Event,
15 ledger::LedgerManager,
16 p2p::identity::SecretKey as P2pSecretKey,
17 service::Recorder,
18 stats::Stats,
19 transition_frontier::genesis::GenesisConfig,
20};
21use rand::{rngs::StdRng, SeedableRng};
22use sha3::{
23 digest::{core_api::XofReaderCoreWrapper, ExtendableOutput, Update},
24 Shake256, Shake256ReaderCore,
25};
26use std::sync::Arc;
27
28pub struct NodeService {
29 pub rng_seed: [u8; 32],
31 pub rng_ephemeral: XofReaderCoreWrapper<Shake256ReaderCore>,
33 pub rng_static: XofReaderCoreWrapper<Shake256ReaderCore>,
35 pub rng: StdRng,
37
38 pub event_sender: EventSender,
41 pub event_receiver: EventReceiver,
43
44 pub snark_block_proof_verify: mpsc::TrackedUnboundedSender<SnarkBlockVerifyArgs>,
46
47 pub ledger_manager: LedgerManager,
49 pub snark_worker: Option<SnarkWorker>,
52 pub block_producer: Option<BlockProducerService>,
55 pub archive: Option<ArchiveService>,
58 pub p2p: P2pServiceCtx,
60
61 pub stats: Option<Stats>,
63 pub rpc: RpcService,
65 pub recorder: Recorder,
67 pub replayer: Option<ReplayerState>,
70 pub invariants_state: InvariantsState,
72}
73
74impl NodeService {
75 pub fn event_sender(&self) -> &EventSender {
76 &self.event_sender
77 }
78
79 pub fn rpc_sender(&self) -> RpcSender {
80 self.rpc.req_sender()
81 }
82
83 pub fn event_receiver_with_rpc_receiver(&mut self) -> (&mut EventReceiver, &mut RpcReceiver) {
84 (&mut self.event_receiver, self.rpc.req_receiver())
85 }
86
87 pub fn event_receiver(&mut self) -> &mut EventReceiver {
88 &mut self.event_receiver
89 }
90
91 pub fn rpc_receiver(&mut self) -> &mut RpcReceiver {
92 self.rpc.req_receiver()
93 }
94
95 pub fn ledger_manager(&self) -> &LedgerManager {
96 &self.ledger_manager
97 }
98
99 pub fn block_producer(&self) -> Option<&BlockProducerService> {
100 self.block_producer.as_ref()
101 }
102
103 pub fn archive(&self) -> Option<&ArchiveService> {
104 self.archive.as_ref()
105 }
106
107 pub fn stats(&mut self) -> Option<&mut Stats> {
108 self.stats.as_mut()
109 }
110
111 pub fn replayer(&mut self) -> Option<&mut ReplayerState> {
112 self.replayer.as_mut()
113 }
114}
115
116impl NodeService {
117 pub fn for_replay(
118 rng_seed: [u8; 32],
119 initial_time: redux::Timestamp,
120 p2p_sec_key: P2pSecretKey,
121 dynamic_effects_lib: Option<String>,
122 ) -> Self {
123 Self {
124 rng_seed,
125 rng_ephemeral: Shake256::default()
126 .chain(rng_seed)
127 .chain(b"ephemeral")
128 .finalize_xof(),
129 rng_static: Shake256::default()
130 .chain(rng_seed)
131 .chain(b"static")
132 .finalize_xof(),
133 rng: StdRng::from_seed(rng_seed),
134 event_sender: mpsc::unbounded_channel().0,
135 event_receiver: mpsc::unbounded_channel().1.into(),
136 snark_block_proof_verify: mpsc::unbounded_channel().0,
137 ledger_manager: LedgerManager::spawn(Default::default()),
138 snark_worker: None,
139 block_producer: None,
140 archive: None,
141 p2p: P2pServiceCtx::mocked(p2p_sec_key),
142 stats: Some(Stats::new()),
143 rpc: RpcService::new(),
144 recorder: Recorder::None,
145 replayer: Some(ReplayerState {
146 initial_monotonic: redux::Instant::now(),
147 initial_time,
148 expected_actions: Default::default(),
149 replay_dynamic_effects_lib: dynamic_effects_lib.unwrap_or_default(),
150 }),
151 invariants_state: Default::default(),
152 }
153 }
154}
155
156impl AsMut<NodeService> for NodeService {
157 fn as_mut(&mut self) -> &mut NodeService {
158 self
159 }
160}
161
162impl redux::Service for NodeService {}
163
164impl node::Service for NodeService {
165 fn queues(&mut self) -> node::service::Queues {
166 node::service::Queues {
167 events: self.event_receiver.len(),
168 snark_block_verify: self.snark_block_proof_verify.len(),
169 ledger: self.ledger_manager.pending_calls(),
170 vrf_evaluator: self
171 .block_producer
172 .as_ref()
173 .map(|v| v.vrf_pending_requests()),
174 block_prover: self
175 .block_producer
176 .as_ref()
177 .map(|v| v.prove_pending_requests()),
178 p2p_webrtc: self.p2p.webrtc.pending_cmds(),
179 #[cfg(feature = "p2p-libp2p")]
180 p2p_libp2p: self.p2p.mio.pending_cmds(),
181 rpc: self.rpc.req_receiver().len(),
182 }
183 }
184
185 fn stats(&mut self) -> Option<&mut Stats> {
186 self.stats()
187 }
188
189 fn recorder(&mut self) -> &mut Recorder {
190 &mut self.recorder
191 }
192
193 fn is_replay(&self) -> bool {
194 self.replayer.is_some()
195 }
196}
197
198impl redux::TimeService for NodeService {
199 fn monotonic_time(&mut self) -> redux::Instant {
200 self.replayer
201 .as_ref()
202 .map(|v| v.next_monotonic_time())
203 .unwrap_or_else(redux::Instant::now)
204 }
205}
206
207impl node::service::EventSourceService for NodeService {
208 fn next_event(&mut self) -> Option<Event> {
209 self.event_receiver.try_next()
210 }
211}
212
213impl node::service::LedgerService for NodeService {
214 fn ledger_manager(&self) -> &LedgerManager {
215 &self.ledger_manager
216 }
217
218 fn force_sync_calls(&self) -> bool {
219 self.replayer.is_some()
220 }
221}
222
223impl node::service::TransitionFrontierGenesisService for NodeService {
224 fn load_genesis(&mut self, config: Arc<GenesisConfig>) {
225 let res = match config.load() {
226 Err(err) => Err(err.to_string()),
227 Ok((masks, data)) => {
228 let is_archive = self.archive().is_some();
229 masks.into_iter().for_each(|mut mask| {
230 if !is_archive {
231 mask.unset_token_owners();
233 }
234 self.ledger_manager.insert_genesis_ledger(mask);
235 });
236 Ok(data)
237 }
238 };
239 let _ = self.event_sender.send(Event::GenesisLoad(res));
240 }
241}
242
243impl node::core::invariants::InvariantService for NodeService {
244 type ClusterInvariantsState<'a> = std::cell::RefMut<'a, InvariantsState>;
245
246 fn invariants_state(&mut self) -> &mut InvariantsState {
247 &mut self.invariants_state
248 }
249}