mina_node_testing/cluster/runner/
mod.rs1mod run;
2pub use run::*;
3
4use std::{path::PathBuf, time::Duration};
5
6use ledger::BaseLedger;
7use node::{
8 account::{AccountPublicKey, AccountSecretKey},
9 event_source::Event,
10 ledger::LedgerService,
11 ActionKind, State,
12};
13use rand::{rngs::StdRng, SeedableRng};
14use time::OffsetDateTime;
15
16use crate::{
17 cluster::{Cluster, ClusterNodeId, ClusterOcamlNodeId},
18 network_debugger::Debugger,
19 node::{
20 DaemonJson, DaemonJsonGenConfig, Node, NodeTestingConfig, NonDeterministicEvent, OcamlNode,
21 OcamlNodeTestingConfig, OcamlStep, RustNodeTestingConfig,
22 },
23 scenario::ScenarioStep,
24 service::{DynEffects, PendingEventId},
25};
26
27pub struct ClusterRunner<'a> {
28 cluster: &'a mut Cluster,
29 add_step: Box<dyn 'a + Send + FnMut(&ScenarioStep)>,
30 rng: StdRng,
31 latest_advance_time: Option<redux::Timestamp>,
32}
33
34impl<'a> ClusterRunner<'a> {
35 pub fn new<F>(cluster: &'a mut Cluster, add_step: F) -> Self
36 where
37 F: 'a + Send + FnMut(&ScenarioStep),
38 {
39 Self {
40 cluster,
41 add_step: Box::new(add_step),
42 rng: StdRng::seed_from_u64(0),
43 latest_advance_time: None,
44 }
45 }
46
47 pub fn node(&self, node_id: ClusterNodeId) -> Option<&Node> {
48 self.cluster.node(node_id)
49 }
50
51 fn node_mut(&mut self, node_id: ClusterNodeId) -> Option<&mut Node> {
52 self.cluster.node_mut(node_id)
53 }
54
55 pub fn ocaml_node(&self, node_id: ClusterOcamlNodeId) -> Option<&OcamlNode> {
56 self.cluster.ocaml_node(node_id)
57 }
58
59 pub fn nodes_iter(&self) -> impl Iterator<Item = (ClusterNodeId, &Node)> {
60 self.cluster.nodes_iter()
61 }
62
63 pub fn ocaml_nodes_iter(&self) -> impl Iterator<Item = (ClusterOcamlNodeId, &OcamlNode)> {
64 self.cluster.ocaml_nodes_iter()
65 }
66
67 pub fn daemon_json_gen(
68 &mut self,
69 genesis_timestamp: &str,
70 config: DaemonJsonGenConfig,
71 ) -> DaemonJson {
72 DaemonJson::gen(
73 |sec_key| self.cluster.add_account_sec_key(sec_key),
74 genesis_timestamp,
75 config,
76 )
77 }
78
79 pub fn daemon_json_gen_with_counts(
80 &mut self,
81 genesis_timestamp: &str,
82 whales_n: usize,
83 fish_n: usize,
84 ) -> DaemonJson {
85 DaemonJson::gen_with_counts(
86 |sec_key| self.cluster.add_account_sec_key(sec_key),
87 genesis_timestamp,
88 whales_n,
89 fish_n,
90 )
91 }
92
93 pub fn daemon_json_load(&mut self, path: PathBuf, genesis_timestamp: &str) -> DaemonJson {
94 DaemonJson::load(
95 |sec_key| self.cluster.add_account_sec_key(sec_key),
96 path,
97 Some(genesis_timestamp),
98 )
99 }
100
101 pub fn get_initial_time(&self) -> Option<redux::Timestamp> {
102 self.cluster.get_initial_time()
103 }
104
105 pub fn set_initial_time(&mut self, initial_time: redux::Timestamp) {
106 self.cluster.set_initial_time(initial_time)
107 }
108
109 pub fn get_account_sec_key(&self, pub_key: &AccountPublicKey) -> Option<&AccountSecretKey> {
110 self.cluster.get_account_sec_key(pub_key)
111 }
112
113 pub fn add_rust_node(&mut self, testing_config: RustNodeTestingConfig) -> ClusterNodeId {
114 let step = ScenarioStep::AddNode {
115 config: Box::new(testing_config.into()),
116 };
117 (self.add_step)(&step);
118 let ScenarioStep::AddNode { config } = step else {
119 unreachable!()
120 };
121 let NodeTestingConfig::Rust(config) = *config else {
122 unreachable!()
123 };
124
125 self.cluster.add_rust_node(config)
126 }
127
128 pub fn add_ocaml_node(&mut self, testing_config: OcamlNodeTestingConfig) -> ClusterOcamlNodeId {
129 let step = ScenarioStep::AddNode {
130 config: Box::new(testing_config.into()),
131 };
132 (self.add_step)(&step);
133 let ScenarioStep::AddNode { config } = step else {
134 unreachable!()
135 };
136 let NodeTestingConfig::Ocaml(config) = *config else {
137 unreachable!()
138 };
139
140 self.cluster.add_ocaml_node(config)
141 }
142
143 pub async fn exec_step(&mut self, step: ScenarioStep) -> anyhow::Result<bool> {
144 match &step {
145 ScenarioStep::Event { node_id, event } => {
146 let node_id = *node_id;
147 let event_id = self.cluster.wait_for_pending_event(node_id, event).await?;
148 let node = self.cluster.node(node_id).unwrap();
149 let event_ref = node.get_pending_event(event_id).unwrap();
150 if let Some(event) = NonDeterministicEvent::new(event_ref) {
151 (self.add_step)(&ScenarioStep::NonDeterministicEvent { node_id, event });
152 } else {
153 (self.add_step)(&step);
154 }
155 Ok(self
156 .node_mut(node_id)
157 .unwrap()
158 .take_event_and_dispatch(event_id))
159 }
160 _ => {
161 (self.add_step)(&step);
162 self.cluster.exec_step(step).await
163 }
164 }
165 }
166
167 async fn exec_step_with_dyn_effects(
168 &mut self,
169 dyn_effects: DynEffects,
170 node_id: ClusterNodeId,
171 step: ScenarioStep,
172 ) -> DynEffects {
173 self.node_mut(node_id).unwrap().set_dyn_effects(dyn_effects);
174 self.exec_step(step).await.unwrap();
175 self.node_mut(node_id)
176 .unwrap()
177 .remove_dyn_effects()
178 .unwrap()
179 }
180
181 pub async fn run_until_nodes_synced(
182 &mut self,
183 mut timeout: Duration,
184 nodes: &[ClusterNodeId],
185 ) -> anyhow::Result<()> {
186 while !timeout.is_zero()
187 && !nodes.iter().all(|node| {
188 self.node(*node)
189 .unwrap()
190 .state()
191 .transition_frontier
192 .sync
193 .is_synced()
194 })
195 {
196 let t = redux::Instant::now();
197 self.run(
198 RunCfg::default()
199 .timeout(timeout)
200 .action_handler(|_, _, _, action| {
201 matches!(action.action().kind(), ActionKind::TransitionFrontierSynced)
202 }),
203 )
204 .await?;
205 timeout = timeout.checked_sub(t.elapsed()).unwrap_or_default();
206 }
207 if timeout.is_zero() {
208 anyhow::bail!("timeout has elapsed while waiting for nodes to be synced");
209 }
210 Ok(())
211 }
212
213 pub fn pending_events(
214 &mut self,
215 poll: bool,
216 ) -> impl Iterator<
217 Item = (
218 ClusterNodeId,
219 &State,
220 impl Iterator<Item = (PendingEventId, &Event)>,
221 ),
222 > {
223 self.cluster.pending_events(poll)
224 }
225
226 pub fn node_pending_events(
227 &mut self,
228 node_id: ClusterNodeId,
229 poll: bool,
230 ) -> anyhow::Result<(&State, impl Iterator<Item = (PendingEventId, &Event)>)> {
231 self.cluster.node_pending_events(node_id, poll)
232 }
233
234 pub async fn wait_for_pending_events(&mut self) {
235 self.cluster.wait_for_pending_events().await
236 }
237
238 pub async fn wait_for_pending_events_with_timeout(&mut self, timeout: Duration) -> bool {
239 self.cluster
240 .wait_for_pending_events_with_timeout(timeout)
241 .await
242 }
243
244 pub fn debugger(&self) -> Option<&Debugger> {
245 self.cluster.debugger()
246 }
247
248 pub fn block_producer_sec_keys(&self, node_id: ClusterNodeId) -> Vec<(AccountSecretKey, u64)> {
254 let Some(block_producers) = None.or_else(|| {
255 let node = self.node(node_id)?;
256 let best_tip = node.state().transition_frontier.best_tip()?;
257 let staking_ledger_hash = best_tip.staking_epoch_ledger_hash();
258 LedgerService::ledger_manager(node.service()).producers_with_delegates(
259 staking_ledger_hash,
260 move |pub_key| {
261 pub_key != &AccountSecretKey::genesis_producer().public_key_compressed()
262 },
263 )
264 }) else {
265 return Default::default();
266 };
267
268 let mut block_producers = block_producers
269 .into_iter()
270 .map(|(pub_key, delegates)| {
271 let sec_key = self
272 .get_account_sec_key(&pub_key)
273 .expect("sec key for block producer not found");
274 let stake: u64 = delegates.into_iter().map(|(_, _, balance)| balance).sum();
275 (sec_key.clone(), stake)
276 })
277 .collect::<Vec<_>>();
278
279 block_producers.sort_by(|(_, s1), (_, s2)| s2.cmp(s1));
281 block_producers
282 }
283
284 pub fn accounts_with_sec_keys<'b>(
285 &'b self,
286 node_id: ClusterNodeId,
287 ) -> Box<dyn 'b + Iterator<Item = (AccountSecretKey, Box<ledger::Account>)>> {
288 let Some(mask) = self.node(node_id).and_then(|node| {
289 let best_tip = node.state().transition_frontier.best_tip()?;
290 let ledger_hash = best_tip.merkle_root_hash();
291 let (mask, _) = LedgerService::ledger_manager(node.service()).get_mask(ledger_hash)?;
292 Some(mask)
293 }) else {
294 return Box::new(std::iter::empty());
295 };
296
297 let depth = mask.depth() as usize;
298 let num_accounts = mask.num_accounts() as u64;
299 Box::new(
300 (0..num_accounts)
301 .map(ledger::AccountIndex)
302 .filter_map(move |index| mask.get(ledger::Address::from_index(index, depth)))
303 .filter_map(|account| {
304 let pub_key = account.public_key.clone().into();
305 let sec_key = self.get_account_sec_key(&pub_key)?;
306 Some((sec_key.clone(), account))
307 }),
308 )
309 }
310
311 pub async fn produce_blocks_until<F>(
314 &mut self,
315 producer_node: ClusterNodeId,
316 log_tag: &str,
317 timeout: Duration,
318 step_duration: Duration,
319 keep_synced: bool,
320 predicate: F,
321 ) -> u32
322 where
323 F: Fn(&State, u32, u32) -> bool,
324 {
325 let now = redux::Instant::now();
326
327 let mut last_slot: u32 = 0;
328 let mut produced_blocks: u32 = 0;
329
330 let nodes: Vec<_> = self.nodes_iter().map(|(id, _)| id).collect();
331 while now.elapsed() <= timeout {
332 if last_slot == 0 {
335 let by_nanos = Duration::from_secs(3 * 60).as_nanos() as u64;
336 self.exec_step(ScenarioStep::AdvanceTime { by_nanos })
337 .await
338 .unwrap();
339 }
340
341 let _ = self.run(RunCfg::default().timeout(step_duration)).await;
343 if keep_synced {
344 self.run_until_nodes_synced(Duration::from_secs(5 * 60), &nodes)
346 .await
347 .unwrap();
348 }
349
350 let (state, _) = self.node_pending_events(producer_node, false).unwrap();
351
352 let current_state_machine_time = state.time();
353 let current_state_machine_time_u64: u64 = current_state_machine_time.into();
354 let current_state_machine_time_formated =
355 OffsetDateTime::from_unix_timestamp_nanos(current_state_machine_time_u64 as i128)
356 .unwrap();
357
358 let best_tip = if let Some(best_tip) = state.transition_frontier.best_tip() {
359 best_tip
360 } else {
361 eprintln!("[{log_tag}] No best tip");
362 continue;
363 };
364
365 let current_global_slot = state.cur_global_slot().unwrap();
366
367 let next_won_slot = state
368 .block_producer
369 .vrf_evaluator()
370 .and_then(|vrf_state| vrf_state.next_won_slot(current_global_slot, best_tip));
371
372 let best_tip_slot = &best_tip
373 .consensus_state()
374 .curr_global_slot_since_hard_fork
375 .slot_number
376 .as_u32();
377
378 let current_time = OffsetDateTime::now_utc();
379 eprintln!("[{log_tag}][{current_time}][{current_state_machine_time_formated}] Slot(best tip / current slot): {best_tip_slot} / {current_global_slot}");
380
381 if best_tip_slot <= &0 {
382 let by_nanos = Duration::from_secs(3 * 60).as_nanos() as u64;
383 self.exec_step(ScenarioStep::AdvanceTime { by_nanos })
384 .await
385 .unwrap();
386 continue;
387 } else if best_tip_slot > &last_slot {
388 last_slot = *best_tip_slot;
389 produced_blocks += 1;
390 } else {
391 continue;
392 }
393
394 let (state, _) = self.node_pending_events(producer_node, false).unwrap();
395
396 if predicate(state, last_slot, produced_blocks) {
397 eprintln!("[{log_tag}] Condition met");
398 return produced_blocks;
399 }
400
401 if let Some(won_slot) = next_won_slot {
402 if let Some(diff) = won_slot.slot_time.checked_sub(current_state_machine_time) {
403 eprintln!("[{log_tag}] advancing time by {diff:?}");
404 let by_nanos = diff.as_nanos() as u64;
405 self.exec_step(ScenarioStep::AdvanceTime { by_nanos })
406 .await
407 .unwrap();
408 } else {
409 continue;
410 }
411 } else {
412 continue;
413 }
414 }
415
416 panic!("Global timeout reached");
417 }
418
419 pub async fn advance_to_epoch_bounds(
422 &mut self,
423 producer_node: ClusterNodeId,
424 timeout: Duration,
425 step_duration: Duration,
426 ) -> u32 {
427 const SLOTS_PER_EPOCH: u32 = 7_140;
428
429 let (state, _) = self.node_pending_events(producer_node, false).unwrap();
430 let current_epoch = state.current_epoch().unwrap();
431 let latest_slot = state.cur_global_slot().unwrap();
432 let current_epoch_end = current_epoch * SLOTS_PER_EPOCH + SLOTS_PER_EPOCH - 1;
433 let to_epoch_bound = ((current_epoch_end - latest_slot) - 3) as u64;
434
435 let diff = Duration::from_secs(3 * 60 * to_epoch_bound);
436
437 eprintln!("[EPOCH BOUNDS] advancing time by {diff:?}");
438 let by_nanos = diff.as_nanos() as u64;
439 self.exec_step(ScenarioStep::AdvanceTime { by_nanos })
440 .await
441 .unwrap();
442
443 self.produce_blocks_until(
444 producer_node,
445 "EPOCH BOUNDS",
446 timeout,
447 step_duration,
448 true,
449 |state, last_slot, produced_blocks| {
450 eprintln!("\nSnarks: {}", state.snark_pool.last_index());
451 eprintln!("Produced blocks: {produced_blocks}");
452 last_slot >= current_epoch_end
453 },
454 )
455 .await
456 }
457
458 pub async fn wait_for_ocaml(&mut self, node_id: ClusterOcamlNodeId) {
459 self.exec_step(ScenarioStep::Ocaml {
460 node_id,
461 step: OcamlStep::WaitReady {
462 timeout: Duration::from_secs(6 * 60),
463 },
464 })
465 .await
466 .expect("Error waiting for ocaml node");
467 }
468}