1mod config;
34pub use config::{ClusterConfig, ProofKind};
35
36mod p2p_task_spawner;
37
38mod node_id;
39use mina_core::channels::Aborter;
40pub use node_id::{ClusterNodeId, ClusterOcamlNodeId};
41
42pub mod runner;
43
44use std::{
45 collections::{BTreeMap, VecDeque},
46 io::Read,
47 path::{Path, PathBuf},
48 sync::{Arc, Mutex as StdMutex},
49 time::Duration,
50};
51
52use libp2p::futures::{stream::FuturesUnordered, StreamExt};
53
54use ledger::proofs::provers::BlockProver;
55use mina_node::{
56 account::{AccountPublicKey, AccountSecretKey},
57 core::{
58 consensus::ConsensusConstants,
59 constants::constraint_constants,
60 invariants::InvariantsState,
61 log::{info, system_time, warn},
62 requests::RpcId,
63 thread,
64 },
65 event_source::Event,
66 p2p::{
67 channels::ChannelId, identity::SecretKey as P2pSecretKey, P2pConnectionEvent, P2pEvent,
68 P2pLimits, P2pMeshsubConfig, PeerId,
69 },
70 service::{Recorder, Service},
71 snark::{get_srs, BlockVerifier, TransactionVerifier, VerifierSRS},
72 BuildEnv, Config, GlobalConfig, LedgerConfig, P2pConfig, SnarkConfig, State,
73 TransitionFrontierConfig,
74};
75use mina_node_invariants::{InvariantResult, Invariants};
76use mina_node_native::{http_server, NodeServiceBuilder};
77use serde::{de::DeserializeOwned, Serialize};
78use temp_dir::TempDir;
79
80use crate::{
81 network_debugger::Debugger,
82 node::{
83 DaemonJson, Node, NodeTestingConfig, NonDeterministicEvent, OcamlNode, OcamlNodeConfig,
84 OcamlNodeTestingConfig, OcamlStep, RustNodeTestingConfig, TestPeerId,
85 },
86 scenario::{ListenerNode, Scenario, ScenarioId, ScenarioStep},
87 service::{NodeTestingService, PendingEventId},
88};
89
/// Resolves `path` relative to the per-user cache directory
/// `$HOME/.cache/mina`.
///
/// Returns `None` when the `HOME` environment variable is not set.
#[allow(dead_code)]
fn mina_path<P: AsRef<Path>>(path: P) -> Option<PathBuf> {
    let home = std::env::var_os("HOME")?;
    let mut resolved = PathBuf::from(home);
    resolved.push(".cache/mina");
    resolved.push(path);
    Some(resolved)
}
94
95#[allow(dead_code)]
96fn read_index<T: DeserializeOwned>(name: &str) -> Option<T> {
97 mina_path(name)
98 .and_then(|path| {
99 if !path.exists() {
100 return None;
101 }
102 match std::fs::File::open(path) {
103 Ok(v) => Some(v),
104 Err(e) => {
105 warn!(system_time(); "cannot find verifier index for {name}: {e}");
106 None
107 }
108 }
109 })
110 .and_then(|mut file| {
111 let mut buf = Vec::new();
112 file.read_to_end(&mut buf).ok().and(Some(buf))
113 })
114 .and_then(|bytes| match postcard::from_bytes(&bytes) {
115 Ok(v) => Some(v),
116 Err(e) => {
117 warn!(system_time(); "cannot read verifier index for {name}: {e}");
118 None
119 }
120 })
121}
122
123#[allow(dead_code)]
124fn write_index<T: Serialize>(name: &str, index: &T) -> Option<()> {
125 mina_path(name)
126 .and_then(|path| {
127 let Some(parent) = path.parent() else {
128 warn!(system_time(); "cannot get parent for {path:?}");
129 return None;
130 };
131 if let Err(e) = std::fs::create_dir_all(parent) {
132 warn!(system_time(); "cannot create parent dir for {parent:?}: {e}");
133 return None;
134 }
135 match std::fs::File::create(&path) {
136 Ok(v) => Some(v),
137 Err(e) => {
138 warn!(system_time(); "cannot create file {path:?}: {e}");
139 None
140 }
141 }
142 })
143 .and_then(|file| match postcard::to_io(index, file) {
144 Ok(_) => Some(()),
145 Err(e) => {
146 warn!(system_time(); "cannot write verifier index for {name}: {e}");
147 None
148 }
149 })
150}
151
lazy_static::lazy_static! {
    /// Verifier SRS obtained from `get_srs()` on first access; every
    /// `Cluster` clones this `Arc` instead of fetching its own copy.
    static ref VERIFIER_SRS: Arc<VerifierSRS> = get_srs();
}
155
/// A set of Rust and OCaml nodes driven together by test scenarios.
pub struct Cluster {
    /// Cluster-wide settings (port range, debugger, proof kind, ...).
    pub config: ClusterConfig,
    /// Scenarios queued for execution and the progress made through them.
    scenario: ClusterScenarioRun,
    /// Ports from `config.port_range()` that could be bound when checked.
    available_ports: Box<dyn Iterator<Item = u16> + Send>,
    /// Secret keys registered via `add_account_sec_key`, by public key.
    account_sec_keys: BTreeMap<AccountPublicKey, AccountSecretKey>,
    /// Rust nodes, indexed by `ClusterNodeId::index()`.
    nodes: Vec<Node>,
    /// OCaml nodes, indexed by `ClusterOcamlNodeId::index()`; a slot is
    /// `None` once the node was taken out by a `KillAndRemove` step.
    ocaml_nodes: Vec<Option<OcamlNode>>,
    /// Initial time, if one was set through `set_initial_time`.
    initial_time: Option<redux::Timestamp>,

    /// Incremented for every `ConnectNodes` step to build a fresh `RpcId`.
    rpc_counter: usize,
    /// Keypair index handed to the next OCaml node; bumped per node.
    ocaml_libp2p_keypair_i: usize,

    /// SRS shared by the block and work verifiers of every Rust node.
    verifier_srs: Arc<VerifierSRS>,
    /// Block verifier index cloned into each Rust node's snark config.
    block_verifier_index: BlockVerifier,
    /// Work verifier index cloned into each Rust node's snark config.
    work_verifier_index: TransactionVerifier,

    /// Present only when `config.is_use_debugger()` was true at creation.
    debugger: Option<Debugger>,
    /// Invariants state shared with the testing service of every Rust node.
    invariants_state: Arc<StdMutex<InvariantsState>>,
}
210
/// Execution progress through a chain of scenarios.
#[derive(Serialize)]
pub struct ClusterScenarioRun {
    /// Scenarios still being executed; the front one is the current one.
    chain: VecDeque<Scenario>,
    /// Scenarios that were drained from `chain` after being passed.
    finished: Vec<Scenario>,
    /// Index of the next step to run in the front scenario of `chain`.
    cur_step: usize,
}
224
225impl Cluster {
226 pub fn new(config: ClusterConfig) -> Self {
227 let available_ports = config
228 .port_range()
229 .filter(|port| std::net::TcpListener::bind(("0.0.0.0", *port)).is_ok());
230 let debugger = if config.is_use_debugger() {
231 Some(Debugger::drone_ci())
232 } else {
233 None
234 };
235 Self {
236 config,
237 scenario: ClusterScenarioRun {
238 chain: Default::default(),
239 finished: Default::default(),
240 cur_step: 0,
241 },
242 available_ports: Box::new(available_ports),
243 account_sec_keys: Default::default(),
244 nodes: Vec::new(),
245 ocaml_nodes: Vec::new(),
246 initial_time: None,
247
248 rpc_counter: 0,
249 ocaml_libp2p_keypair_i: 0,
250
251 verifier_srs: VERIFIER_SRS.clone(),
252 block_verifier_index: BlockVerifier::make(),
253 work_verifier_index: TransactionVerifier::make(),
254
255 debugger,
256 invariants_state: Arc::new(StdMutex::new(Default::default())),
257 }
258 }
259
260 pub fn available_port(&mut self) -> Option<u16> {
261 self.available_ports.next()
262 }
263
264 pub fn add_account_sec_key(&mut self, sec_key: AccountSecretKey) {
265 self.account_sec_keys.insert(sec_key.public_key(), sec_key);
266 }
267
268 pub fn get_account_sec_key(&self, pub_key: &AccountPublicKey) -> Option<&AccountSecretKey> {
269 self.account_sec_keys.get(pub_key).or_else(|| {
270 AccountSecretKey::deterministic_iter().find(|sec_key| &sec_key.public_key() == pub_key)
271 })
272 }
273
274 pub fn set_initial_time(&mut self, initial_time: redux::Timestamp) {
275 self.initial_time = Some(initial_time)
276 }
277
278 pub fn get_initial_time(&self) -> Option<redux::Timestamp> {
279 self.initial_time
280 }
281
282 pub fn add_rust_node(&mut self, testing_config: RustNodeTestingConfig) -> ClusterNodeId {
320 let rng_seed = [0; 32];
321 let node_config = testing_config.clone();
322 let node_id = ClusterNodeId::new_unchecked(self.nodes.len());
323
324 info!(
325 system_time();
326 "Adding Rust node {} with config: max_peers={}, snark_worker={:?}, \
327 block_producer={}",
328 node_id.index(),
329 testing_config.max_peers,
330 testing_config.snark_worker,
331 testing_config.block_producer.is_some()
332 );
333
334 let work_dir = TempDir::new().unwrap();
335 let shutdown_initiator = Aborter::default();
336 let shutdown_listener = shutdown_initiator.aborted();
337 let p2p_sec_key = match testing_config.peer_id {
338 TestPeerId::Derived => {
339 info!(system_time(); "Using deterministic peer ID for node {}", node_id.index());
340 P2pSecretKey::deterministic(node_id.index())
341 }
342 TestPeerId::Bytes(bytes) => {
343 info!(system_time(); "Using custom peer ID for node {}", node_id.index());
344 P2pSecretKey::from_bytes(bytes)
345 }
346 };
347
348 let http_port = self
349 .available_ports
350 .next()
351 .ok_or_else(|| {
352 anyhow::anyhow!(
353 "couldn't find available port in port range: {:?}",
354 self.config.port_range()
355 )
356 })
357 .unwrap();
358 let libp2p_port = testing_config.libp2p_port.unwrap_or_else(|| {
359 self.available_ports
360 .next()
361 .ok_or_else(|| {
362 anyhow::anyhow!(
363 "couldn't find available port in port range: {:?}",
364 self.config.port_range()
365 )
366 })
367 .unwrap()
368 });
369
370 info!(
371 system_time();
372 "Assigned ports for Rust node {}: HTTP={}, LibP2P={}",
373 node_id.index(),
374 http_port,
375 libp2p_port
376 );
377
378 let (block_producer_sec_key, block_producer_config) = testing_config
379 .block_producer
380 .map(|v| {
381 info!(
382 system_time();
383 "Configuring block producer for Rust node {} with public key: {}",
384 node_id.index(),
385 v.sec_key.public_key()
386 );
387 (v.sec_key, v.config)
388 })
389 .unzip();
390
391 let initial_peers: Vec<_> = testing_config
392 .initial_peers
393 .into_iter()
394 .map(|node| {
395 let addr = match &node {
396 ListenerNode::Rust(id) => {
397 info!(system_time(); "Adding Rust peer {} as initial peer", id.index());
398 self.node(*id).unwrap().dial_addr()
399 }
400 ListenerNode::Ocaml(id) => {
401 info!(system_time(); "Adding OCaml peer {} as initial peer", id.index());
402 self.ocaml_node(*id).unwrap().dial_addr()
403 }
404 ListenerNode::Custom(addr) => {
405 info!(system_time(); "Adding custom peer: {:?}", addr);
406 addr.clone()
407 }
408 };
409 addr
410 })
411 .collect();
412
413 if !initial_peers.is_empty() {
414 info!(
415 system_time();
416 "Rust node {} configured with {} initial peers",
417 node_id.index(),
418 initial_peers.len()
419 );
420 } else {
421 info!(system_time(); "Rust node {} configured as seed node (no initial peers)", node_id.index());
422 }
423
424 let protocol_constants = testing_config
425 .genesis
426 .protocol_constants()
427 .expect("wrong protocol constants");
428 let consensus_consts =
429 ConsensusConstants::create(constraint_constants(), &protocol_constants);
430
431 let config = Config {
432 ledger: LedgerConfig {},
433 snark: SnarkConfig {
434 block_verifier_index: self.block_verifier_index.clone(),
436 block_verifier_srs: self.verifier_srs.clone(),
437 work_verifier_index: self.work_verifier_index.clone(),
438 work_verifier_srs: self.verifier_srs.clone(),
439 },
440 global: GlobalConfig {
441 build: BuildEnv::get().into(),
442 snarker: testing_config.snark_worker,
443 consensus_constants: consensus_consts.clone(),
444 client_port: Some(http_port),
445 testing_run: true,
446 chain_id_override: None,
447 skip_proof_verification: false,
448 },
449 p2p: P2pConfig {
450 libp2p_port: Some(libp2p_port),
451 listen_port: Some(http_port),
452 identity_pub_key: p2p_sec_key.public_key(),
453 initial_peers,
454 external_addrs: vec![],
455 enabled_channels: ChannelId::iter_all().collect(),
456 peer_discovery: testing_config.peer_discovery,
457 timeouts: testing_config.timeouts,
458 limits: P2pLimits::default().with_max_peers(Some(testing_config.max_peers)),
459 meshsub: P2pMeshsubConfig {
460 initial_time: testing_config
461 .initial_time
462 .checked_sub(redux::Timestamp::ZERO)
463 .unwrap_or_default(),
464 ..Default::default()
465 },
466 },
467 transition_frontier: TransitionFrontierConfig::new(testing_config.genesis),
468 block_producer: block_producer_config,
469 archive: None,
470 tx_pool: ledger::transaction_pool::Config {
471 trust_system: (),
472 pool_max_size: 3000,
473 slot_tx_end: None,
474 },
475 };
476
477 let mut service_builder = NodeServiceBuilder::new(rng_seed);
478 service_builder
479 .ledger_init()
480 .p2p_init_with_custom_task_spawner(
481 p2p_sec_key.clone(),
482 p2p_task_spawner::P2pTaskSpawner::new(shutdown_listener.clone()),
483 )
484 .gather_stats()
485 .record(match testing_config.recorder {
486 crate::node::Recorder::None => Recorder::None,
487 crate::node::Recorder::StateWithInputActions => {
488 Recorder::only_input_actions(work_dir.path())
489 }
490 });
491
492 if let Some(keypair) = block_producer_sec_key {
493 info!(system_time(); "Initializing block producer for Rust node {}", node_id.index());
494 let provers = BlockProver::make(None, None);
495 service_builder.block_producer_init(keypair, Some(provers));
496 }
497
498 let real_service = service_builder
499 .build()
500 .map_err(|err| anyhow::anyhow!("node service build failed! error: {err}"))
501 .unwrap();
502
503 let runtime = tokio::runtime::Builder::new_current_thread()
505 .enable_all()
506 .build()
507 .unwrap();
508 let shutdown = shutdown_listener.clone();
509 let rpc_sender = real_service.rpc_sender();
510 thread::Builder::new()
511 .name("mina_http_server".to_owned())
512 .spawn(move || {
513 let local_set = tokio::task::LocalSet::new();
514 let task = async {
515 tokio::select! {
516 _ = shutdown.wait() => {}
517 _ = http_server::run(http_port, rpc_sender) => {}
518 }
519 };
520 local_set.block_on(&runtime, task);
521 })
522 .unwrap();
523
524 let invariants_state = self.invariants_state.clone();
525 let mut service =
526 NodeTestingService::new(real_service, node_id, invariants_state, shutdown_initiator);
527
528 service.set_proof_kind(self.config.proof_kind());
529 if self.config.all_rust_to_rust_use_webrtc() {
530 service.set_rust_to_rust_use_webrtc();
531 }
532 if self.config.is_replay() {
533 service.set_replay();
534 }
535
536 let state = mina_node::State::new(config, &consensus_consts, testing_config.initial_time);
537 fn effects(
538 store: &mut mina_node::Store<NodeTestingService>,
539 action: mina_node::ActionWithMeta,
540 ) {
541 store.service.dyn_effects(store.state.get(), &action);
546 let peer_id = store.state().p2p.my_id();
547 mina_core::log::trace!(action.time(); "{peer_id}: {:?}", action.action().kind());
548
549 for (invariant, res) in Invariants::check_all(store, &action) {
550 match res {
552 InvariantResult::Ignored(reason) => {
553 unreachable!("No invariant should be ignored! ignore reason: {reason:?}");
554 }
555 InvariantResult::Violation(violation) => {
556 panic!(
557 "Invariant({}) violated! violation: {violation}",
558 invariant.to_str()
559 );
560 }
561 InvariantResult::Updated => {}
562 InvariantResult::Ok => {}
563 }
564 }
565
566 mina_node::effects(store, action)
567 }
568 let mut store = mina_node::Store::new(
569 mina_node::reducer,
570 effects,
571 service,
572 testing_config.initial_time.into(),
573 state,
574 );
575 {
577 store
578 .service
579 .recorder()
580 .initial_state(rng_seed, p2p_sec_key, store.state.get());
581 }
582
583 let node = Node::new(work_dir, node_config, store);
584
585 info!(
586 system_time();
587 "Successfully created Rust node {} at ports HTTP={}, LibP2P={}",
588 node_id.index(),
589 http_port,
590 libp2p_port
591 );
592
593 self.nodes.push(node);
594 node_id
595 }
596
597 pub fn add_ocaml_node(&mut self, testing_config: OcamlNodeTestingConfig) -> ClusterOcamlNodeId {
641 let node_i = self.ocaml_nodes.len();
642
643 info!(
644 system_time();
645 "Adding OCaml node {} with {} initial peers, block_producer={}",
646 node_i,
647 testing_config.initial_peers.len(),
648 testing_config.block_producer.is_some()
649 );
650
651 let executable = self.config.ocaml_node_executable();
652 let mut next_port = || {
653 self.available_ports.next().ok_or_else(|| {
654 anyhow::anyhow!(
655 "couldn't find available port in port range: {:?}",
656 self.config.port_range()
657 )
658 })
659 };
660
661 let temp_dir = temp_dir::TempDir::new().expect("failed to create tempdir");
662 let libp2p_port = next_port().unwrap();
663 let graphql_port = next_port().unwrap();
664 let client_port = next_port().unwrap();
665
666 info!(
667 system_time();
668 "Assigned ports for OCaml node {}: LibP2P={}, GraphQL={}, Client={}",
669 node_i,
670 libp2p_port,
671 graphql_port,
672 client_port
673 );
674
675 let node = OcamlNode::start(OcamlNodeConfig {
676 executable,
677 dir: temp_dir,
678 libp2p_keypair_i: self.ocaml_libp2p_keypair_i,
679 libp2p_port,
680 graphql_port,
681 client_port,
682 initial_peers: testing_config.initial_peers,
683 daemon_json: testing_config.daemon_json,
684 block_producer: testing_config.block_producer,
685 })
686 .expect("failed to start ocaml node");
687
688 info!(
689 system_time();
690 "Successfully started OCaml node {} with keypair index {}",
691 node_i,
692 self.ocaml_libp2p_keypair_i
693 );
694
695 self.ocaml_libp2p_keypair_i += 1;
696
697 self.ocaml_nodes.push(Some(node));
698 ClusterOcamlNodeId::new_unchecked(node_i)
699 }
700
701 pub async fn start(&mut self, scenario: Scenario) -> Result<(), anyhow::Error> {
702 let mut parent_id = scenario.info.parent_id.clone();
703 self.scenario.chain.push_back(scenario);
704
705 while let Some(ref id) = parent_id {
706 let scenario = Scenario::load(id).await?;
707 parent_id.clone_from(&scenario.info.parent_id);
708 self.scenario.chain.push_back(scenario);
709 }
710
711 let scenario = self.scenario.cur_scenario();
712
713 for config in scenario.info.nodes.clone() {
714 match config {
715 NodeTestingConfig::Rust(config) => {
716 self.add_rust_node(config.clone());
717 }
718 NodeTestingConfig::Ocaml(config) => {
719 self.add_ocaml_node(config.clone());
720 }
721 }
722 }
723
724 Ok(())
725 }
726
727 pub async fn reload_scenarios(&mut self) -> Result<(), anyhow::Error> {
728 for scenario in &mut self.scenario.chain {
729 scenario.reload().await?;
730 }
731 Ok(())
732 }
733
734 pub fn next_scenario_and_step(&self) -> Option<(&ScenarioId, usize)> {
735 self.scenario
736 .peek_i()
737 .map(|(scenario_i, step_i)| (&self.scenario.chain[scenario_i].info.id, step_i))
738 }
739
740 pub fn target_scenario(&self) -> Option<&ScenarioId> {
741 self.scenario.target_scenario().map(|v| &v.info.id)
742 }
743
744 pub fn nodes_iter(&self) -> impl Iterator<Item = (ClusterNodeId, &Node)> {
745 self.nodes
746 .iter()
747 .enumerate()
748 .map(|(i, node)| (ClusterNodeId::new_unchecked(i), node))
749 }
750
751 pub fn ocaml_nodes_iter(&self) -> impl Iterator<Item = (ClusterOcamlNodeId, &OcamlNode)> {
752 self.ocaml_nodes
753 .iter()
754 .enumerate()
755 .filter_map(|(i, node)| node.as_ref().map(|node| (i, node)))
756 .map(|(i, node)| (ClusterOcamlNodeId::new_unchecked(i), node))
757 }
758
759 pub fn node(&self, node_id: ClusterNodeId) -> Option<&Node> {
760 self.nodes.get(node_id.index())
761 }
762
763 pub fn node_by_peer_id(&self, peer_id: PeerId) -> Option<&Node> {
764 self.nodes_iter()
765 .find(|(_, node)| node.peer_id() == peer_id)
766 .map(|(_, node)| node)
767 }
768
769 pub fn node_mut(&mut self, node_id: ClusterNodeId) -> Option<&mut Node> {
770 self.nodes.get_mut(node_id.index())
771 }
772
773 pub fn ocaml_node(&self, node_id: ClusterOcamlNodeId) -> Option<&OcamlNode> {
774 self.ocaml_nodes
775 .get(node_id.index())
776 .map(|opt| opt.as_ref().expect("tried to access removed ocaml node"))
777 }
778
779 pub fn ocaml_node_by_peer_id(&self, peer_id: PeerId) -> Option<&OcamlNode> {
780 self.ocaml_nodes_iter()
781 .find(|(_, node)| node.peer_id() == peer_id)
782 .map(|(_, node)| node)
783 }
784
785 pub fn pending_events(
786 &mut self,
787 poll: bool,
788 ) -> impl Iterator<
789 Item = (
790 ClusterNodeId,
791 &State,
792 impl Iterator<Item = (PendingEventId, &Event)>,
793 ),
794 > {
795 self.nodes.iter_mut().enumerate().map(move |(i, node)| {
796 let node_id = ClusterNodeId::new_unchecked(i);
797 let (state, pending_events) = node.pending_events_with_state(poll);
798 (node_id, state, pending_events)
799 })
800 }
801
802 pub fn node_pending_events(
803 &mut self,
804 node_id: ClusterNodeId,
805 poll: bool,
806 ) -> Result<(&State, impl Iterator<Item = (PendingEventId, &Event)>), anyhow::Error> {
807 let node = self
808 .nodes
809 .get_mut(node_id.index())
810 .ok_or_else(|| anyhow::anyhow!("node {node_id:?} not found"))?;
811 Ok(node.pending_events_with_state(poll))
812 }
813
814 pub async fn wait_for_pending_events(&mut self) {
815 let mut nodes = &mut self.nodes[..];
816 let mut futures = FuturesUnordered::new();
817
818 while let Some((node, nodes_rest)) = nodes.split_first_mut() {
819 nodes = nodes_rest;
820 futures.push(async { node.wait_for_next_pending_event().await.is_some() });
821 }
822
823 while let Some(has_event) = futures.next().await {
824 if has_event {
825 break;
826 }
827 }
828 }
829
830 pub async fn wait_for_pending_events_with_timeout(&mut self, timeout: Duration) -> bool {
831 let timeout = tokio::time::sleep(timeout);
832
833 tokio::select! {
834 _ = self.wait_for_pending_events() => true,
835 _ = timeout => false,
836 }
837 }
838
839 pub async fn wait_for_pending_event(
840 &mut self,
841 node_id: ClusterNodeId,
842 event_pattern: &str,
843 ) -> anyhow::Result<PendingEventId> {
844 let node = self
845 .nodes
846 .get_mut(node_id.index())
847 .ok_or_else(|| anyhow::anyhow!("node {node_id:?} not found"))?;
848 let timeout = tokio::time::sleep(Duration::from_secs(300));
849 tokio::select! {
850 opt = node.wait_for_event(event_pattern) => opt.ok_or_else(|| anyhow::anyhow!("wait_for_event: None")),
851 _ = timeout => {
852 let pending_events = node.pending_events(false).map(|(_, event)| event.to_string()).collect::<Vec<_>>();
853 Err(anyhow::anyhow!("waiting for event timed out! node {node_id:?}, event: \"{event_pattern}\"\n{pending_events:?}"))
854 }
855 }
856 }
857
858 pub async fn wait_for_event_and_dispatch(
859 &mut self,
860 node_id: ClusterNodeId,
861 event_pattern: &str,
862 ) -> anyhow::Result<bool> {
863 let event_id = self.wait_for_pending_event(node_id, event_pattern).await?;
864 let node = self.nodes.get_mut(node_id.index()).unwrap();
865 Ok(node.take_event_and_dispatch(event_id))
866 }
867
868 pub async fn add_steps_and_save(&mut self, steps: impl IntoIterator<Item = ScenarioStep>) {
869 let scenario = self.scenario.chain.back_mut().unwrap();
870 steps
871 .into_iter()
872 .for_each(|step| scenario.add_step(step).unwrap());
873 scenario.save().await.unwrap();
874 }
875
876 pub async fn exec_to_end(&mut self) -> Result<(), anyhow::Error> {
877 let mut i = 0;
878 let total = self.scenario.cur_scenario().steps.len();
879 loop {
880 info!(system_time(); "Executing step {}/{}", i + 1, total);
881 if !self.exec_next().await? {
882 break Ok(());
883 }
884 i += 1;
885 }
886 }
887
888 pub async fn exec_until(
889 &mut self,
890 target_scenario: ScenarioId,
891 step_i: Option<usize>,
892 ) -> Result<(), anyhow::Error> {
893 if self
894 .scenario
895 .finished
896 .iter()
897 .any(|v| v.info.id == target_scenario)
898 {
899 return Err(anyhow::anyhow!(
900 "cluster already finished '{target_scenario}' scenario"
901 ));
902 }
903
904 while self
905 .scenario
906 .peek()
907 .is_some_and(|(scenario, _)| scenario.info.id != target_scenario)
908 {
909 if !self.exec_next().await? {
910 break;
911 }
912 }
913
914 while self
915 .scenario
916 .peek()
917 .is_some_and(|(scenario, _)| scenario.info.id == target_scenario)
918 {
919 if let Some(step_i) = step_i {
920 if self.scenario.peek_i().unwrap().1 >= step_i {
921 break;
922 }
923 }
924 if !self.exec_next().await? {
925 break;
926 }
927 }
928
929 Ok(())
930 }
931
932 pub async fn exec_next(&mut self) -> Result<bool, anyhow::Error> {
933 let (_scenario, step) = match self.scenario.peek() {
934 Some(v) => v,
935 None => return Ok(false),
936 };
937 let dispatched = self.exec_step(step.clone()).await?;
938
939 if dispatched {
940 self.scenario.advance();
941 }
942
943 Ok(dispatched)
944 }
945
946 pub async fn exec_step(&mut self, step: ScenarioStep) -> anyhow::Result<bool> {
947 Ok(match step {
948 ScenarioStep::Event { node_id, event } => {
949 return self.wait_for_event_and_dispatch(node_id, &event).await;
950 }
951 ScenarioStep::ManualEvent { node_id, event } => self
952 .nodes
953 .get_mut(node_id.index())
954 .ok_or_else(|| anyhow::anyhow!("node {node_id:?} not found"))?
955 .dispatch_event(*event),
956 ScenarioStep::NonDeterministicEvent { node_id, event } => {
957 let event = match *event {
958 NonDeterministicEvent::P2pConnectionClosed(peer_id) => {
959 let node = self
960 .nodes
961 .get_mut(node_id.index())
962 .ok_or_else(|| anyhow::anyhow!("node {node_id:?} not found"))?;
963 node.p2p_disconnect(peer_id);
964 let event =
965 Event::P2p(P2pEvent::Connection(P2pConnectionEvent::Closed(peer_id)));
966 return self
967 .wait_for_event_and_dispatch(node_id, &event.to_string())
968 .await;
969 }
970 NonDeterministicEvent::P2pConnectionFinalized(peer_id, res) => {
971 let node = self
972 .nodes
973 .get(node_id.index())
974 .ok_or_else(|| anyhow::anyhow!("node {node_id:?} not found"))?;
975 let res_is_ok = res.is_ok();
976 let event = Event::P2p(P2pEvent::Connection(
977 P2pConnectionEvent::Finalized(peer_id, res),
978 ));
979
980 if res_is_ok {
981 let is_peer_connected =
982 node.state().p2p.get_ready_peer(&peer_id).is_some();
983 if is_peer_connected {
984 return Ok(true);
986 }
987 eprintln!("non_deterministic_wait_for_event_and_dispatch({node_id:?}): {event}");
988 return self
989 .wait_for_event_and_dispatch(node_id, &event.to_string())
990 .await;
991 } else {
992 event
993 }
994 }
995 NonDeterministicEvent::RpcReadonly(id, req) => Event::Rpc(id, req),
996 };
997 eprintln!("non_deterministic_event_dispatch({node_id:?}): {event}");
998 self.nodes
999 .get_mut(node_id.index())
1000 .ok_or_else(|| anyhow::anyhow!("node {node_id:?} not found"))?
1001 .dispatch_event(event)
1002 }
1003 ScenarioStep::AddNode { config } => match *config {
1004 NodeTestingConfig::Rust(config) => {
1005 self.add_rust_node(config);
1006 tokio::time::sleep(Duration::from_secs(2)).await;
1008 true
1009 }
1010 NodeTestingConfig::Ocaml(config) => {
1011 let mut json_owned = None;
1014 let json = match &config.daemon_json {
1015 DaemonJson::Custom(path) => {
1016 let bytes = tokio::fs::read(path).await.map_err(|err| {
1017 anyhow::anyhow!(
1018 "error reading daemon.json from path({path}): {err}"
1019 )
1020 })?;
1021 let json = serde_json::from_slice(&bytes).map_err(|err| {
1022 anyhow::anyhow!(
1023 "failed to parse damon.json from path({path}): {err}"
1024 )
1025 })?;
1026 json_owned.insert(json)
1027 }
1028 DaemonJson::InMem(json) => json,
1029 };
1030 let accounts = json["ledger"]["accounts"].as_array().ok_or_else(|| {
1031 anyhow::anyhow!("daemon.json `.ledger.accounts` is not array")
1032 })?;
1033
1034 accounts
1035 .iter()
1036 .filter_map(|account| account["sk"].as_str())
1037 .filter_map(|sk| sk.parse().ok())
1038 .for_each(|sk| self.add_account_sec_key(sk));
1039
1040 self.add_ocaml_node(config);
1041 true
1042 }
1043 },
1044 ScenarioStep::ConnectNodes { dialer, listener } => {
1045 let listener_addr = match listener {
1046 ListenerNode::Rust(listener) => {
1047 let listener = self
1048 .nodes
1049 .get(listener.index())
1050 .ok_or_else(|| anyhow::anyhow!("node {listener:?} not found"))?;
1051
1052 listener.dial_addr()
1053 }
1054 ListenerNode::Ocaml(listener) => {
1055 let listener = self
1056 .ocaml_nodes
1057 .get(listener.index())
1058 .ok_or_else(|| anyhow::anyhow!("ocaml node {listener:?} not found"))?
1059 .as_ref()
1060 .ok_or_else(|| {
1061 anyhow::anyhow!("tried to access removed ocaml node {listener:?}")
1062 })?;
1063
1064 listener.dial_addr()
1065 }
1066 ListenerNode::Custom(addr) => addr.clone(),
1067 };
1068
1069 self.rpc_counter += 1;
1070 let rpc_id = RpcId::new_unchecked(usize::MAX, self.rpc_counter);
1071 let dialer = self
1072 .nodes
1073 .get_mut(dialer.index())
1074 .ok_or_else(|| anyhow::anyhow!("node {dialer:?} not found"))?;
1075
1076 let req = mina_node::rpc::RpcRequest::P2pConnectionOutgoing(listener_addr);
1077 dialer.dispatch_event(Event::Rpc(rpc_id, Box::new(req)))
1078 }
1079 ScenarioStep::CheckTimeouts { node_id } => {
1080 let node = self
1081 .nodes
1082 .get_mut(node_id.index())
1083 .ok_or_else(|| anyhow::anyhow!("node {node_id:?} not found"))?;
1084 node.check_timeouts();
1085 true
1086 }
1087 ScenarioStep::AdvanceTime { by_nanos } => {
1088 for node in &mut self.nodes {
1089 node.advance_time(by_nanos)
1090 }
1091 true
1092 }
1093 ScenarioStep::AdvanceNodeTime { node_id, by_nanos } => {
1094 let node = self
1095 .nodes
1096 .get_mut(node_id.index())
1097 .ok_or_else(|| anyhow::anyhow!("node {node_id:?} not found"))?;
1098 node.advance_time(by_nanos);
1099 true
1100 }
1101 ScenarioStep::Ocaml { node_id, step } => {
1102 let node = self.ocaml_nodes.get_mut(node_id.index());
1103 let node =
1104 node.ok_or_else(|| anyhow::anyhow!("ocaml node {node_id:?} not found"))?;
1105 if matches!(step, OcamlStep::KillAndRemove) {
1106 let mut node = node.take().ok_or_else(|| {
1107 anyhow::anyhow!("tried to access removed ocaml node {node_id:?}")
1108 })?;
1109 node.exec(step).await?
1110 } else {
1111 let node = node.as_mut().ok_or_else(|| {
1112 anyhow::anyhow!("tried to access removed ocaml node {node_id:?}")
1113 })?;
1114 node.exec(step).await?
1115 }
1116 }
1117 })
1118 }
1119
1120 pub fn debugger(&self) -> Option<&Debugger> {
1121 self.debugger.as_ref()
1122 }
1123}
1124
1125impl ClusterScenarioRun {
1126 pub fn target_scenario(&self) -> Option<&Scenario> {
1127 self.chain.back().or_else(|| self.finished.last())
1128 }
1129
1130 pub fn cur_scenario(&self) -> &Scenario {
1131 self.chain.front().unwrap()
1132 }
1133
1134 pub fn peek_i(&self) -> Option<(usize, usize)> {
1135 self.chain
1136 .iter()
1137 .enumerate()
1138 .filter_map(|(i, scenario)| {
1139 let step_i = if i == 0 { self.cur_step } else { 0 };
1140 scenario.steps.get(step_i)?;
1141 Some((i, step_i))
1142 })
1143 .nth(0)
1144 }
1145
1146 pub fn peek(&self) -> Option<(&Scenario, &ScenarioStep)> {
1147 self.peek_i().map(|(scenario_i, step_i)| {
1148 let scenario = &self.chain[scenario_i];
1149 let step = &scenario.steps[step_i];
1150 (scenario, step)
1151 })
1152 }
1153
1154 fn advance(&mut self) {
1155 if let Some((scenario_i, step_i)) = self.peek_i() {
1156 self.finished.extend(self.chain.drain(..scenario_i));
1157 if self.cur_step == step_i {
1158 self.cur_step += 1;
1159 } else {
1160 self.cur_step = step_i;
1161 }
1162 }
1163 }
1164}