openmina_node_common/service/
builder.rs

1use ledger::proofs::provers::BlockProver;
2use node::{
3    account::AccountSecretKey,
4    core::channels::mpsc,
5    ledger::{LedgerCtx, LedgerManager},
6    p2p::{
7        identity::SecretKey as P2pSecretKey,
8        service_impl::{
9            webrtc_with_libp2p::{P2pServiceCtx, P2pServiceWebrtcWithLibp2p},
10            TaskSpawner,
11        },
12    },
13    stats::Stats,
14};
15use rand::{rngs::StdRng, SeedableRng};
16use sha3::{
17    digest::{ExtendableOutput, Update},
18    Shake256,
19};
20
21use crate::{
22    rpc::{RpcSender, RpcService},
23    EventReceiver, EventSender, NodeService,
24};
25
26use super::{
27    archive::{config::ArchiveStorageOptions, ArchiveService},
28    block_producer::BlockProducerService,
29};
30
31pub struct NodeServiceCommonBuilder {
32    rng_seed: [u8; 32],
33    rng: StdRng,
34    /// Events sent on this channel are retrieved and processed in the
35    /// `event_source` state machine defined in the `openmina-node` crate.
36    event_sender: EventSender,
37    event_receiver: EventReceiver,
38    ledger_manager: Option<LedgerManager>,
39    block_producer: Option<BlockProducerService>,
40    archive: Option<ArchiveService>,
41    p2p: Option<P2pServiceCtx>,
42    gather_stats: bool,
43    rpc: RpcService,
44}
45
46#[derive(thiserror::Error, Debug, Clone)]
47pub enum NodeServiceCommonBuildError {
48    #[error("ledger was never initialized! Please call: NodeServiceBuilder::ledger_init")]
49    LedgerNotInit,
50    #[error("p2p was never initialized! Please call: NodeServiceBuilder::p2p_init")]
51    P2pNotInit,
52}
53
54impl NodeServiceCommonBuilder {
55    pub fn new(rng_seed: [u8; 32]) -> Self {
56        let (event_sender, event_receiver) = mpsc::unbounded_channel();
57        Self {
58            rng_seed,
59            rng: StdRng::from_seed(rng_seed),
60            event_sender,
61            event_receiver: event_receiver.into(),
62            ledger_manager: None,
63            block_producer: None,
64            archive: None,
65            p2p: None,
66            rpc: RpcService::new(),
67            gather_stats: false,
68        }
69    }
70
71    pub fn event_sender(&self) -> &EventSender {
72        &self.event_sender
73    }
74
75    pub fn rpc_sender(&self) -> RpcSender {
76        self.rpc.req_sender()
77    }
78
79    pub fn ledger_init(&mut self) -> &mut Self {
80        let mut ctx = LedgerCtx::default();
81        ctx.set_event_sender(self.event_sender.clone());
82        if self.archive.is_some() {
83            ctx.set_archive_mode();
84        };
85        self.ledger_manager = Some(LedgerManager::spawn(ctx));
86        self
87    }
88
89    pub fn block_producer_init(
90        &mut self,
91        keypair: AccountSecretKey,
92        provers: Option<BlockProver>,
93    ) -> &mut Self {
94        self.block_producer = Some(BlockProducerService::start(
95            self.event_sender.clone(),
96            keypair,
97            provers,
98        ));
99        self
100    }
101
102    pub fn archive_init(&mut self, options: ArchiveStorageOptions, work_dir: String) -> &mut Self {
103        self.archive = Some(ArchiveService::start(options, work_dir));
104        self
105    }
106
107    pub fn p2p_init<S: TaskSpawner>(
108        &mut self,
109        secret_key: P2pSecretKey,
110        task_spawner: S,
111    ) -> &mut Self {
112        self.p2p = Some(<NodeService as P2pServiceWebrtcWithLibp2p>::init(
113            secret_key.clone(),
114            task_spawner,
115            self.rng_seed,
116        ));
117        self
118    }
119
120    pub fn gather_stats(&mut self) -> &mut Self {
121        self.gather_stats = true;
122        self
123    }
124
125    pub fn build(self) -> Result<NodeService, NodeServiceCommonBuildError> {
126        let ledger_manager = self
127            .ledger_manager
128            .ok_or(NodeServiceCommonBuildError::LedgerNotInit)?;
129        let p2p = self.p2p.ok_or(NodeServiceCommonBuildError::P2pNotInit)?;
130
131        Ok(NodeService {
132            rng_seed: self.rng_seed,
133            rng_ephemeral: Shake256::default()
134                .chain(self.rng_seed)
135                .chain(b"ephemeral")
136                .finalize_xof(),
137            rng_static: Shake256::default()
138                .chain(self.rng_seed)
139                .chain(b"static")
140                .finalize_xof(),
141            rng: self.rng,
142            event_sender: self.event_sender.clone(),
143            event_receiver: self.event_receiver,
144            snark_block_proof_verify: NodeService::snark_block_proof_verifier_spawn(
145                self.event_sender,
146            ),
147            ledger_manager,
148            block_producer: self.block_producer,
149            // initialized in state machine.
150            snark_worker: None,
151            archive: self.archive,
152            p2p,
153            stats: self.gather_stats.then(Stats::new),
154            rpc: self.rpc,
155            recorder: Default::default(),
156            replayer: None,
157            invariants_state: Default::default(),
158        })
159    }
160}