mina_node_testing/simulator/
mod.rs

1mod config;
2pub use config::*;
3
4use mina_p2p_messages::v2::{
5    CurrencyFeeStableV1, UnsignedExtendedUInt64Int64ForVersionTagsStableV1,
6};
7
8use std::{collections::BTreeSet, time::Duration};
9
10use node::{
11    ActionKind, ActionWithMeta, BlockProducerConfig, SnarkerConfig, SnarkerStrategy, State,
12};
13
14use crate::{
15    cluster::ClusterNodeId,
16    node::{Node, RustNodeBlockProducerTestingConfig, RustNodeTestingConfig},
17    scenario::ListenerNode,
18    scenarios::{ClusterRunner, RunCfg},
19    service::NodeTestingService,
20};
21
22pub struct Simulator {
23    initial_time: redux::Timestamp,
24    config: SimulatorConfig,
25    start_t: Option<redux::Instant>,
26}
27
28impl Simulator {
29    pub fn new(initial_time: redux::Timestamp, config: SimulatorConfig) -> Self {
30        Self {
31            initial_time,
32            config,
33            start_t: None,
34        }
35    }
36
37    fn initial_time(&self) -> redux::Timestamp {
38        self.initial_time
39    }
40
41    async fn seed_config_async(&self, _runner: &ClusterRunner<'_>) -> RustNodeTestingConfig {
42        RustNodeTestingConfig {
43            initial_time: self.initial_time(),
44            genesis: self.config.genesis.clone(),
45            max_peers: 1000,
46            initial_peers: Vec::new(),
47            peer_id: Default::default(),
48            block_producer: None,
49            snark_worker: None,
50            timeouts: Default::default(),
51            libp2p_port: None,
52            recorder: self.config.recorder.clone(),
53            peer_discovery: true,
54        }
55    }
56
57    async fn wait_for_all_nodes_synced(&mut self, runner: &mut ClusterRunner<'_>) {
58        eprintln!("waiting for all rust nodes to sync up");
59        let is_synced = |state: &State| {
60            state.transition_frontier.sync.is_synced()
61                && state.transition_frontier.best_tip().is_some()
62        };
63        while !runner.nodes_iter().all(|(_, node)| is_synced(node.state())) {
64            runner
65                .run(
66                    RunCfg::default()
67                        .timeout(Duration::from_secs(300))
68                        .action_handler(move |_, _, _, action| {
69                            matches!(
70                                action.action().kind(),
71                                ActionKind::TransitionFrontierGenesisInject
72                                    | ActionKind::TransitionFrontierSynced
73                            )
74                        }),
75                )
76                .await
77                .expect("error while waiting to sync genesis block from ocaml");
78        }
79        eprintln!("all rust nodes synced up");
80    }
81
82    async fn set_up_seed_nodes(&mut self, runner: &mut ClusterRunner<'_>) {
83        eprintln!("setting up rust seed nodes: {}", self.config.seed_nodes);
84        let seed_config = self.seed_config_async(runner).await;
85
86        for _ in 0..(self.config.seed_nodes) {
87            runner.add_rust_node(seed_config.clone());
88        }
89
90        self.wait_for_all_nodes_synced(runner).await;
91    }
92
93    fn seed_nodes_iter<'a>(
94        &self,
95        runner: &'a ClusterRunner<'_>,
96    ) -> impl 'a + Iterator<Item = (ClusterNodeId, &'a Node)> {
97        runner.nodes_iter().take(self.config.seed_nodes)
98    }
99
100    fn seed_node_dial_addrs(&self, runner: &ClusterRunner<'_>) -> Vec<ListenerNode> {
101        self.seed_nodes_iter(runner)
102            .map(|(id, _)| id.into())
103            .collect()
104    }
105
106    async fn set_up_normal_nodes(&mut self, runner: &mut ClusterRunner<'_>) {
107        if self.config.normal_nodes == 0 {
108            return;
109        }
110
111        eprintln!("setting up normal nodes: {}", self.config.normal_nodes);
112
113        let node_config = RustNodeTestingConfig {
114            max_peers: 100,
115            initial_peers: self.seed_node_dial_addrs(runner),
116            ..self.seed_config_async(runner).await
117        };
118
119        for _ in 0..(self.config.normal_nodes) {
120            runner.add_rust_node(node_config.clone());
121        }
122
123        self.wait_for_all_nodes_synced(runner).await;
124    }
125
126    async fn set_up_snark_worker_nodes(&mut self, runner: &mut ClusterRunner<'_>) {
127        if self.config.snark_workers == 0 {
128            return;
129        }
130
131        eprintln!(
132            "setting up rust snark worker nodes: {}",
133            self.config.snark_workers
134        );
135
136        let node_config = RustNodeTestingConfig {
137            max_peers: 100,
138            initial_peers: self.seed_node_dial_addrs(runner),
139            ..self.seed_config_async(runner).await
140        };
141
142        let bp_pub_keys = runner
143            .nodes_iter()
144            .filter_map(|(_, node)| {
145                let sec_key = &node.config().block_producer.as_ref()?.sec_key;
146                Some(sec_key.public_key())
147            })
148            .collect::<BTreeSet<_>>();
149
150        let snarker_accounts = runner
151            .accounts_with_sec_keys(ClusterNodeId::new_unchecked(0))
152            .filter(|(sec_key, _)| !bp_pub_keys.contains(&sec_key.public_key()))
153            .take(self.config.snark_workers)
154            .collect::<Vec<_>>();
155
156        for (sec_key, account) in snarker_accounts {
157            eprintln!(
158                "snark worker({}) balance: {} mina",
159                sec_key.public_key(),
160                account.balance.to_amount().as_u64()
161            );
162            let config = RustNodeTestingConfig {
163                snark_worker: Some(SnarkerConfig {
164                    public_key: sec_key.public_key(),
165                    fee: CurrencyFeeStableV1(UnsignedExtendedUInt64Int64ForVersionTagsStableV1(
166                        10_000_000.into(),
167                    )),
168                    strategy: SnarkerStrategy::Sequential,
169                    auto_commit: true,
170                }),
171                ..node_config.clone()
172            };
173            runner.add_rust_node(config);
174        }
175
176        self.wait_for_all_nodes_synced(runner).await;
177    }
178
179    async fn set_up_block_producer_nodes(&mut self, runner: &mut ClusterRunner<'_>) {
180        if self.config.block_producers == 0 {
181            return;
182        }
183
184        let block_producers = runner.block_producer_sec_keys(ClusterNodeId::new_unchecked(0));
185
186        assert!(self.config.block_producers <= block_producers.len());
187        eprintln!(
188            "setting up rust block producer nodes: {}/{}",
189            self.config.block_producers,
190            block_producers.len()
191        );
192
193        let node_config = RustNodeTestingConfig {
194            max_peers: 100,
195            initial_peers: self.seed_node_dial_addrs(runner),
196            ..self.seed_config_async(runner).await
197        };
198
199        for (sec_key, stake) in block_producers
200            .into_iter()
201            .take(self.config.block_producers)
202        {
203            eprintln!(
204                "block producer({}) stake: {stake} mina",
205                sec_key.public_key()
206            );
207            let config = RustNodeTestingConfig {
208                block_producer: Some(RustNodeBlockProducerTestingConfig {
209                    config: BlockProducerConfig {
210                        pub_key: sec_key.public_key().into(),
211                        custom_coinbase_receiver: None,
212                        proposed_protocol_version: None,
213                    },
214                    sec_key,
215                }),
216                ..node_config.clone()
217            };
218            runner.add_rust_node(config);
219        }
220
221        self.wait_for_all_nodes_synced(runner).await;
222    }
223
224    pub async fn setup_and_run_with_listener<AL, ALF>(
225        &mut self,
226        runner: &mut ClusterRunner<'_>,
227        listener: ALF,
228    ) where
229        ALF: FnMut() -> AL,
230        AL: 'static
231            + Send
232            + FnMut(ClusterNodeId, &State, &NodeTestingService, &ActionWithMeta) -> bool,
233    {
234        self.setup(runner).await;
235        self.run_with_listener(runner, listener).await;
236    }
237
238    pub async fn setup_and_run(&mut self, runner: &mut ClusterRunner<'_>) {
239        self.setup(runner).await;
240        self.run_with_listener(runner, || |_, _, _, _| false).await;
241    }
242
243    pub async fn setup(&mut self, runner: &mut ClusterRunner<'_>) {
244        self.set_up_seed_nodes(runner).await;
245        self.set_up_normal_nodes(runner).await;
246        self.set_up_snark_worker_nodes(runner).await;
247        self.set_up_block_producer_nodes(runner).await;
248    }
249
250    pub async fn run(&mut self, runner: &mut ClusterRunner<'_>) {
251        self.run_with_listener(runner, || |_, _, _, _| false).await;
252    }
253
254    pub async fn run_with_listener<AL, ALF>(
255        &mut self,
256        runner: &mut ClusterRunner<'_>,
257        mut listener: ALF,
258    ) where
259        ALF: FnMut() -> AL,
260        AL: 'static
261            + Send
262            + FnMut(ClusterNodeId, &State, &NodeTestingService, &ActionWithMeta) -> bool,
263    {
264        let run_until = self.config.run_until.clone();
265        let advance_time = self.config.advance_time.clone();
266        let start_t = *self.start_t.get_or_insert_with(redux::Instant::now);
267        let mut last_printed_slot = 0;
268        let virtual_initial_time = self.initial_time();
269
270        while start_t.elapsed() < self.config.run_until_timeout {
271            tokio::task::yield_now().await;
272            let cfg = RunCfg::default()
273                .advance_time(advance_time.clone())
274                .timeout(Duration::ZERO)
275                .action_handler(listener());
276            let _ = runner.run(cfg).await;
277
278            let printed_elapsed_time = {
279                let state = runner.nodes_iter().next().unwrap().1.state();
280                if let Some(cur_slot) = state
281                    .cur_global_slot()
282                    .filter(|cur| *cur > last_printed_slot)
283                {
284                    let real_elapsed = start_t.elapsed();
285                    let virtual_elapsed = state.time().checked_sub(virtual_initial_time).unwrap();
286                    last_printed_slot = cur_slot;
287
288                    eprintln!("[elapsed] real: {real_elapsed:?}, virtual: {virtual_elapsed:?}, global_slot: {cur_slot}");
289                    true
290                } else {
291                    false
292                }
293            };
294
295            if printed_elapsed_time {
296                for (node_id, node) in runner.nodes_iter() {
297                    let Some(best_tip) = node.state().transition_frontier.best_tip() else {
298                        continue;
299                    };
300                    let consensus_state = &best_tip.header().protocol_state.body.consensus_state;
301
302                    eprintln!(
303                        "[node_status] node_{node_id} {} - {} [{}]; snarks: {}",
304                        best_tip.height(),
305                        best_tip.hash(),
306                        best_tip.producer(),
307                        best_tip.staged_ledger_diff().0.completed_works.len(),
308                    );
309                    let stop = match &run_until {
310                        SimulatorRunUntil::Forever => false,
311                        SimulatorRunUntil::Epoch(epoch) => {
312                            consensus_state.epoch_count.as_u32() >= *epoch
313                        }
314                        SimulatorRunUntil::BlockchainLength(height) => {
315                            let start_height = node::core::constants::constraint_constants()
316                                .fork
317                                .as_ref()
318                                .map_or(0, |c| c.blockchain_length);
319                            best_tip.height() >= start_height + *height
320                        }
321                    };
322                    if stop {
323                        return;
324                    }
325                }
326            }
327        }
328
329        panic!("simulation timed out");
330    }
331}