mina_node_testing/scenarios/multi_node/
basic_connectivity_peer_discovery.rs1#![allow(warnings)]
2
3use std::time::Duration;
4
5use node::p2p::PeerId;
6
7use crate::{
8 cluster::ClusterOcamlNodeId,
9 node::{DaemonJson, OcamlNodeTestingConfig, OcamlStep, RustNodeTestingConfig},
10 scenario::ScenarioStep,
11 scenarios::ClusterRunner,
12};
13
/// Multi-node scenario: a Rust node discovers peers among OCaml nodes and
/// then accepts a connection from a newly started OCaml node.
///
/// Steps performed by `run`:
/// 1. Start an OCaml seed node, then three more OCaml nodes that use the
///    seed as their only initial peer, and wait until each of those three
///    reports ready.
/// 2. Start a Rust node whose initial peers are all of the OCaml nodes
///    above (seed included).
/// 3. Drive the Rust node until its discovery state reports bootstrapped.
/// 4. Start one additional OCaml node, again with only the seed as its
///    initial peer.
/// 5. Succeed once the Rust node holds a ready, incoming libp2p connection
///    from that additional node; otherwise fail after a fixed step budget.
#[derive(documented::Documented, Default, Clone, Copy)]
pub struct MultiNodeBasicConnectivityPeerDiscovery;
27
28impl MultiNodeBasicConnectivityPeerDiscovery {
29 pub async fn run(self, mut runner: ClusterRunner<'_>) {
30 const STEPS: usize = 4_000;
31 const STEP_DELAY: Duration = Duration::from_millis(200);
32 const TOTAL_OCAML_NODES: u16 = 4;
33 const PAUSE_UNTIL_OCAML_NODES_READY: Duration = Duration::from_secs(30 * 60);
34
35 let ocaml_seed_config = OcamlNodeTestingConfig {
36 initial_peers: Vec::new(),
37 daemon_json: DaemonJson::Custom("/var/lib/coda/config_6929a7ec.json".to_owned()),
38 block_producer: None,
39 };
40
41 let seed_a = runner.add_ocaml_node(ocaml_seed_config.clone());
42 let seed_a_dial_addr = runner.ocaml_node(seed_a).unwrap().dial_addr();
43
44 eprintln!("launching OCaml seed node: {seed_a_dial_addr}");
45
46 let ocaml_node_config = OcamlNodeTestingConfig {
47 initial_peers: vec![seed_a_dial_addr],
48 ..ocaml_seed_config
49 };
50
51 tokio::time::sleep(Duration::from_secs(60)).await;
52
53 let nodes = (1..TOTAL_OCAML_NODES)
54 .map(|_| runner.add_ocaml_node(ocaml_node_config.clone()))
55 .collect::<Vec<_>>();
56
57 for node in &nodes {
59 runner
60 .exec_step(ScenarioStep::Ocaml {
61 node_id: *node,
62 step: OcamlStep::WaitReady {
63 timeout: PAUSE_UNTIL_OCAML_NODES_READY,
64 },
65 })
66 .await
67 .expect("OCaml node should be ready");
68 }
69 eprintln!("OCaml nodes should be ready now");
70
71 let config = RustNodeTestingConfig::devnet_default()
72 .max_peers(100)
74 .initial_peers(
75 nodes
76 .iter()
77 .chain(std::iter::once(&seed_a))
78 .map(|node_id| (*node_id).into())
79 .collect(),
80 );
81 let node_id = runner.add_rust_node(config);
82 eprintln!("launching Rust node {node_id}");
83
84 let mut additional_ocaml_node = None::<(ClusterOcamlNodeId, PeerId)>;
85
86 let mut timeout = STEPS;
87 loop {
88 if timeout == 0 {
89 break;
90 }
91 timeout -= 1;
92
93 tokio::time::sleep(STEP_DELAY).await;
94
95 let steps = runner
96 .pending_events(true)
97 .map(|(node_id, _, events)| {
98 events.map(move |(_, event)| ScenarioStep::Event {
99 node_id,
100 event: event.to_string(),
101 })
102 })
103 .flatten()
104 .collect::<Vec<_>>();
105
106 for step in steps {
107 runner.exec_step(step).await.unwrap();
108 }
109
110 runner
111 .exec_step(ScenarioStep::AdvanceNodeTime {
112 node_id,
113 by_nanos: STEP_DELAY.as_nanos() as _,
114 })
115 .await
116 .unwrap();
117
118 runner
119 .exec_step(ScenarioStep::CheckTimeouts { node_id })
120 .await
121 .unwrap();
122
123 let this = runner.node(node_id).unwrap();
124 if this
126 .state()
127 .p2p
128 .ready()
129 .and_then(|p2p| p2p.network.scheduler.discovery_state())
130 .is_some_and(|discovery_state| discovery_state.is_bootstrapped())
131 {
132 if additional_ocaml_node.is_none() {
135 eprintln!("the Rust node finished peer discovery",);
136 eprintln!(
137 "connected peers: {:?}",
138 this.state().p2p.unwrap().peers.keys().collect::<Vec<_>>()
139 );
140 let node_id = runner.add_ocaml_node(ocaml_node_config.clone());
141 let node = runner.ocaml_node(node_id).unwrap();
142 eprintln!("launching additional OCaml node {}", node.dial_addr());
143
144 additional_ocaml_node = Some((node_id, node.peer_id()));
145 }
146 }
147
148 if let Some((_, additional_ocaml_node_peer_id)) = &additional_ocaml_node {
149 let peer_id = additional_ocaml_node_peer_id;
150
151 if runner
152 .node(node_id)
153 .unwrap()
154 .state()
155 .p2p
156 .ready()
157 .and_then(|p2p| {
158 p2p.peers
159 .iter()
160 .filter(|(id, n)| n.is_libp2p && id == &peer_id)
161 .filter_map(|(_, n)| n.status.as_ready())
162 .find(|n| n.is_incoming)
163 })
164 .is_some()
165 {
166 eprintln!("the additional OCaml node connected to Rust node");
167 eprintln!("success");
168
169 break;
170 }
171 }
172 }
173
174 if timeout == 0 {
175 panic!("timeout");
176 }
177 }
178}