openmina_node_common/service/
service.rs

1use std::sync::Arc;
2
3use node::{
4    core::{channels::mpsc, invariants::InvariantsState},
5    event_source::Event,
6    ledger::LedgerManager,
7    p2p::identity::SecretKey as P2pSecretKey,
8    service::Recorder,
9    stats::Stats,
10    transition_frontier::genesis::GenesisConfig,
11};
12use rand::{rngs::StdRng, SeedableRng};
13use sha3::{
14    digest::{core_api::XofReaderCoreWrapper, ExtendableOutput, Update},
15    Shake256, Shake256ReaderCore,
16};
17
18use crate::rpc::RpcReceiver;
19
20use super::{
21    archive::ArchiveService,
22    block_producer::BlockProducerService,
23    p2p::webrtc_with_libp2p::P2pServiceCtx,
24    replay::ReplayerState,
25    rpc::{RpcSender, RpcService},
26    snark_worker::SnarkWorker,
27    snarks::SnarkBlockVerifyArgs,
28    EventReceiver, EventSender,
29};
30
31pub struct NodeService {
32    pub rng_seed: [u8; 32],
33    pub rng_ephemeral: XofReaderCoreWrapper<Shake256ReaderCore>,
34    pub rng_static: XofReaderCoreWrapper<Shake256ReaderCore>,
35    pub rng: StdRng,
36
37    /// Events sent on this channel are retrieved and processed in the
38    /// `event_source` state machine defined in the `openmina-node` crate.
39    pub event_sender: EventSender,
40    pub event_receiver: EventReceiver,
41
42    pub snark_block_proof_verify: mpsc::TrackedUnboundedSender<SnarkBlockVerifyArgs>,
43
44    pub ledger_manager: LedgerManager,
45    pub snark_worker: Option<SnarkWorker>,
46    pub block_producer: Option<BlockProducerService>,
47    pub archive: Option<ArchiveService>,
48    pub p2p: P2pServiceCtx,
49
50    pub stats: Option<Stats>,
51    pub rpc: RpcService,
52    pub recorder: Recorder,
53    pub replayer: Option<ReplayerState>,
54    pub invariants_state: InvariantsState,
55}
56
57impl NodeService {
58    pub fn event_sender(&self) -> &EventSender {
59        &self.event_sender
60    }
61
62    pub fn rpc_sender(&self) -> RpcSender {
63        self.rpc.req_sender()
64    }
65
66    pub fn event_receiver_with_rpc_receiver(&mut self) -> (&mut EventReceiver, &mut RpcReceiver) {
67        (&mut self.event_receiver, self.rpc.req_receiver())
68    }
69
70    pub fn event_receiver(&mut self) -> &mut EventReceiver {
71        &mut self.event_receiver
72    }
73
74    pub fn rpc_receiver(&mut self) -> &mut RpcReceiver {
75        self.rpc.req_receiver()
76    }
77
78    pub fn ledger_manager(&self) -> &LedgerManager {
79        &self.ledger_manager
80    }
81
82    pub fn block_producer(&self) -> Option<&BlockProducerService> {
83        self.block_producer.as_ref()
84    }
85
86    pub fn archive(&self) -> Option<&ArchiveService> {
87        self.archive.as_ref()
88    }
89
90    pub fn stats(&mut self) -> Option<&mut Stats> {
91        self.stats.as_mut()
92    }
93
94    pub fn replayer(&mut self) -> Option<&mut ReplayerState> {
95        self.replayer.as_mut()
96    }
97}
98
99impl NodeService {
100    pub fn for_replay(
101        rng_seed: [u8; 32],
102        initial_time: redux::Timestamp,
103        p2p_sec_key: P2pSecretKey,
104        dynamic_effects_lib: Option<String>,
105    ) -> Self {
106        Self {
107            rng_seed,
108            rng_ephemeral: Shake256::default()
109                .chain(rng_seed)
110                .chain(b"ephemeral")
111                .finalize_xof(),
112            rng_static: Shake256::default()
113                .chain(rng_seed)
114                .chain(b"static")
115                .finalize_xof(),
116            rng: StdRng::from_seed(rng_seed),
117            event_sender: mpsc::unbounded_channel().0,
118            event_receiver: mpsc::unbounded_channel().1.into(),
119            snark_block_proof_verify: mpsc::unbounded_channel().0,
120            ledger_manager: LedgerManager::spawn(Default::default()),
121            snark_worker: None,
122            block_producer: None,
123            archive: None,
124            p2p: P2pServiceCtx::mocked(p2p_sec_key),
125            stats: Some(Stats::new()),
126            rpc: RpcService::new(),
127            recorder: Recorder::None,
128            replayer: Some(ReplayerState {
129                initial_monotonic: redux::Instant::now(),
130                initial_time,
131                expected_actions: Default::default(),
132                replay_dynamic_effects_lib: dynamic_effects_lib.unwrap_or_default(),
133            }),
134            invariants_state: Default::default(),
135        }
136    }
137}
138
139impl AsMut<NodeService> for NodeService {
140    fn as_mut(&mut self) -> &mut NodeService {
141        self
142    }
143}
144
145impl redux::Service for NodeService {}
146
147impl node::Service for NodeService {
148    fn queues(&mut self) -> node::service::Queues {
149        node::service::Queues {
150            events: self.event_receiver.len(),
151            snark_block_verify: self.snark_block_proof_verify.len(),
152            ledger: self.ledger_manager.pending_calls(),
153            vrf_evaluator: self
154                .block_producer
155                .as_ref()
156                .map(|v| v.vrf_pending_requests()),
157            block_prover: self
158                .block_producer
159                .as_ref()
160                .map(|v| v.prove_pending_requests()),
161            p2p_webrtc: self.p2p.webrtc.pending_cmds(),
162            #[cfg(feature = "p2p-libp2p")]
163            p2p_libp2p: self.p2p.mio.pending_cmds(),
164            rpc: self.rpc.req_receiver().len(),
165        }
166    }
167
168    fn stats(&mut self) -> Option<&mut Stats> {
169        self.stats()
170    }
171
172    fn recorder(&mut self) -> &mut Recorder {
173        &mut self.recorder
174    }
175
176    fn is_replay(&self) -> bool {
177        self.replayer.is_some()
178    }
179}
180
181impl redux::TimeService for NodeService {
182    fn monotonic_time(&mut self) -> redux::Instant {
183        self.replayer
184            .as_ref()
185            .map(|v| v.next_monotonic_time())
186            .unwrap_or_else(redux::Instant::now)
187    }
188}
189
190impl node::service::EventSourceService for NodeService {
191    fn next_event(&mut self) -> Option<Event> {
192        self.event_receiver.try_next()
193    }
194}
195
196impl node::service::LedgerService for NodeService {
197    fn ledger_manager(&self) -> &LedgerManager {
198        &self.ledger_manager
199    }
200
201    fn force_sync_calls(&self) -> bool {
202        self.replayer.is_some()
203    }
204}
205
206impl node::service::TransitionFrontierGenesisService for NodeService {
207    fn load_genesis(&mut self, config: Arc<GenesisConfig>) {
208        let res = match config.load() {
209            Err(err) => Err(err.to_string()),
210            Ok((masks, data)) => {
211                let is_archive = self.archive().is_some();
212                masks.into_iter().for_each(|mut mask| {
213                    if !is_archive {
214                        // Optimization: We don't need token owners if the node is not an archive
215                        mask.unset_token_owners();
216                    }
217                    self.ledger_manager.insert_genesis_ledger(mask);
218                });
219                Ok(data)
220            }
221        };
222        let _ = self.event_sender.send(Event::GenesisLoad(res));
223    }
224}
225
226impl node::core::invariants::InvariantService for NodeService {
227    type ClusterInvariantsState<'a> = std::cell::RefMut<'a, InvariantsState>;
228
229    fn invariants_state(&mut self) -> &mut InvariantsState {
230        &mut self.invariants_state
231    }
232}