mina_node_testing/simulator/
mod.rs1mod config;
2pub use config::*;
3
4use mina_p2p_messages::v2::{
5 CurrencyFeeStableV1, UnsignedExtendedUInt64Int64ForVersionTagsStableV1,
6};
7
8use std::{collections::BTreeSet, time::Duration};
9
10use node::{
11 ActionKind, ActionWithMeta, BlockProducerConfig, SnarkerConfig, SnarkerStrategy, State,
12};
13
14use crate::{
15 cluster::ClusterNodeId,
16 node::{Node, RustNodeBlockProducerTestingConfig, RustNodeTestingConfig},
17 scenario::ListenerNode,
18 scenarios::{ClusterRunner, RunCfg},
19 service::NodeTestingService,
20};
21
22pub struct Simulator {
23 initial_time: redux::Timestamp,
24 config: SimulatorConfig,
25 start_t: Option<redux::Instant>,
26}
27
28impl Simulator {
29 pub fn new(initial_time: redux::Timestamp, config: SimulatorConfig) -> Self {
30 Self {
31 initial_time,
32 config,
33 start_t: None,
34 }
35 }
36
37 fn initial_time(&self) -> redux::Timestamp {
38 self.initial_time
39 }
40
41 async fn seed_config_async(&self, _runner: &ClusterRunner<'_>) -> RustNodeTestingConfig {
42 RustNodeTestingConfig {
43 initial_time: self.initial_time(),
44 genesis: self.config.genesis.clone(),
45 max_peers: 1000,
46 initial_peers: Vec::new(),
47 peer_id: Default::default(),
48 block_producer: None,
49 snark_worker: None,
50 timeouts: Default::default(),
51 libp2p_port: None,
52 recorder: self.config.recorder.clone(),
53 peer_discovery: true,
54 }
55 }
56
57 async fn wait_for_all_nodes_synced(&mut self, runner: &mut ClusterRunner<'_>) {
58 eprintln!("waiting for all rust nodes to sync up");
59 let is_synced = |state: &State| {
60 state.transition_frontier.sync.is_synced()
61 && state.transition_frontier.best_tip().is_some()
62 };
63 while !runner.nodes_iter().all(|(_, node)| is_synced(node.state())) {
64 runner
65 .run(
66 RunCfg::default()
67 .timeout(Duration::from_secs(300))
68 .action_handler(move |_, _, _, action| {
69 matches!(
70 action.action().kind(),
71 ActionKind::TransitionFrontierGenesisInject
72 | ActionKind::TransitionFrontierSynced
73 )
74 }),
75 )
76 .await
77 .expect("error while waiting to sync genesis block from ocaml");
78 }
79 eprintln!("all rust nodes synced up");
80 }
81
82 async fn set_up_seed_nodes(&mut self, runner: &mut ClusterRunner<'_>) {
83 eprintln!("setting up rust seed nodes: {}", self.config.seed_nodes);
84 let seed_config = self.seed_config_async(runner).await;
85
86 for _ in 0..(self.config.seed_nodes) {
87 runner.add_rust_node(seed_config.clone());
88 }
89
90 self.wait_for_all_nodes_synced(runner).await;
91 }
92
93 fn seed_nodes_iter<'a>(
94 &self,
95 runner: &'a ClusterRunner<'_>,
96 ) -> impl 'a + Iterator<Item = (ClusterNodeId, &'a Node)> {
97 runner.nodes_iter().take(self.config.seed_nodes)
98 }
99
100 fn seed_node_dial_addrs(&self, runner: &ClusterRunner<'_>) -> Vec<ListenerNode> {
101 self.seed_nodes_iter(runner)
102 .map(|(id, _)| id.into())
103 .collect()
104 }
105
106 async fn set_up_normal_nodes(&mut self, runner: &mut ClusterRunner<'_>) {
107 if self.config.normal_nodes == 0 {
108 return;
109 }
110
111 eprintln!("setting up normal nodes: {}", self.config.normal_nodes);
112
113 let node_config = RustNodeTestingConfig {
114 max_peers: 100,
115 initial_peers: self.seed_node_dial_addrs(runner),
116 ..self.seed_config_async(runner).await
117 };
118
119 for _ in 0..(self.config.normal_nodes) {
120 runner.add_rust_node(node_config.clone());
121 }
122
123 self.wait_for_all_nodes_synced(runner).await;
124 }
125
126 async fn set_up_snark_worker_nodes(&mut self, runner: &mut ClusterRunner<'_>) {
127 if self.config.snark_workers == 0 {
128 return;
129 }
130
131 eprintln!(
132 "setting up rust snark worker nodes: {}",
133 self.config.snark_workers
134 );
135
136 let node_config = RustNodeTestingConfig {
137 max_peers: 100,
138 initial_peers: self.seed_node_dial_addrs(runner),
139 ..self.seed_config_async(runner).await
140 };
141
142 let bp_pub_keys = runner
143 .nodes_iter()
144 .filter_map(|(_, node)| {
145 let sec_key = &node.config().block_producer.as_ref()?.sec_key;
146 Some(sec_key.public_key())
147 })
148 .collect::<BTreeSet<_>>();
149
150 let snarker_accounts = runner
151 .accounts_with_sec_keys(ClusterNodeId::new_unchecked(0))
152 .filter(|(sec_key, _)| !bp_pub_keys.contains(&sec_key.public_key()))
153 .take(self.config.snark_workers)
154 .collect::<Vec<_>>();
155
156 for (sec_key, account) in snarker_accounts {
157 eprintln!(
158 "snark worker({}) balance: {} mina",
159 sec_key.public_key(),
160 account.balance.to_amount().as_u64()
161 );
162 let config = RustNodeTestingConfig {
163 snark_worker: Some(SnarkerConfig {
164 public_key: sec_key.public_key(),
165 fee: CurrencyFeeStableV1(UnsignedExtendedUInt64Int64ForVersionTagsStableV1(
166 10_000_000.into(),
167 )),
168 strategy: SnarkerStrategy::Sequential,
169 auto_commit: true,
170 }),
171 ..node_config.clone()
172 };
173 runner.add_rust_node(config);
174 }
175
176 self.wait_for_all_nodes_synced(runner).await;
177 }
178
179 async fn set_up_block_producer_nodes(&mut self, runner: &mut ClusterRunner<'_>) {
180 if self.config.block_producers == 0 {
181 return;
182 }
183
184 let block_producers = runner.block_producer_sec_keys(ClusterNodeId::new_unchecked(0));
185
186 assert!(self.config.block_producers <= block_producers.len());
187 eprintln!(
188 "setting up rust block producer nodes: {}/{}",
189 self.config.block_producers,
190 block_producers.len()
191 );
192
193 let node_config = RustNodeTestingConfig {
194 max_peers: 100,
195 initial_peers: self.seed_node_dial_addrs(runner),
196 ..self.seed_config_async(runner).await
197 };
198
199 for (sec_key, stake) in block_producers
200 .into_iter()
201 .take(self.config.block_producers)
202 {
203 eprintln!(
204 "block producer({}) stake: {stake} mina",
205 sec_key.public_key()
206 );
207 let config = RustNodeTestingConfig {
208 block_producer: Some(RustNodeBlockProducerTestingConfig {
209 config: BlockProducerConfig {
210 pub_key: sec_key.public_key().into(),
211 custom_coinbase_receiver: None,
212 proposed_protocol_version: None,
213 },
214 sec_key,
215 }),
216 ..node_config.clone()
217 };
218 runner.add_rust_node(config);
219 }
220
221 self.wait_for_all_nodes_synced(runner).await;
222 }
223
224 pub async fn setup_and_run_with_listener<AL, ALF>(
225 &mut self,
226 runner: &mut ClusterRunner<'_>,
227 listener: ALF,
228 ) where
229 ALF: FnMut() -> AL,
230 AL: 'static
231 + Send
232 + FnMut(ClusterNodeId, &State, &NodeTestingService, &ActionWithMeta) -> bool,
233 {
234 self.setup(runner).await;
235 self.run_with_listener(runner, listener).await;
236 }
237
238 pub async fn setup_and_run(&mut self, runner: &mut ClusterRunner<'_>) {
239 self.setup(runner).await;
240 self.run_with_listener(runner, || |_, _, _, _| false).await;
241 }
242
243 pub async fn setup(&mut self, runner: &mut ClusterRunner<'_>) {
244 self.set_up_seed_nodes(runner).await;
245 self.set_up_normal_nodes(runner).await;
246 self.set_up_snark_worker_nodes(runner).await;
247 self.set_up_block_producer_nodes(runner).await;
248 }
249
250 pub async fn run(&mut self, runner: &mut ClusterRunner<'_>) {
251 self.run_with_listener(runner, || |_, _, _, _| false).await;
252 }
253
254 pub async fn run_with_listener<AL, ALF>(
255 &mut self,
256 runner: &mut ClusterRunner<'_>,
257 mut listener: ALF,
258 ) where
259 ALF: FnMut() -> AL,
260 AL: 'static
261 + Send
262 + FnMut(ClusterNodeId, &State, &NodeTestingService, &ActionWithMeta) -> bool,
263 {
264 let run_until = self.config.run_until.clone();
265 let advance_time = self.config.advance_time.clone();
266 let start_t = *self.start_t.get_or_insert_with(redux::Instant::now);
267 let mut last_printed_slot = 0;
268 let virtual_initial_time = self.initial_time();
269
270 while start_t.elapsed() < self.config.run_until_timeout {
271 tokio::task::yield_now().await;
272 let cfg = RunCfg::default()
273 .advance_time(advance_time.clone())
274 .timeout(Duration::ZERO)
275 .action_handler(listener());
276 let _ = runner.run(cfg).await;
277
278 let printed_elapsed_time = {
279 let state = runner.nodes_iter().next().unwrap().1.state();
280 if let Some(cur_slot) = state
281 .cur_global_slot()
282 .filter(|cur| *cur > last_printed_slot)
283 {
284 let real_elapsed = start_t.elapsed();
285 let virtual_elapsed = state.time().checked_sub(virtual_initial_time).unwrap();
286 last_printed_slot = cur_slot;
287
288 eprintln!("[elapsed] real: {real_elapsed:?}, virtual: {virtual_elapsed:?}, global_slot: {cur_slot}");
289 true
290 } else {
291 false
292 }
293 };
294
295 if printed_elapsed_time {
296 for (node_id, node) in runner.nodes_iter() {
297 let Some(best_tip) = node.state().transition_frontier.best_tip() else {
298 continue;
299 };
300 let consensus_state = &best_tip.header().protocol_state.body.consensus_state;
301
302 eprintln!(
303 "[node_status] node_{node_id} {} - {} [{}]; snarks: {}",
304 best_tip.height(),
305 best_tip.hash(),
306 best_tip.producer(),
307 best_tip.staged_ledger_diff().0.completed_works.len(),
308 );
309 let stop = match &run_until {
310 SimulatorRunUntil::Forever => false,
311 SimulatorRunUntil::Epoch(epoch) => {
312 consensus_state.epoch_count.as_u32() >= *epoch
313 }
314 SimulatorRunUntil::BlockchainLength(height) => {
315 let start_height = node::core::constants::constraint_constants()
316 .fork
317 .as_ref()
318 .map_or(0, |c| c.blockchain_length);
319 best_tip.height() >= start_height + *height
320 }
321 };
322 if stop {
323 return;
324 }
325 }
326 }
327 }
328
329 panic!("simulation timed out");
330 }
331}