mina_node_testing/scenarios/multi_node/
basic_connectivity_peer_discovery.rs

1#![allow(warnings)]
2
3use std::time::Duration;
4
5use node::p2p::PeerId;
6
7use crate::{
8    cluster::ClusterOcamlNodeId,
9    node::{DaemonJson, OcamlNodeTestingConfig, OcamlStep, RustNodeTestingConfig},
10    scenario::ScenarioStep,
11    scenarios::ClusterRunner,
12};
13
14/// Global test with OCaml nodes.
15/// Run an OCaml node as a seed node. Run three normal OCaml nodes connecting only to the seed node.
16/// Wait 3 minutes for the OCaml nodes to start.
17/// Run the Rust node (application under test).
18/// Wait for the Rust node to complete peer discovery and connect to all OCaml nodes.
19/// (This step ensures that the Rust node can discover OCaml nodes).
20/// Run another OCaml node that connects only to the seed node.
21/// So this additional OCaml node only knows the address of the seed node.
22/// Wait for the additional OCaml node to initiate a connection to the Rust node.
23/// (This step ensures that the OCaml node can discover the Rust node).
24/// Fail the test on the timeout.
25#[derive(documented::Documented, Default, Clone, Copy)]
26pub struct MultiNodeBasicConnectivityPeerDiscovery;
27
28impl MultiNodeBasicConnectivityPeerDiscovery {
29    pub async fn run(self, mut runner: ClusterRunner<'_>) {
30        const STEPS: usize = 4_000;
31        const STEP_DELAY: Duration = Duration::from_millis(200);
32        const TOTAL_OCAML_NODES: u16 = 4;
33        const PAUSE_UNTIL_OCAML_NODES_READY: Duration = Duration::from_secs(30 * 60);
34
35        let ocaml_seed_config = OcamlNodeTestingConfig {
36            initial_peers: Vec::new(),
37            daemon_json: DaemonJson::Custom("/var/lib/coda/config_6929a7ec.json".to_owned()),
38            block_producer: None,
39        };
40
41        let seed_a = runner.add_ocaml_node(ocaml_seed_config.clone());
42        let seed_a_dial_addr = runner.ocaml_node(seed_a).unwrap().dial_addr();
43
44        eprintln!("launching OCaml seed node: {seed_a_dial_addr}");
45
46        let ocaml_node_config = OcamlNodeTestingConfig {
47            initial_peers: vec![seed_a_dial_addr],
48            ..ocaml_seed_config
49        };
50
51        tokio::time::sleep(Duration::from_secs(60)).await;
52
53        let nodes = (1..TOTAL_OCAML_NODES)
54            .map(|_| runner.add_ocaml_node(ocaml_node_config.clone()))
55            .collect::<Vec<_>>();
56
57        // wait for ocaml nodes to be ready
58        for node in &nodes {
59            runner
60                .exec_step(ScenarioStep::Ocaml {
61                    node_id: *node,
62                    step: OcamlStep::WaitReady {
63                        timeout: PAUSE_UNTIL_OCAML_NODES_READY,
64                    },
65                })
66                .await
67                .expect("OCaml node should be ready");
68        }
69        eprintln!("OCaml nodes should be ready now");
70
71        let config = RustNodeTestingConfig::devnet_default()
72            //.with_daemon_json("genesis_ledgers/devnet-full.json")
73            .max_peers(100)
74            .initial_peers(
75                nodes
76                    .iter()
77                    .chain(std::iter::once(&seed_a))
78                    .map(|node_id| (*node_id).into())
79                    .collect(),
80            );
81        let node_id = runner.add_rust_node(config);
82        eprintln!("launching Rust node {node_id}");
83
84        let mut additional_ocaml_node = None::<(ClusterOcamlNodeId, PeerId)>;
85
86        let mut timeout = STEPS;
87        loop {
88            if timeout == 0 {
89                break;
90            }
91            timeout -= 1;
92
93            tokio::time::sleep(STEP_DELAY).await;
94
95            let steps = runner
96                .pending_events(true)
97                .map(|(node_id, _, events)| {
98                    events.map(move |(_, event)| ScenarioStep::Event {
99                        node_id,
100                        event: event.to_string(),
101                    })
102                })
103                .flatten()
104                .collect::<Vec<_>>();
105
106            for step in steps {
107                runner.exec_step(step).await.unwrap();
108            }
109
110            runner
111                .exec_step(ScenarioStep::AdvanceNodeTime {
112                    node_id,
113                    by_nanos: STEP_DELAY.as_nanos() as _,
114                })
115                .await
116                .unwrap();
117
118            runner
119                .exec_step(ScenarioStep::CheckTimeouts { node_id })
120                .await
121                .unwrap();
122
123            let this = runner.node(node_id).unwrap();
124            // finish discovering
125            if this
126                .state()
127                .p2p
128                .ready()
129                .and_then(|p2p| p2p.network.scheduler.discovery_state())
130                .is_some_and(|discovery_state| discovery_state.is_bootstrapped())
131            {
132                // the node must find all already running OCaml nodes
133                // assert_eq!(this.state().p2p.peers.len(), TOTAL_OCAML_NODES as usize);
134                if additional_ocaml_node.is_none() {
135                    eprintln!("the Rust node finished peer discovery",);
136                    eprintln!(
137                        "connected peers: {:?}",
138                        this.state().p2p.unwrap().peers.keys().collect::<Vec<_>>()
139                    );
140                    let node_id = runner.add_ocaml_node(ocaml_node_config.clone());
141                    let node = runner.ocaml_node(node_id).unwrap();
142                    eprintln!("launching additional OCaml node {}", node.dial_addr());
143
144                    additional_ocaml_node = Some((node_id, node.peer_id()));
145                }
146            }
147
148            if let Some((_, additional_ocaml_node_peer_id)) = &additional_ocaml_node {
149                let peer_id = additional_ocaml_node_peer_id;
150
151                if runner
152                    .node(node_id)
153                    .unwrap()
154                    .state()
155                    .p2p
156                    .ready()
157                    .and_then(|p2p| {
158                        p2p.peers
159                            .iter()
160                            .filter(|(id, n)| n.is_libp2p && id == &peer_id)
161                            .filter_map(|(_, n)| n.status.as_ready())
162                            .find(|n| n.is_incoming)
163                    })
164                    .is_some()
165                {
166                    eprintln!("the additional OCaml node connected to Rust node");
167                    eprintln!("success");
168
169                    break;
170                }
171            }
172        }
173
174        if timeout == 0 {
175            panic!("timeout");
176        }
177    }
178}