Skip to main content

mina_node_testing/cluster/
mod.rs

1//! Cluster Management for Multi-Node Testing
2//!
3//! This module provides the core infrastructure for managing clusters of
4//! Mina nodes during testing scenarios. It supports both Rust and OCaml
5//! node implementations, enabling cross-implementation testing and complex
6//! multi-node scenarios.
7//!
8//! # Key Components
9//!
10//! - [`Cluster`] - Main cluster coordinator managing node lifecycle
11//! - Node addition methods for different node types
12//! - Port allocation and resource management
13//! - Scenario execution and state tracking
14//! - Network debugger integration
15//!
16//! # Node Addition Methods
17//!
18//! - [`Cluster::add_rust_node`] - Add Rust implementation nodes
19//! - [`Cluster::add_ocaml_node`] - Add OCaml implementation nodes
20//!
21//! # Example
22//!
23//! ```rust,no_run
24//! let mut cluster = Cluster::new(ClusterConfig::default());
25//!
26//! // Add Rust node with custom configuration
27//! let rust_node = cluster.add_rust_node(RustNodeTestingConfig::default());
28//!
29//! // Add OCaml node for cross-implementation testing
30//! let ocaml_node = cluster.add_ocaml_node(OcamlNodeTestingConfig::default());
31//! ```
32
33mod config;
34pub use config::{ClusterConfig, ProofKind};
35
36mod p2p_task_spawner;
37
38mod node_id;
39use mina_core::channels::Aborter;
40pub use node_id::{ClusterNodeId, ClusterOcamlNodeId};
41
42pub mod runner;
43
44use std::{
45    collections::{BTreeMap, VecDeque},
46    io::Read,
47    path::{Path, PathBuf},
48    sync::{Arc, Mutex as StdMutex},
49    time::Duration,
50};
51
52use libp2p::futures::{stream::FuturesUnordered, StreamExt};
53
54use ledger::proofs::provers::BlockProver;
55use mina_node::{
56    account::{AccountPublicKey, AccountSecretKey},
57    core::{
58        consensus::ConsensusConstants,
59        constants::constraint_constants,
60        invariants::InvariantsState,
61        log::{info, system_time, warn},
62        requests::RpcId,
63        thread,
64    },
65    event_source::Event,
66    p2p::{
67        channels::ChannelId, identity::SecretKey as P2pSecretKey, P2pConnectionEvent, P2pEvent,
68        P2pLimits, P2pMeshsubConfig, PeerId,
69    },
70    service::{Recorder, Service},
71    snark::{get_srs, BlockVerifier, TransactionVerifier, VerifierSRS},
72    BuildEnv, Config, GlobalConfig, LedgerConfig, P2pConfig, SnarkConfig, State,
73    TransitionFrontierConfig,
74};
75use mina_node_invariants::{InvariantResult, Invariants};
76use mina_node_native::{http_server, NodeServiceBuilder};
77use serde::{de::DeserializeOwned, Serialize};
78use temp_dir::TempDir;
79
80use crate::{
81    network_debugger::Debugger,
82    node::{
83        DaemonJson, Node, NodeTestingConfig, NonDeterministicEvent, OcamlNode, OcamlNodeConfig,
84        OcamlNodeTestingConfig, OcamlStep, RustNodeTestingConfig, TestPeerId,
85    },
86    scenario::{ListenerNode, Scenario, ScenarioId, ScenarioStep},
87    service::{NodeTestingService, PendingEventId},
88};
89
#[allow(dead_code)]
/// Resolve `path` relative to the user's Mina cache directory
/// (`$HOME/.cache/mina`). Returns `None` when `HOME` is not set.
fn mina_path<P: AsRef<Path>>(path: P) -> Option<PathBuf> {
    let home = std::env::var_os("HOME")?;
    Some(PathBuf::from(home).join(".cache/mina").join(path))
}
94
95#[allow(dead_code)]
96fn read_index<T: DeserializeOwned>(name: &str) -> Option<T> {
97    mina_path(name)
98        .and_then(|path| {
99            if !path.exists() {
100                return None;
101            }
102            match std::fs::File::open(path) {
103                Ok(v) => Some(v),
104                Err(e) => {
105                    warn!(system_time(); "cannot find verifier index for {name}: {e}");
106                    None
107                }
108            }
109        })
110        .and_then(|mut file| {
111            let mut buf = Vec::new();
112            file.read_to_end(&mut buf).ok().and(Some(buf))
113        })
114        .and_then(|bytes| match postcard::from_bytes(&bytes) {
115            Ok(v) => Some(v),
116            Err(e) => {
117                warn!(system_time(); "cannot read verifier index for {name}: {e}");
118                None
119            }
120        })
121}
122
123#[allow(dead_code)]
124fn write_index<T: Serialize>(name: &str, index: &T) -> Option<()> {
125    mina_path(name)
126        .and_then(|path| {
127            let Some(parent) = path.parent() else {
128                warn!(system_time(); "cannot get parent for {path:?}");
129                return None;
130            };
131            if let Err(e) = std::fs::create_dir_all(parent) {
132                warn!(system_time(); "cannot create parent dir for {parent:?}: {e}");
133                return None;
134            }
135            match std::fs::File::create(&path) {
136                Ok(v) => Some(v),
137                Err(e) => {
138                    warn!(system_time(); "cannot create file {path:?}: {e}");
139                    None
140                }
141            }
142        })
143        .and_then(|file| match postcard::to_io(index, file) {
144            Ok(_) => Some(()),
145            Err(e) => {
146                warn!(system_time(); "cannot write verifier index for {name}: {e}");
147                None
148            }
149        })
150}
151
lazy_static::lazy_static! {
    /// Verifier SRS shared by every node in the cluster; computed once via
    /// `get_srs()` on first access and cloned (as an `Arc`) per node.
    static ref VERIFIER_SRS: Arc<VerifierSRS> = get_srs();
}
155
/// Manages a cluster of Mina nodes for testing scenarios.
///
/// The `Cluster` struct coordinates multiple node instances, handling
/// resource allocation, configuration, and lifecycle management. It supports
/// both Rust and OCaml node implementations for comprehensive testing.
///
/// # Default Behaviors
///
/// - **Port allocation**: Automatically assigns available ports from the
///   configured range, testing availability before assignment
/// - **Keypair management**: Uses deterministic keypairs for Rust nodes and
///   rotates through predefined keypairs for OCaml nodes
/// - **Resource isolation**: Each node gets isolated temporary directories
/// - **Verifier indices**: Shared verifier SRS and indices across all nodes
/// - **Network debugging**: Optional debugger integration for CI environments
///
/// # Node Addition
///
/// The cluster provides specialized methods for adding different node types:
/// - Rust nodes via [`add_rust_node`](Self::add_rust_node)
/// - OCaml nodes via [`add_ocaml_node`](Self::add_ocaml_node)
pub struct Cluster {
    /// Cluster-wide configuration settings
    pub config: ClusterConfig,
    /// Current scenario execution state
    scenario: ClusterScenarioRun,
    /// Iterator over available ports for node allocation; the range is
    /// pre-filtered in `Cluster::new` to ports that could be bound at
    /// construction time.
    available_ports: Box<dyn Iterator<Item = u16> + Send>,
    /// Registry of account secret keys for deterministic testing
    account_sec_keys: BTreeMap<AccountPublicKey, AccountSecretKey>,
    /// Collection of active Rust nodes
    nodes: Vec<Node>,
    /// Collection of active OCaml nodes (`None` marks a removed node whose
    /// slot is kept so indices remain stable)
    ocaml_nodes: Vec<Option<OcamlNode>>,
    /// Genesis timestamp for deterministic time progression
    initial_time: Option<redux::Timestamp>,

    /// Counter for generating unique RPC request IDs
    rpc_counter: usize,
    /// Index for rotating OCaml LibP2P keypairs; incremented per OCaml node
    ocaml_libp2p_keypair_i: usize,

    /// Shared verifier SRS for proof verification
    verifier_srs: Arc<VerifierSRS>,
    /// Block verifier index for consensus validation
    block_verifier_index: BlockVerifier,
    /// Transaction verifier index for transaction validation
    work_verifier_index: TransactionVerifier,

    /// Optional network traffic debugger
    debugger: Option<Debugger>,
    /// Shared state for invariant checking across nodes
    invariants_state: Arc<StdMutex<InvariantsState>>,
}
210
/// Tracks the execution state of scenario chains within a cluster.
///
/// Manages the progression through scenario steps and maintains history
/// of completed scenarios for debugging and analysis. Derives `Serialize`
/// so the run state can be exported (presumably for reporting/debug dumps —
/// confirm against consumers).
#[derive(Serialize)]
pub struct ClusterScenarioRun {
    /// Queue of scenarios to be executed (supports scenario inheritance)
    chain: VecDeque<Scenario>,
    /// History of completed scenarios
    finished: Vec<Scenario>,
    /// Current step index within the active scenario
    cur_step: usize,
}
224
225impl Cluster {
226    pub fn new(config: ClusterConfig) -> Self {
227        let available_ports = config
228            .port_range()
229            .filter(|port| std::net::TcpListener::bind(("0.0.0.0", *port)).is_ok());
230        let debugger = if config.is_use_debugger() {
231            Some(Debugger::drone_ci())
232        } else {
233            None
234        };
235        Self {
236            config,
237            scenario: ClusterScenarioRun {
238                chain: Default::default(),
239                finished: Default::default(),
240                cur_step: 0,
241            },
242            available_ports: Box::new(available_ports),
243            account_sec_keys: Default::default(),
244            nodes: Vec::new(),
245            ocaml_nodes: Vec::new(),
246            initial_time: None,
247
248            rpc_counter: 0,
249            ocaml_libp2p_keypair_i: 0,
250
251            verifier_srs: VERIFIER_SRS.clone(),
252            block_verifier_index: BlockVerifier::make(),
253            work_verifier_index: TransactionVerifier::make(),
254
255            debugger,
256            invariants_state: Arc::new(StdMutex::new(Default::default())),
257        }
258    }
259
260    pub fn available_port(&mut self) -> Option<u16> {
261        self.available_ports.next()
262    }
263
264    pub fn add_account_sec_key(&mut self, sec_key: AccountSecretKey) {
265        self.account_sec_keys.insert(sec_key.public_key(), sec_key);
266    }
267
268    pub fn get_account_sec_key(&self, pub_key: &AccountPublicKey) -> Option<&AccountSecretKey> {
269        self.account_sec_keys.get(pub_key).or_else(|| {
270            AccountSecretKey::deterministic_iter().find(|sec_key| &sec_key.public_key() == pub_key)
271        })
272    }
273
274    pub fn set_initial_time(&mut self, initial_time: redux::Timestamp) {
275        self.initial_time = Some(initial_time)
276    }
277
278    pub fn get_initial_time(&self) -> Option<redux::Timestamp> {
279        self.initial_time
280    }
281
    /// Add a new Rust implementation node to the cluster.
    ///
    /// Creates and configures a Rust Mina node with the specified testing
    /// configuration. This method handles all aspects of node initialization
    /// including port allocation, key generation, service setup, and state
    /// initialization.
    ///
    /// # Default Behaviors
    ///
    /// - **Port allocation**: HTTP and LibP2P ports automatically assigned
    ///   from available port range
    /// - **Peer identity**: Deterministic LibP2P keypair based on node index
    /// - **Work directory**: Isolated temporary directory per node
    /// - **Invariants**: Automatic invariant checking enabled
    /// - **HTTP server**: Spawned on separate thread for API access
    /// - **Proof verification**: Shared verifier indices and SRS
    ///
    /// # Configuration Options
    ///
    /// - `peer_id`: Deterministic or custom LibP2P identity
    /// - `libp2p_port`: Custom P2P port (auto-assigned if None)
    /// - `initial_peers`: Peer connection targets (supports node references)
    /// - `block_producer`: Optional block production configuration
    /// - `genesis`: Genesis ledger and protocol constants
    /// - `snark_worker`: SNARK work generation settings
    ///
    /// # Returns
    ///
    /// Returns a [`ClusterNodeId`] that can be used to reference this node
    /// in scenarios and for inter-node connections.
    ///
    /// # Panics
    ///
    /// Panics if:
    /// - No available ports in the configured range
    /// - Node service initialization fails
    /// - Invalid genesis configuration
    pub fn add_rust_node(&mut self, testing_config: RustNodeTestingConfig) -> ClusterNodeId {
        // Fixed all-zero RNG seed keeps node construction reproducible across runs.
        let rng_seed = [0; 32];
        let node_config = testing_config.clone();
        let node_id = ClusterNodeId::new_unchecked(self.nodes.len());

        info!(
            system_time();
            "Adding Rust node {} with config: max_peers={}, snark_worker={:?}, \
             block_producer={}",
            node_id.index(),
            testing_config.max_peers,
            testing_config.snark_worker,
            testing_config.block_producer.is_some()
        );

        // Isolated working directory per node; the aborter pair coordinates shutdown
        // between this node's service and the http-server thread spawned below.
        let work_dir = TempDir::new().unwrap();
        let shutdown_initiator = Aborter::default();
        let shutdown_listener = shutdown_initiator.aborted();
        let p2p_sec_key = match testing_config.peer_id {
            TestPeerId::Derived => {
                info!(system_time(); "Using deterministic peer ID for node {}", node_id.index());
                P2pSecretKey::deterministic(node_id.index())
            }
            TestPeerId::Bytes(bytes) => {
                info!(system_time(); "Using custom peer ID for node {}", node_id.index());
                P2pSecretKey::from_bytes(bytes)
            }
        };

        // Allocate the HTTP port, and the LibP2P port unless the config pins one.
        let http_port = self
            .available_ports
            .next()
            .ok_or_else(|| {
                anyhow::anyhow!(
                    "couldn't find available port in port range: {:?}",
                    self.config.port_range()
                )
            })
            .unwrap();
        let libp2p_port = testing_config.libp2p_port.unwrap_or_else(|| {
            self.available_ports
                .next()
                .ok_or_else(|| {
                    anyhow::anyhow!(
                        "couldn't find available port in port range: {:?}",
                        self.config.port_range()
                    )
                })
                .unwrap()
        });

        info!(
            system_time();
            "Assigned ports for Rust node {}: HTTP={}, LibP2P={}",
            node_id.index(),
            http_port,
            libp2p_port
        );

        let (block_producer_sec_key, block_producer_config) = testing_config
            .block_producer
            .map(|v| {
                info!(
                    system_time();
                    "Configuring block producer for Rust node {} with public key: {}",
                    node_id.index(),
                    v.sec_key.public_key()
                );
                (v.sec_key, v.config)
            })
            .unzip();

        // Resolve peer references (Rust/OCaml node ids or raw addresses) into
        // concrete dial addresses; referenced nodes must already exist.
        let initial_peers: Vec<_> = testing_config
            .initial_peers
            .into_iter()
            .map(|node| {
                let addr = match &node {
                    ListenerNode::Rust(id) => {
                        info!(system_time(); "Adding Rust peer {} as initial peer", id.index());
                        self.node(*id).unwrap().dial_addr()
                    }
                    ListenerNode::Ocaml(id) => {
                        info!(system_time(); "Adding OCaml peer {} as initial peer", id.index());
                        self.ocaml_node(*id).unwrap().dial_addr()
                    }
                    ListenerNode::Custom(addr) => {
                        info!(system_time(); "Adding custom peer: {:?}", addr);
                        addr.clone()
                    }
                };
                addr
            })
            .collect();

        if !initial_peers.is_empty() {
            info!(
                system_time();
                "Rust node {} configured with {} initial peers",
                node_id.index(),
                initial_peers.len()
            );
        } else {
            info!(system_time(); "Rust node {} configured as seed node (no initial peers)", node_id.index());
        }

        let protocol_constants = testing_config
            .genesis
            .protocol_constants()
            .expect("wrong protocol constants");
        let consensus_consts =
            ConsensusConstants::create(constraint_constants(), &protocol_constants);

        // Assemble the full node configuration; verifier indices and SRS are
        // shared (cloned Arcs/handles) across all nodes of the cluster.
        let config = Config {
            ledger: LedgerConfig {},
            snark: SnarkConfig {
                // TODO(binier): use cache
                block_verifier_index: self.block_verifier_index.clone(),
                block_verifier_srs: self.verifier_srs.clone(),
                work_verifier_index: self.work_verifier_index.clone(),
                work_verifier_srs: self.verifier_srs.clone(),
            },
            global: GlobalConfig {
                build: BuildEnv::get().into(),
                snarker: testing_config.snark_worker,
                consensus_constants: consensus_consts.clone(),
                client_port: Some(http_port),
                testing_run: true,
                chain_id_override: None,
                skip_proof_verification: false,
            },
            p2p: P2pConfig {
                libp2p_port: Some(libp2p_port),
                listen_port: Some(http_port),
                identity_pub_key: p2p_sec_key.public_key(),
                initial_peers,
                external_addrs: vec![],
                enabled_channels: ChannelId::iter_all().collect(),
                peer_discovery: testing_config.peer_discovery,
                timeouts: testing_config.timeouts,
                limits: P2pLimits::default().with_max_peers(Some(testing_config.max_peers)),
                meshsub: P2pMeshsubConfig {
                    initial_time: testing_config
                        .initial_time
                        .checked_sub(redux::Timestamp::ZERO)
                        .unwrap_or_default(),
                    ..Default::default()
                },
            },
            transition_frontier: TransitionFrontierConfig::new(testing_config.genesis),
            block_producer: block_producer_config,
            archive: None,
            tx_pool: ledger::transaction_pool::Config {
                trust_system: (),
                pool_max_size: 3000,
                slot_tx_end: None,
            },
        };

        let mut service_builder = NodeServiceBuilder::new(rng_seed);
        service_builder
            .ledger_init()
            .p2p_init_with_custom_task_spawner(
                p2p_sec_key.clone(),
                p2p_task_spawner::P2pTaskSpawner::new(shutdown_listener.clone()),
            )
            .gather_stats()
            .record(match testing_config.recorder {
                crate::node::Recorder::None => Recorder::None,
                crate::node::Recorder::StateWithInputActions => {
                    Recorder::only_input_actions(work_dir.path())
                }
            });

        if let Some(keypair) = block_producer_sec_key {
            info!(system_time(); "Initializing block producer for Rust node {}", node_id.index());
            let provers = BlockProver::make(None, None);
            service_builder.block_producer_init(keypair, Some(provers));
        }

        let real_service = service_builder
            .build()
            .map_err(|err| anyhow::anyhow!("node service build failed! error: {err}"))
            .unwrap();

        // Spawn the http-server on a dedicated thread with its own
        // single-threaded tokio runtime; it runs until shutdown is signaled.
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        let shutdown = shutdown_listener.clone();
        let rpc_sender = real_service.rpc_sender();
        thread::Builder::new()
            .name("mina_http_server".to_owned())
            .spawn(move || {
                let local_set = tokio::task::LocalSet::new();
                let task = async {
                    tokio::select! {
                        _ = shutdown.wait() => {}
                        _ = http_server::run(http_port, rpc_sender) => {}
                    }
                };
                local_set.block_on(&runtime, task);
            })
            .unwrap();

        let invariants_state = self.invariants_state.clone();
        let mut service =
            NodeTestingService::new(real_service, node_id, invariants_state, shutdown_initiator);

        service.set_proof_kind(self.config.proof_kind());
        if self.config.all_rust_to_rust_use_webrtc() {
            service.set_rust_to_rust_use_webrtc();
        }
        if self.config.is_replay() {
            service.set_replay();
        }

        let state = mina_node::State::new(config, &consensus_consts, testing_config.initial_time);
        // Wrapper around the node's normal effects that additionally runs the
        // test invariant checks after every action, panicking on violation.
        fn effects(
            store: &mut mina_node::Store<NodeTestingService>,
            action: mina_node::ActionWithMeta,
        ) {
            // if action.action().kind().to_string().starts_with("BlockProducer") {
            //     dbg!(action.action());
            // }

            store.service.dyn_effects(store.state.get(), &action);
            let peer_id = store.state().p2p.my_id();
            mina_core::log::trace!(action.time(); "{peer_id}: {:?}", action.action().kind());

            for (invariant, res) in Invariants::check_all(store, &action) {
                // TODO(binier): record instead of panicing.
                match res {
                    InvariantResult::Ignored(reason) => {
                        unreachable!("No invariant should be ignored! ignore reason: {reason:?}");
                    }
                    InvariantResult::Violation(violation) => {
                        panic!(
                            "Invariant({}) violated! violation: {violation}",
                            invariant.to_str()
                        );
                    }
                    InvariantResult::Updated => {}
                    InvariantResult::Ok => {}
                }
            }

            mina_node::effects(store, action)
        }
        let mut store = mina_node::Store::new(
            mina_node::reducer,
            effects,
            service,
            testing_config.initial_time.into(),
            state,
        );
        // record initial state.
        {
            store
                .service
                .recorder()
                .initial_state(rng_seed, p2p_sec_key, store.state.get());
        }

        let node = Node::new(work_dir, node_config, store);

        info!(
            system_time();
            "Successfully created Rust node {} at ports HTTP={}, LibP2P={}",
            node_id.index(),
            http_port,
            libp2p_port
        );

        self.nodes.push(node);
        node_id
    }
596
597    /// Add a new OCaml implementation node to the cluster.
598    ///
599    /// Creates and spawns an OCaml Mina daemon process with the specified
600    /// configuration. This method handles process spawning, port allocation,
601    /// directory setup, and daemon configuration.
602    ///
603    /// # Default Behaviors
604    ///
605    /// - **Executable selection**: Automatically detects local binary or
606    ///   falls back to default Docker image
607    /// - **Port allocation**: LibP2P, GraphQL, and client ports automatically
608    ///   assigned from available range
609    /// - **Keypair rotation**: Uses predefined LibP2P keypairs, rotating
610    ///   through the set for each new node
611    /// - **Process management**: Spawns daemon with proper environment
612    ///   variables and argument configuration
613    /// - **Logging**: Stdout/stderr forwarded with port-based prefixes
614    /// - **Docker support**: Automatic container management when using Docker
615    ///
616    /// # Configuration Options
617    ///
618    /// - `initial_peers`: List of peer connection targets
619    /// - `daemon_json`: Genesis configuration (file path or in-memory JSON)
620    /// - `block_producer`: Optional block production key
621    ///
622    /// # Docker vs Local Execution
623    ///
624    /// The method automatically determines execution mode:
625    /// 1. Attempts to use locally installed `mina` binary
626    /// 2. Falls back to Docker with default image if binary not found
627    /// 3. Custom Docker images supported via configuration
628    ///
629    /// # Returns
630    ///
631    /// Returns a [`ClusterOcamlNodeId`] for referencing this OCaml node
632    /// in scenarios and peer connections.
633    ///
634    /// # Panics
635    ///
636    /// Panics if:
637    /// - No available ports in the configured range
638    /// - Temporary directory creation fails
639    /// - OCaml daemon process spawn fails
640    pub fn add_ocaml_node(&mut self, testing_config: OcamlNodeTestingConfig) -> ClusterOcamlNodeId {
641        let node_i = self.ocaml_nodes.len();
642
643        info!(
644            system_time();
645            "Adding OCaml node {} with {} initial peers, block_producer={}",
646            node_i,
647            testing_config.initial_peers.len(),
648            testing_config.block_producer.is_some()
649        );
650
651        let executable = self.config.ocaml_node_executable();
652        let mut next_port = || {
653            self.available_ports.next().ok_or_else(|| {
654                anyhow::anyhow!(
655                    "couldn't find available port in port range: {:?}",
656                    self.config.port_range()
657                )
658            })
659        };
660
661        let temp_dir = temp_dir::TempDir::new().expect("failed to create tempdir");
662        let libp2p_port = next_port().unwrap();
663        let graphql_port = next_port().unwrap();
664        let client_port = next_port().unwrap();
665
666        info!(
667            system_time();
668            "Assigned ports for OCaml node {}: LibP2P={}, GraphQL={}, Client={}",
669            node_i,
670            libp2p_port,
671            graphql_port,
672            client_port
673        );
674
675        let node = OcamlNode::start(OcamlNodeConfig {
676            executable,
677            dir: temp_dir,
678            libp2p_keypair_i: self.ocaml_libp2p_keypair_i,
679            libp2p_port,
680            graphql_port,
681            client_port,
682            initial_peers: testing_config.initial_peers,
683            daemon_json: testing_config.daemon_json,
684            block_producer: testing_config.block_producer,
685        })
686        .expect("failed to start ocaml node");
687
688        info!(
689            system_time();
690            "Successfully started OCaml node {} with keypair index {}",
691            node_i,
692            self.ocaml_libp2p_keypair_i
693        );
694
695        self.ocaml_libp2p_keypair_i += 1;
696
697        self.ocaml_nodes.push(Some(node));
698        ClusterOcamlNodeId::new_unchecked(node_i)
699    }
700
701    pub async fn start(&mut self, scenario: Scenario) -> Result<(), anyhow::Error> {
702        let mut parent_id = scenario.info.parent_id.clone();
703        self.scenario.chain.push_back(scenario);
704
705        while let Some(ref id) = parent_id {
706            let scenario = Scenario::load(id).await?;
707            parent_id.clone_from(&scenario.info.parent_id);
708            self.scenario.chain.push_back(scenario);
709        }
710
711        let scenario = self.scenario.cur_scenario();
712
713        for config in scenario.info.nodes.clone() {
714            match config {
715                NodeTestingConfig::Rust(config) => {
716                    self.add_rust_node(config.clone());
717                }
718                NodeTestingConfig::Ocaml(config) => {
719                    self.add_ocaml_node(config.clone());
720                }
721            }
722        }
723
724        Ok(())
725    }
726
727    pub async fn reload_scenarios(&mut self) -> Result<(), anyhow::Error> {
728        for scenario in &mut self.scenario.chain {
729            scenario.reload().await?;
730        }
731        Ok(())
732    }
733
734    pub fn next_scenario_and_step(&self) -> Option<(&ScenarioId, usize)> {
735        self.scenario
736            .peek_i()
737            .map(|(scenario_i, step_i)| (&self.scenario.chain[scenario_i].info.id, step_i))
738    }
739
740    pub fn target_scenario(&self) -> Option<&ScenarioId> {
741        self.scenario.target_scenario().map(|v| &v.info.id)
742    }
743
744    pub fn nodes_iter(&self) -> impl Iterator<Item = (ClusterNodeId, &Node)> {
745        self.nodes
746            .iter()
747            .enumerate()
748            .map(|(i, node)| (ClusterNodeId::new_unchecked(i), node))
749    }
750
751    pub fn ocaml_nodes_iter(&self) -> impl Iterator<Item = (ClusterOcamlNodeId, &OcamlNode)> {
752        self.ocaml_nodes
753            .iter()
754            .enumerate()
755            .filter_map(|(i, node)| node.as_ref().map(|node| (i, node)))
756            .map(|(i, node)| (ClusterOcamlNodeId::new_unchecked(i), node))
757    }
758
759    pub fn node(&self, node_id: ClusterNodeId) -> Option<&Node> {
760        self.nodes.get(node_id.index())
761    }
762
763    pub fn node_by_peer_id(&self, peer_id: PeerId) -> Option<&Node> {
764        self.nodes_iter()
765            .find(|(_, node)| node.peer_id() == peer_id)
766            .map(|(_, node)| node)
767    }
768
769    pub fn node_mut(&mut self, node_id: ClusterNodeId) -> Option<&mut Node> {
770        self.nodes.get_mut(node_id.index())
771    }
772
773    pub fn ocaml_node(&self, node_id: ClusterOcamlNodeId) -> Option<&OcamlNode> {
774        self.ocaml_nodes
775            .get(node_id.index())
776            .map(|opt| opt.as_ref().expect("tried to access removed ocaml node"))
777    }
778
779    pub fn ocaml_node_by_peer_id(&self, peer_id: PeerId) -> Option<&OcamlNode> {
780        self.ocaml_nodes_iter()
781            .find(|(_, node)| node.peer_id() == peer_id)
782            .map(|(_, node)| node)
783    }
784
785    pub fn pending_events(
786        &mut self,
787        poll: bool,
788    ) -> impl Iterator<
789        Item = (
790            ClusterNodeId,
791            &State,
792            impl Iterator<Item = (PendingEventId, &Event)>,
793        ),
794    > {
795        self.nodes.iter_mut().enumerate().map(move |(i, node)| {
796            let node_id = ClusterNodeId::new_unchecked(i);
797            let (state, pending_events) = node.pending_events_with_state(poll);
798            (node_id, state, pending_events)
799        })
800    }
801
802    pub fn node_pending_events(
803        &mut self,
804        node_id: ClusterNodeId,
805        poll: bool,
806    ) -> Result<(&State, impl Iterator<Item = (PendingEventId, &Event)>), anyhow::Error> {
807        let node = self
808            .nodes
809            .get_mut(node_id.index())
810            .ok_or_else(|| anyhow::anyhow!("node {node_id:?} not found"))?;
811        Ok(node.pending_events_with_state(poll))
812    }
813
    /// Waits until at least one node in the cluster produces a pending event.
    ///
    /// Races one future per node; returns as soon as the first node reports
    /// an event (`is_some`). If every node's future resolves without an
    /// event, returns once all of them have completed.
    pub async fn wait_for_pending_events(&mut self) {
        let mut nodes = &mut self.nodes[..];
        let mut futures = FuturesUnordered::new();

        // Repeatedly split the slice so each iteration extracts a disjoint
        // `&mut Node`, letting every pushed future hold its own mutable
        // borrow simultaneously (a plain index loop couldn't do this).
        while let Some((node, nodes_rest)) = nodes.split_first_mut() {
            nodes = nodes_rest;
            futures.push(async { node.wait_for_next_pending_event().await.is_some() });
        }

        // Poll all node futures concurrently; stop at the first that has an
        // event. Remaining futures are dropped (cancelled) on break.
        while let Some(has_event) = futures.next().await {
            if has_event {
                break;
            }
        }
    }
829
830    pub async fn wait_for_pending_events_with_timeout(&mut self, timeout: Duration) -> bool {
831        let timeout = tokio::time::sleep(timeout);
832
833        tokio::select! {
834            _ = self.wait_for_pending_events() => true,
835            _ = timeout => false,
836        }
837    }
838
839    pub async fn wait_for_pending_event(
840        &mut self,
841        node_id: ClusterNodeId,
842        event_pattern: &str,
843    ) -> anyhow::Result<PendingEventId> {
844        let node = self
845            .nodes
846            .get_mut(node_id.index())
847            .ok_or_else(|| anyhow::anyhow!("node {node_id:?} not found"))?;
848        let timeout = tokio::time::sleep(Duration::from_secs(300));
849        tokio::select! {
850            opt = node.wait_for_event(event_pattern) => opt.ok_or_else(|| anyhow::anyhow!("wait_for_event: None")),
851            _ = timeout => {
852                let pending_events = node.pending_events(false).map(|(_, event)| event.to_string()).collect::<Vec<_>>();
853                 Err(anyhow::anyhow!("waiting for event timed out! node {node_id:?}, event: \"{event_pattern}\"\n{pending_events:?}"))
854            }
855        }
856    }
857
858    pub async fn wait_for_event_and_dispatch(
859        &mut self,
860        node_id: ClusterNodeId,
861        event_pattern: &str,
862    ) -> anyhow::Result<bool> {
863        let event_id = self.wait_for_pending_event(node_id, event_pattern).await?;
864        let node = self.nodes.get_mut(node_id.index()).unwrap();
865        Ok(node.take_event_and_dispatch(event_id))
866    }
867
868    pub async fn add_steps_and_save(&mut self, steps: impl IntoIterator<Item = ScenarioStep>) {
869        let scenario = self.scenario.chain.back_mut().unwrap();
870        steps
871            .into_iter()
872            .for_each(|step| scenario.add_step(step).unwrap());
873        scenario.save().await.unwrap();
874    }
875
876    pub async fn exec_to_end(&mut self) -> Result<(), anyhow::Error> {
877        let mut i = 0;
878        let total = self.scenario.cur_scenario().steps.len();
879        loop {
880            info!(system_time(); "Executing step {}/{}", i + 1, total);
881            if !self.exec_next().await? {
882                break Ok(());
883            }
884            i += 1;
885        }
886    }
887
888    pub async fn exec_until(
889        &mut self,
890        target_scenario: ScenarioId,
891        step_i: Option<usize>,
892    ) -> Result<(), anyhow::Error> {
893        if self
894            .scenario
895            .finished
896            .iter()
897            .any(|v| v.info.id == target_scenario)
898        {
899            return Err(anyhow::anyhow!(
900                "cluster already finished '{target_scenario}' scenario"
901            ));
902        }
903
904        while self
905            .scenario
906            .peek()
907            .is_some_and(|(scenario, _)| scenario.info.id != target_scenario)
908        {
909            if !self.exec_next().await? {
910                break;
911            }
912        }
913
914        while self
915            .scenario
916            .peek()
917            .is_some_and(|(scenario, _)| scenario.info.id == target_scenario)
918        {
919            if let Some(step_i) = step_i {
920                if self.scenario.peek_i().unwrap().1 >= step_i {
921                    break;
922                }
923            }
924            if !self.exec_next().await? {
925                break;
926            }
927        }
928
929        Ok(())
930    }
931
932    pub async fn exec_next(&mut self) -> Result<bool, anyhow::Error> {
933        let (_scenario, step) = match self.scenario.peek() {
934            Some(v) => v,
935            None => return Ok(false),
936        };
937        let dispatched = self.exec_step(step.clone()).await?;
938
939        if dispatched {
940            self.scenario.advance();
941        }
942
943        Ok(dispatched)
944    }
945
946    pub async fn exec_step(&mut self, step: ScenarioStep) -> anyhow::Result<bool> {
947        Ok(match step {
948            ScenarioStep::Event { node_id, event } => {
949                return self.wait_for_event_and_dispatch(node_id, &event).await;
950            }
951            ScenarioStep::ManualEvent { node_id, event } => self
952                .nodes
953                .get_mut(node_id.index())
954                .ok_or_else(|| anyhow::anyhow!("node {node_id:?} not found"))?
955                .dispatch_event(*event),
956            ScenarioStep::NonDeterministicEvent { node_id, event } => {
957                let event = match *event {
958                    NonDeterministicEvent::P2pConnectionClosed(peer_id) => {
959                        let node = self
960                            .nodes
961                            .get_mut(node_id.index())
962                            .ok_or_else(|| anyhow::anyhow!("node {node_id:?} not found"))?;
963                        node.p2p_disconnect(peer_id);
964                        let event =
965                            Event::P2p(P2pEvent::Connection(P2pConnectionEvent::Closed(peer_id)));
966                        return self
967                            .wait_for_event_and_dispatch(node_id, &event.to_string())
968                            .await;
969                    }
970                    NonDeterministicEvent::P2pConnectionFinalized(peer_id, res) => {
971                        let node = self
972                            .nodes
973                            .get(node_id.index())
974                            .ok_or_else(|| anyhow::anyhow!("node {node_id:?} not found"))?;
975                        let res_is_ok = res.is_ok();
976                        let event = Event::P2p(P2pEvent::Connection(
977                            P2pConnectionEvent::Finalized(peer_id, res),
978                        ));
979
980                        if res_is_ok {
981                            let is_peer_connected =
982                                node.state().p2p.get_ready_peer(&peer_id).is_some();
983                            if is_peer_connected {
984                                // we are already connected, so skip the extra event.
985                                return Ok(true);
986                            }
987                            eprintln!("non_deterministic_wait_for_event_and_dispatch({node_id:?}): {event}");
988                            return self
989                                .wait_for_event_and_dispatch(node_id, &event.to_string())
990                                .await;
991                        } else {
992                            event
993                        }
994                    }
995                    NonDeterministicEvent::RpcReadonly(id, req) => Event::Rpc(id, req),
996                };
997                eprintln!("non_deterministic_event_dispatch({node_id:?}): {event}");
998                self.nodes
999                    .get_mut(node_id.index())
1000                    .ok_or_else(|| anyhow::anyhow!("node {node_id:?} not found"))?
1001                    .dispatch_event(event)
1002            }
1003            ScenarioStep::AddNode { config } => match *config {
1004                NodeTestingConfig::Rust(config) => {
1005                    self.add_rust_node(config);
1006                    // TODO(binier): wait for node ports to be opened instead.
1007                    tokio::time::sleep(Duration::from_secs(2)).await;
1008                    true
1009                }
1010                NodeTestingConfig::Ocaml(config) => {
1011                    // before starting ocaml node, read and save secret
1012                    // keys from daemon.json.
1013                    let mut json_owned = None;
1014                    let json = match &config.daemon_json {
1015                        DaemonJson::Custom(path) => {
1016                            let bytes = tokio::fs::read(path).await.map_err(|err| {
1017                                anyhow::anyhow!(
1018                                    "error reading daemon.json from path({path}): {err}"
1019                                )
1020                            })?;
1021                            let json = serde_json::from_slice(&bytes).map_err(|err| {
1022                                anyhow::anyhow!(
1023                                    "failed to parse damon.json from path({path}): {err}"
1024                                )
1025                            })?;
1026                            json_owned.insert(json)
1027                        }
1028                        DaemonJson::InMem(json) => json,
1029                    };
1030                    let accounts = json["ledger"]["accounts"].as_array().ok_or_else(|| {
1031                        anyhow::anyhow!("daemon.json `.ledger.accounts` is not array")
1032                    })?;
1033
1034                    accounts
1035                        .iter()
1036                        .filter_map(|account| account["sk"].as_str())
1037                        .filter_map(|sk| sk.parse().ok())
1038                        .for_each(|sk| self.add_account_sec_key(sk));
1039
1040                    self.add_ocaml_node(config);
1041                    true
1042                }
1043            },
1044            ScenarioStep::ConnectNodes { dialer, listener } => {
1045                let listener_addr = match listener {
1046                    ListenerNode::Rust(listener) => {
1047                        let listener = self
1048                            .nodes
1049                            .get(listener.index())
1050                            .ok_or_else(|| anyhow::anyhow!("node {listener:?} not found"))?;
1051
1052                        listener.dial_addr()
1053                    }
1054                    ListenerNode::Ocaml(listener) => {
1055                        let listener = self
1056                            .ocaml_nodes
1057                            .get(listener.index())
1058                            .ok_or_else(|| anyhow::anyhow!("ocaml node {listener:?} not found"))?
1059                            .as_ref()
1060                            .ok_or_else(|| {
1061                                anyhow::anyhow!("tried to access removed ocaml node {listener:?}")
1062                            })?;
1063
1064                        listener.dial_addr()
1065                    }
1066                    ListenerNode::Custom(addr) => addr.clone(),
1067                };
1068
1069                self.rpc_counter += 1;
1070                let rpc_id = RpcId::new_unchecked(usize::MAX, self.rpc_counter);
1071                let dialer = self
1072                    .nodes
1073                    .get_mut(dialer.index())
1074                    .ok_or_else(|| anyhow::anyhow!("node {dialer:?} not found"))?;
1075
1076                let req = mina_node::rpc::RpcRequest::P2pConnectionOutgoing(listener_addr);
1077                dialer.dispatch_event(Event::Rpc(rpc_id, Box::new(req)))
1078            }
1079            ScenarioStep::CheckTimeouts { node_id } => {
1080                let node = self
1081                    .nodes
1082                    .get_mut(node_id.index())
1083                    .ok_or_else(|| anyhow::anyhow!("node {node_id:?} not found"))?;
1084                node.check_timeouts();
1085                true
1086            }
1087            ScenarioStep::AdvanceTime { by_nanos } => {
1088                for node in &mut self.nodes {
1089                    node.advance_time(by_nanos)
1090                }
1091                true
1092            }
1093            ScenarioStep::AdvanceNodeTime { node_id, by_nanos } => {
1094                let node = self
1095                    .nodes
1096                    .get_mut(node_id.index())
1097                    .ok_or_else(|| anyhow::anyhow!("node {node_id:?} not found"))?;
1098                node.advance_time(by_nanos);
1099                true
1100            }
1101            ScenarioStep::Ocaml { node_id, step } => {
1102                let node = self.ocaml_nodes.get_mut(node_id.index());
1103                let node =
1104                    node.ok_or_else(|| anyhow::anyhow!("ocaml node {node_id:?} not found"))?;
1105                if matches!(step, OcamlStep::KillAndRemove) {
1106                    let mut node = node.take().ok_or_else(|| {
1107                        anyhow::anyhow!("tried to access removed ocaml node {node_id:?}")
1108                    })?;
1109                    node.exec(step).await?
1110                } else {
1111                    let node = node.as_mut().ok_or_else(|| {
1112                        anyhow::anyhow!("tried to access removed ocaml node {node_id:?}")
1113                    })?;
1114                    node.exec(step).await?
1115                }
1116            }
1117        })
1118    }
1119
1120    pub fn debugger(&self) -> Option<&Debugger> {
1121        self.debugger.as_ref()
1122    }
1123}
1124
1125impl ClusterScenarioRun {
1126    pub fn target_scenario(&self) -> Option<&Scenario> {
1127        self.chain.back().or_else(|| self.finished.last())
1128    }
1129
1130    pub fn cur_scenario(&self) -> &Scenario {
1131        self.chain.front().unwrap()
1132    }
1133
1134    pub fn peek_i(&self) -> Option<(usize, usize)> {
1135        self.chain
1136            .iter()
1137            .enumerate()
1138            .filter_map(|(i, scenario)| {
1139                let step_i = if i == 0 { self.cur_step } else { 0 };
1140                scenario.steps.get(step_i)?;
1141                Some((i, step_i))
1142            })
1143            .nth(0)
1144    }
1145
1146    pub fn peek(&self) -> Option<(&Scenario, &ScenarioStep)> {
1147        self.peek_i().map(|(scenario_i, step_i)| {
1148            let scenario = &self.chain[scenario_i];
1149            let step = &scenario.steps[step_i];
1150            (scenario, step)
1151        })
1152    }
1153
1154    fn advance(&mut self) {
1155        if let Some((scenario_i, step_i)) = self.peek_i() {
1156            self.finished.extend(self.chain.drain(..scenario_i));
1157            if self.cur_step == step_i {
1158                self.cur_step += 1;
1159            } else {
1160                self.cur_step = step_i;
1161            }
1162        }
1163    }
1164}