mina_node_common/service/
service.rs

1use super::{
2    archive::ArchiveService,
3    block_producer::BlockProducerService,
4    p2p::webrtc_with_libp2p::P2pServiceCtx,
5    replay::ReplayerState,
6    rpc::{RpcSender, RpcService},
7    snark_worker::SnarkWorker,
8    snarks::SnarkBlockVerifyArgs,
9    EventReceiver, EventSender,
10};
11use crate::rpc::RpcReceiver;
12use node::{
13    core::{channels::mpsc, invariants::InvariantsState},
14    event_source::Event,
15    ledger::LedgerManager,
16    p2p::identity::SecretKey as P2pSecretKey,
17    service::Recorder,
18    stats::Stats,
19    transition_frontier::genesis::GenesisConfig,
20};
21use rand::{rngs::StdRng, SeedableRng};
22use sha3::{
23    digest::{core_api::XofReaderCoreWrapper, ExtendableOutput, Update},
24    Shake256, Shake256ReaderCore,
25};
26use std::sync::Arc;
27
28pub struct NodeService {
29    /// Master seed for deterministic random number generation.
30    pub rng_seed: [u8; 32],
31    /// XOF-based RNG for ephemeral keys (derived from seed + "ephemeral").
32    pub rng_ephemeral: XofReaderCoreWrapper<Shake256ReaderCore>,
33    /// XOF-based RNG for static operations (derived from seed + "static").
34    pub rng_static: XofReaderCoreWrapper<Shake256ReaderCore>,
35    /// Standard RNG for general-purpose randomness.
36    pub rng: StdRng,
37
38    /// Events sent on this channel are retrieved and processed in the
39    /// `event_source` state machine defined in the `mina-node` crate.
40    pub event_sender: EventSender,
41    /// Channel for consuming events in the event source state machine.
42    pub event_receiver: EventReceiver,
43
44    /// Channel for asynchronous block proof verification requests.
45    pub snark_block_proof_verify: mpsc::TrackedUnboundedSender<SnarkBlockVerifyArgs>,
46
47    /// Manages ledger operations, database access, and staged ledger state.
48    pub ledger_manager: LedgerManager,
49    /// SNARK proof worker for generating transaction proofs (enabled when node
50    /// acts as SNARK worker).
51    pub snark_worker: Option<SnarkWorker>,
52    /// Block production service including VRF evaluation and block proving
53    /// (enabled when node acts as block producer).
54    pub block_producer: Option<BlockProducerService>,
55    /// Archive service for storing full blockchain history (enabled when node
56    /// acts as archive node).
57    pub archive: Option<ArchiveService>,
58    /// P2P networking context (WebRTC and optionally libp2p transports).
59    pub p2p: P2pServiceCtx,
60
61    /// Runtime statistics and metrics collection.
62    pub stats: Option<Stats>,
63    /// RPC service for external API queries.
64    pub rpc: RpcService,
65    /// Records node state and actions for debugging and replay.
66    pub recorder: Recorder,
67    /// Replayer state for deterministic action replay (only set in replay
68    /// mode).
69    pub replayer: Option<ReplayerState>,
70    /// State for runtime invariant checking and validation.
71    pub invariants_state: InvariantsState,
72}
73
74impl NodeService {
75    pub fn event_sender(&self) -> &EventSender {
76        &self.event_sender
77    }
78
79    pub fn rpc_sender(&self) -> RpcSender {
80        self.rpc.req_sender()
81    }
82
83    pub fn event_receiver_with_rpc_receiver(&mut self) -> (&mut EventReceiver, &mut RpcReceiver) {
84        (&mut self.event_receiver, self.rpc.req_receiver())
85    }
86
87    pub fn event_receiver(&mut self) -> &mut EventReceiver {
88        &mut self.event_receiver
89    }
90
91    pub fn rpc_receiver(&mut self) -> &mut RpcReceiver {
92        self.rpc.req_receiver()
93    }
94
95    pub fn ledger_manager(&self) -> &LedgerManager {
96        &self.ledger_manager
97    }
98
99    pub fn block_producer(&self) -> Option<&BlockProducerService> {
100        self.block_producer.as_ref()
101    }
102
103    pub fn archive(&self) -> Option<&ArchiveService> {
104        self.archive.as_ref()
105    }
106
107    pub fn stats(&mut self) -> Option<&mut Stats> {
108        self.stats.as_mut()
109    }
110
111    pub fn replayer(&mut self) -> Option<&mut ReplayerState> {
112        self.replayer.as_mut()
113    }
114}
115
116impl NodeService {
117    pub fn for_replay(
118        rng_seed: [u8; 32],
119        initial_time: redux::Timestamp,
120        p2p_sec_key: P2pSecretKey,
121        dynamic_effects_lib: Option<String>,
122    ) -> Self {
123        Self {
124            rng_seed,
125            rng_ephemeral: Shake256::default()
126                .chain(rng_seed)
127                .chain(b"ephemeral")
128                .finalize_xof(),
129            rng_static: Shake256::default()
130                .chain(rng_seed)
131                .chain(b"static")
132                .finalize_xof(),
133            rng: StdRng::from_seed(rng_seed),
134            event_sender: mpsc::unbounded_channel().0,
135            event_receiver: mpsc::unbounded_channel().1.into(),
136            snark_block_proof_verify: mpsc::unbounded_channel().0,
137            ledger_manager: LedgerManager::spawn(Default::default()),
138            snark_worker: None,
139            block_producer: None,
140            archive: None,
141            p2p: P2pServiceCtx::mocked(p2p_sec_key),
142            stats: Some(Stats::new()),
143            rpc: RpcService::new(),
144            recorder: Recorder::None,
145            replayer: Some(ReplayerState {
146                initial_monotonic: redux::Instant::now(),
147                initial_time,
148                expected_actions: Default::default(),
149                replay_dynamic_effects_lib: dynamic_effects_lib.unwrap_or_default(),
150            }),
151            invariants_state: Default::default(),
152        }
153    }
154}
155
156impl AsMut<NodeService> for NodeService {
157    fn as_mut(&mut self) -> &mut NodeService {
158        self
159    }
160}
161
162impl redux::Service for NodeService {}
163
164impl node::Service for NodeService {
165    fn queues(&mut self) -> node::service::Queues {
166        node::service::Queues {
167            events: self.event_receiver.len(),
168            snark_block_verify: self.snark_block_proof_verify.len(),
169            ledger: self.ledger_manager.pending_calls(),
170            vrf_evaluator: self
171                .block_producer
172                .as_ref()
173                .map(|v| v.vrf_pending_requests()),
174            block_prover: self
175                .block_producer
176                .as_ref()
177                .map(|v| v.prove_pending_requests()),
178            p2p_webrtc: self.p2p.webrtc.pending_cmds(),
179            #[cfg(feature = "p2p-libp2p")]
180            p2p_libp2p: self.p2p.mio.pending_cmds(),
181            rpc: self.rpc.req_receiver().len(),
182        }
183    }
184
185    fn stats(&mut self) -> Option<&mut Stats> {
186        self.stats()
187    }
188
189    fn recorder(&mut self) -> &mut Recorder {
190        &mut self.recorder
191    }
192
193    fn is_replay(&self) -> bool {
194        self.replayer.is_some()
195    }
196}
197
198impl redux::TimeService for NodeService {
199    fn monotonic_time(&mut self) -> redux::Instant {
200        self.replayer
201            .as_ref()
202            .map(|v| v.next_monotonic_time())
203            .unwrap_or_else(redux::Instant::now)
204    }
205}
206
207impl node::service::EventSourceService for NodeService {
208    fn next_event(&mut self) -> Option<Event> {
209        self.event_receiver.try_next()
210    }
211}
212
213impl node::service::LedgerService for NodeService {
214    fn ledger_manager(&self) -> &LedgerManager {
215        &self.ledger_manager
216    }
217
218    fn force_sync_calls(&self) -> bool {
219        self.replayer.is_some()
220    }
221}
222
223impl node::service::TransitionFrontierGenesisService for NodeService {
224    fn load_genesis(&mut self, config: Arc<GenesisConfig>) {
225        let res = match config.load() {
226            Err(err) => Err(err.to_string()),
227            Ok((masks, data)) => {
228                let is_archive = self.archive().is_some();
229                masks.into_iter().for_each(|mut mask| {
230                    if !is_archive {
231                        // Optimization: We don't need token owners if the node is not an archive
232                        mask.unset_token_owners();
233                    }
234                    self.ledger_manager.insert_genesis_ledger(mask);
235                });
236                Ok(data)
237            }
238        };
239        let _ = self.event_sender.send(Event::GenesisLoad(res));
240    }
241}
242
243impl node::core::invariants::InvariantService for NodeService {
244    type ClusterInvariantsState<'a> = std::cell::RefMut<'a, InvariantsState>;
245
246    fn invariants_state(&mut self) -> &mut InvariantsState {
247        &mut self.invariants_state
248    }
249}