mina_node_testing/cluster/runner/
mod.rs

1mod run;
2pub use run::*;
3
4use std::{path::PathBuf, time::Duration};
5
6use ledger::BaseLedger;
7use node::{
8    account::{AccountPublicKey, AccountSecretKey},
9    event_source::Event,
10    ledger::LedgerService,
11    ActionKind, State,
12};
13use rand::{rngs::StdRng, SeedableRng};
14use time::OffsetDateTime;
15
16use crate::{
17    cluster::{Cluster, ClusterNodeId, ClusterOcamlNodeId},
18    network_debugger::Debugger,
19    node::{
20        DaemonJson, DaemonJsonGenConfig, Node, NodeTestingConfig, NonDeterministicEvent, OcamlNode,
21        OcamlNodeTestingConfig, OcamlStep, RustNodeTestingConfig,
22    },
23    scenario::ScenarioStep,
24    service::{DynEffects, PendingEventId},
25};
26
27pub struct ClusterRunner<'a> {
28    cluster: &'a mut Cluster,
29    add_step: Box<dyn 'a + Send + FnMut(&ScenarioStep)>,
30    rng: StdRng,
31    latest_advance_time: Option<redux::Timestamp>,
32}
33
34impl<'a> ClusterRunner<'a> {
35    pub fn new<F>(cluster: &'a mut Cluster, add_step: F) -> Self
36    where
37        F: 'a + Send + FnMut(&ScenarioStep),
38    {
39        Self {
40            cluster,
41            add_step: Box::new(add_step),
42            rng: StdRng::seed_from_u64(0),
43            latest_advance_time: None,
44        }
45    }
46
47    pub fn node(&self, node_id: ClusterNodeId) -> Option<&Node> {
48        self.cluster.node(node_id)
49    }
50
51    fn node_mut(&mut self, node_id: ClusterNodeId) -> Option<&mut Node> {
52        self.cluster.node_mut(node_id)
53    }
54
55    pub fn ocaml_node(&self, node_id: ClusterOcamlNodeId) -> Option<&OcamlNode> {
56        self.cluster.ocaml_node(node_id)
57    }
58
59    pub fn nodes_iter(&self) -> impl Iterator<Item = (ClusterNodeId, &Node)> {
60        self.cluster.nodes_iter()
61    }
62
63    pub fn ocaml_nodes_iter(&self) -> impl Iterator<Item = (ClusterOcamlNodeId, &OcamlNode)> {
64        self.cluster.ocaml_nodes_iter()
65    }
66
67    pub fn daemon_json_gen(
68        &mut self,
69        genesis_timestamp: &str,
70        config: DaemonJsonGenConfig,
71    ) -> DaemonJson {
72        DaemonJson::gen(
73            |sec_key| self.cluster.add_account_sec_key(sec_key),
74            genesis_timestamp,
75            config,
76        )
77    }
78
79    pub fn daemon_json_gen_with_counts(
80        &mut self,
81        genesis_timestamp: &str,
82        whales_n: usize,
83        fish_n: usize,
84    ) -> DaemonJson {
85        DaemonJson::gen_with_counts(
86            |sec_key| self.cluster.add_account_sec_key(sec_key),
87            genesis_timestamp,
88            whales_n,
89            fish_n,
90        )
91    }
92
93    pub fn daemon_json_load(&mut self, path: PathBuf, genesis_timestamp: &str) -> DaemonJson {
94        DaemonJson::load(
95            |sec_key| self.cluster.add_account_sec_key(sec_key),
96            path,
97            Some(genesis_timestamp),
98        )
99    }
100
101    pub fn get_initial_time(&self) -> Option<redux::Timestamp> {
102        self.cluster.get_initial_time()
103    }
104
105    pub fn set_initial_time(&mut self, initial_time: redux::Timestamp) {
106        self.cluster.set_initial_time(initial_time)
107    }
108
109    pub fn get_account_sec_key(&self, pub_key: &AccountPublicKey) -> Option<&AccountSecretKey> {
110        self.cluster.get_account_sec_key(pub_key)
111    }
112
113    pub fn add_rust_node(&mut self, testing_config: RustNodeTestingConfig) -> ClusterNodeId {
114        let step = ScenarioStep::AddNode {
115            config: Box::new(testing_config.into()),
116        };
117        (self.add_step)(&step);
118        let ScenarioStep::AddNode { config } = step else {
119            unreachable!()
120        };
121        let NodeTestingConfig::Rust(config) = *config else {
122            unreachable!()
123        };
124
125        self.cluster.add_rust_node(config)
126    }
127
128    pub fn add_ocaml_node(&mut self, testing_config: OcamlNodeTestingConfig) -> ClusterOcamlNodeId {
129        let step = ScenarioStep::AddNode {
130            config: Box::new(testing_config.into()),
131        };
132        (self.add_step)(&step);
133        let ScenarioStep::AddNode { config } = step else {
134            unreachable!()
135        };
136        let NodeTestingConfig::Ocaml(config) = *config else {
137            unreachable!()
138        };
139
140        self.cluster.add_ocaml_node(config)
141    }
142
143    pub async fn exec_step(&mut self, step: ScenarioStep) -> anyhow::Result<bool> {
144        match &step {
145            ScenarioStep::Event { node_id, event } => {
146                let node_id = *node_id;
147                let event_id = self.cluster.wait_for_pending_event(node_id, event).await?;
148                let node = self.cluster.node(node_id).unwrap();
149                let event_ref = node.get_pending_event(event_id).unwrap();
150                if let Some(event) = NonDeterministicEvent::new(event_ref) {
151                    (self.add_step)(&ScenarioStep::NonDeterministicEvent { node_id, event });
152                } else {
153                    (self.add_step)(&step);
154                }
155                Ok(self
156                    .node_mut(node_id)
157                    .unwrap()
158                    .take_event_and_dispatch(event_id))
159            }
160            _ => {
161                (self.add_step)(&step);
162                self.cluster.exec_step(step).await
163            }
164        }
165    }
166
167    async fn exec_step_with_dyn_effects(
168        &mut self,
169        dyn_effects: DynEffects,
170        node_id: ClusterNodeId,
171        step: ScenarioStep,
172    ) -> DynEffects {
173        self.node_mut(node_id).unwrap().set_dyn_effects(dyn_effects);
174        self.exec_step(step).await.unwrap();
175        self.node_mut(node_id)
176            .unwrap()
177            .remove_dyn_effects()
178            .unwrap()
179    }
180
181    pub async fn run_until_nodes_synced(
182        &mut self,
183        mut timeout: Duration,
184        nodes: &[ClusterNodeId],
185    ) -> anyhow::Result<()> {
186        while !timeout.is_zero()
187            && !nodes.iter().all(|node| {
188                self.node(*node)
189                    .unwrap()
190                    .state()
191                    .transition_frontier
192                    .sync
193                    .is_synced()
194            })
195        {
196            let t = redux::Instant::now();
197            self.run(
198                RunCfg::default()
199                    .timeout(timeout)
200                    .action_handler(|_, _, _, action| {
201                        matches!(action.action().kind(), ActionKind::TransitionFrontierSynced)
202                    }),
203            )
204            .await?;
205            timeout = timeout.checked_sub(t.elapsed()).unwrap_or_default();
206        }
207        if timeout.is_zero() {
208            anyhow::bail!("timeout has elapsed while waiting for nodes to be synced");
209        }
210        Ok(())
211    }
212
213    pub fn pending_events(
214        &mut self,
215        poll: bool,
216    ) -> impl Iterator<
217        Item = (
218            ClusterNodeId,
219            &State,
220            impl Iterator<Item = (PendingEventId, &Event)>,
221        ),
222    > {
223        self.cluster.pending_events(poll)
224    }
225
226    pub fn node_pending_events(
227        &mut self,
228        node_id: ClusterNodeId,
229        poll: bool,
230    ) -> anyhow::Result<(&State, impl Iterator<Item = (PendingEventId, &Event)>)> {
231        self.cluster.node_pending_events(node_id, poll)
232    }
233
234    pub async fn wait_for_pending_events(&mut self) {
235        self.cluster.wait_for_pending_events().await
236    }
237
238    pub async fn wait_for_pending_events_with_timeout(&mut self, timeout: Duration) -> bool {
239        self.cluster
240            .wait_for_pending_events_with_timeout(timeout)
241            .await
242    }
243
244    pub fn debugger(&self) -> Option<&Debugger> {
245        self.cluster.debugger()
246    }
247
248    /// Block producer accounts, ordered by total stake, largest first.
249    ///
250    /// Warning: caller must ensure we are using custom daemon json if
251    /// this method is called, so that we have secret keys for
252    /// all block producers.
253    pub fn block_producer_sec_keys(&self, node_id: ClusterNodeId) -> Vec<(AccountSecretKey, u64)> {
254        let Some(block_producers) = None.or_else(|| {
255            let node = self.node(node_id)?;
256            let best_tip = node.state().transition_frontier.best_tip()?;
257            let staking_ledger_hash = best_tip.staking_epoch_ledger_hash();
258            LedgerService::ledger_manager(node.service()).producers_with_delegates(
259                staking_ledger_hash,
260                move |pub_key| {
261                    pub_key != &AccountSecretKey::genesis_producer().public_key_compressed()
262                },
263            )
264        }) else {
265            return Default::default();
266        };
267
268        let mut block_producers = block_producers
269            .into_iter()
270            .map(|(pub_key, delegates)| {
271                let sec_key = self
272                    .get_account_sec_key(&pub_key)
273                    .expect("sec key for block producer not found");
274                let stake: u64 = delegates.into_iter().map(|(_, _, balance)| balance).sum();
275                (sec_key.clone(), stake)
276            })
277            .collect::<Vec<_>>();
278
279        // order by stake
280        block_producers.sort_by(|(_, s1), (_, s2)| s2.cmp(s1));
281        block_producers
282    }
283
284    pub fn accounts_with_sec_keys<'b>(
285        &'b self,
286        node_id: ClusterNodeId,
287    ) -> Box<dyn 'b + Iterator<Item = (AccountSecretKey, Box<ledger::Account>)>> {
288        let Some(mask) = self.node(node_id).and_then(|node| {
289            let best_tip = node.state().transition_frontier.best_tip()?;
290            let ledger_hash = best_tip.merkle_root_hash();
291            let (mask, _) = LedgerService::ledger_manager(node.service()).get_mask(ledger_hash)?;
292            Some(mask)
293        }) else {
294            return Box::new(std::iter::empty());
295        };
296
297        let depth = mask.depth() as usize;
298        let num_accounts = mask.num_accounts() as u64;
299        Box::new(
300            (0..num_accounts)
301                .map(ledger::AccountIndex)
302                .filter_map(move |index| mask.get(ledger::Address::from_index(index, depth)))
303                .filter_map(|account| {
304                    let pub_key = account.public_key.clone().into();
305                    let sec_key = self.get_account_sec_key(&pub_key)?;
306                    Some((sec_key.clone(), account))
307                }),
308        )
309    }
310
311    /// Produces blocks in 5 second run intervals advancing time to the next won slot each time until predicate is true
312    /// Assumes there is a block producer running in the cluster
313    pub async fn produce_blocks_until<F>(
314        &mut self,
315        producer_node: ClusterNodeId,
316        log_tag: &str,
317        timeout: Duration,
318        step_duration: Duration,
319        keep_synced: bool,
320        predicate: F,
321    ) -> u32
322    where
323        F: Fn(&State, u32, u32) -> bool,
324    {
325        let now = redux::Instant::now();
326
327        let mut last_slot: u32 = 0;
328        let mut produced_blocks: u32 = 0;
329
330        let nodes: Vec<_> = self.nodes_iter().map(|(id, _)| id).collect();
331        while now.elapsed() <= timeout {
332            // andvance the time to slot 1
333            // TODO: this should be the next won slot, not slot 1
334            if last_slot == 0 {
335                let by_nanos = Duration::from_secs(3 * 60).as_nanos() as u64;
336                self.exec_step(ScenarioStep::AdvanceTime { by_nanos })
337                    .await
338                    .unwrap();
339            }
340
341            // run
342            let _ = self.run(RunCfg::default().timeout(step_duration)).await;
343            if keep_synced {
344                // make sure every node is synced, longer timeout in case one node disconnects and it needs to resync
345                self.run_until_nodes_synced(Duration::from_secs(5 * 60), &nodes)
346                    .await
347                    .unwrap();
348            }
349
350            let (state, _) = self.node_pending_events(producer_node, false).unwrap();
351
352            let current_state_machine_time = state.time();
353            let current_state_machine_time_u64: u64 = current_state_machine_time.into();
354            let current_state_machine_time_formated =
355                OffsetDateTime::from_unix_timestamp_nanos(current_state_machine_time_u64 as i128)
356                    .unwrap();
357
358            let best_tip = if let Some(best_tip) = state.transition_frontier.best_tip() {
359                best_tip
360            } else {
361                eprintln!("[{log_tag}] No best tip");
362                continue;
363            };
364
365            let current_global_slot = state.cur_global_slot().unwrap();
366
367            let next_won_slot = state
368                .block_producer
369                .vrf_evaluator()
370                .and_then(|vrf_state| vrf_state.next_won_slot(current_global_slot, best_tip));
371
372            let best_tip_slot = &best_tip
373                .consensus_state()
374                .curr_global_slot_since_hard_fork
375                .slot_number
376                .as_u32();
377
378            let current_time = OffsetDateTime::now_utc();
379            eprintln!("[{log_tag}][{current_time}][{current_state_machine_time_formated}] Slot(best tip / current slot): {best_tip_slot} / {current_global_slot}");
380
381            if best_tip_slot <= &0 {
382                let by_nanos = Duration::from_secs(3 * 60).as_nanos() as u64;
383                self.exec_step(ScenarioStep::AdvanceTime { by_nanos })
384                    .await
385                    .unwrap();
386                continue;
387            } else if best_tip_slot > &last_slot {
388                last_slot = *best_tip_slot;
389                produced_blocks += 1;
390            } else {
391                continue;
392            }
393
394            let (state, _) = self.node_pending_events(producer_node, false).unwrap();
395
396            if predicate(state, last_slot, produced_blocks) {
397                eprintln!("[{log_tag}] Condition met");
398                return produced_blocks;
399            }
400
401            if let Some(won_slot) = next_won_slot {
402                if let Some(diff) = won_slot.slot_time.checked_sub(current_state_machine_time) {
403                    eprintln!("[{log_tag}] advancing time by {diff:?}");
404                    let by_nanos = diff.as_nanos() as u64;
405                    self.exec_step(ScenarioStep::AdvanceTime { by_nanos })
406                        .await
407                        .unwrap();
408                } else {
409                    continue;
410                }
411            } else {
412                continue;
413            }
414        }
415
416        panic!("Global timeout reached");
417    }
418
419    /// Skip to 3 blocks before the epoch end by advancing time
420    /// Assumes there is a block producer running in the cluster
421    pub async fn advance_to_epoch_bounds(
422        &mut self,
423        producer_node: ClusterNodeId,
424        timeout: Duration,
425        step_duration: Duration,
426    ) -> u32 {
427        const SLOTS_PER_EPOCH: u32 = 7_140;
428
429        let (state, _) = self.node_pending_events(producer_node, false).unwrap();
430        let current_epoch = state.current_epoch().unwrap();
431        let latest_slot = state.cur_global_slot().unwrap();
432        let current_epoch_end = current_epoch * SLOTS_PER_EPOCH + SLOTS_PER_EPOCH - 1;
433        let to_epoch_bound = ((current_epoch_end - latest_slot) - 3) as u64;
434
435        let diff = Duration::from_secs(3 * 60 * to_epoch_bound);
436
437        eprintln!("[EPOCH BOUNDS] advancing time by {diff:?}");
438        let by_nanos = diff.as_nanos() as u64;
439        self.exec_step(ScenarioStep::AdvanceTime { by_nanos })
440            .await
441            .unwrap();
442
443        self.produce_blocks_until(
444            producer_node,
445            "EPOCH BOUNDS",
446            timeout,
447            step_duration,
448            true,
449            |state, last_slot, produced_blocks| {
450                eprintln!("\nSnarks: {}", state.snark_pool.last_index());
451                eprintln!("Produced blocks: {produced_blocks}");
452                last_slot >= current_epoch_end
453            },
454        )
455        .await
456    }
457
458    pub async fn wait_for_ocaml(&mut self, node_id: ClusterOcamlNodeId) {
459        self.exec_step(ScenarioStep::Ocaml {
460            node_id,
461            step: OcamlStep::WaitReady {
462                timeout: Duration::from_secs(6 * 60),
463            },
464        })
465        .await
466        .expect("Error waiting for ocaml node");
467    }
468}