// mina_node_testing/scenarios/multi_node/basic_connectivity_initial_joining.rs
#![allow(warnings)]
2
3use std::{
4 collections::{BTreeMap, HashMap},
5 time::Duration,
6};
7
8use node::{
9 event_source::Event,
10 p2p::{P2pConnectionEvent, P2pEvent},
11};
12
13use crate::{node::RustNodeTestingConfig, scenario::ScenarioStep, scenarios::ClusterRunner};
14
15#[derive(documented::Documented, Default, Clone, Copy)]
23pub struct MultiNodeBasicConnectivityInitialJoining;
24
25impl MultiNodeBasicConnectivityInitialJoining {
26 pub async fn run(self, mut runner: ClusterRunner<'_>) {
27 const TOTAL_PEERS: usize = 20;
28 const STEPS_PER_PEER: usize = 10;
29 const EXTRA_STEPS: usize = 2000;
30 const MAX_PEERS_PER_NODE: usize = 12;
31 const STEP_DELAY: Duration = Duration::from_millis(200);
32
33 let seed_node =
34 runner.add_rust_node(RustNodeTestingConfig::devnet_default().max_peers(TOTAL_PEERS));
35
36 eprintln!("launch Mina seed node, id: {seed_node}");
37
38 let mut nodes = vec![seed_node];
39
40 for step in 0..(TOTAL_PEERS * STEPS_PER_PEER + EXTRA_STEPS) {
41 tokio::time::sleep(STEP_DELAY).await;
42
43 if step % STEPS_PER_PEER == 0 && nodes.len() < TOTAL_PEERS {
44 let node = runner.add_rust_node(
45 RustNodeTestingConfig::devnet_default()
46 .max_peers(MAX_PEERS_PER_NODE)
47 .initial_peers(vec![seed_node.into()]),
48 );
49 eprintln!("launch Mina node, id: {node}, connects to {seed_node}");
50
51 nodes.push(node);
52 }
53
54 let mut connection_events = BTreeMap::<_, BTreeMap<_, Vec<String>>>::default();
55
56 let mut steps = vec![];
57 for node_id in &nodes {
58 let node_id = *node_id;
59 let this_id = runner.node(node_id).unwrap().state().p2p.my_id();
60
61 let node_steps = runner
62 .node_pending_events(node_id, true)
63 .unwrap()
64 .1
65 .map(|(_, event)| {
66 match event {
67 Event::P2p(P2pEvent::Connection(P2pConnectionEvent::Finalized(
68 peer_id,
69 result,
70 ))) => connection_events
71 .entry(this_id)
72 .or_default()
73 .entry(*peer_id)
74 .or_default()
75 .push(
76 result
77 .as_ref()
78 .err()
79 .cloned()
80 .unwrap_or_else(|| "ok".to_owned()),
81 ),
82 _ => {}
83 }
84 ScenarioStep::Event {
85 node_id,
86 event: event.to_string(),
87 }
88 })
89 .collect::<Vec<_>>();
90 steps.extend(node_steps);
91 }
92
93 for step in steps {
94 runner.exec_step(step).await.unwrap();
95 }
96
97 if nodes.len() < TOTAL_PEERS {
98 continue;
99 }
100
101 let mut conditions_met = true;
102 for &node_id in &nodes {
103 runner
104 .exec_step(ScenarioStep::AdvanceNodeTime {
105 node_id,
106 by_nanos: STEP_DELAY.as_nanos() as _,
107 })
108 .await
109 .unwrap();
110
111 runner
112 .exec_step(ScenarioStep::CheckTimeouts { node_id })
113 .await
114 .unwrap();
115
116 let node = runner.node(node_id).expect("node must exist");
117 let p2p = &node.state().p2p;
118 let ready_peers = p2p.ready_peers_iter().count();
119
120 conditions_met &=
122 p2p.ready().is_some() && ready_peers >= node.state().p2p.unwrap().min_peers();
123
124 let max_peers = if node_id == seed_node {
126 TOTAL_PEERS
127 } else {
128 MAX_PEERS_PER_NODE
129 };
130 assert!(ready_peers <= max_peers);
131 }
132
133 if conditions_met {
134 for (this_id, events) in &connection_events {
135 for (peer_id, results) in events {
136 for result in results {
137 eprintln!("{this_id} <-> {peer_id}: {result}");
138 }
139 }
140 }
141
142 let mut total_connections_known = 0;
143 let mut total_connections_ready = 0;
144 for &node_id in &nodes {
145 let node = runner.node(node_id).expect("node must exist");
146
147 let p2p = &node.state().p2p;
148 let ready_peers = p2p.ready_peers_iter().count();
149 let my_id = p2p.my_id();
150
151 let known_peers: usize = node
152 .state()
153 .p2p
154 .ready()
155 .and_then(|p2p| p2p.network.scheduler.discovery_state())
156 .map_or(0, |discovery_state| {
157 discovery_state
158 .routing_table
159 .closest_peers(&my_id.try_into().unwrap())
160 .count()
161 });
162 let state_machine_peers = if cfg!(feature = "p2p-webrtc") {
163 ready_peers
164 } else {
165 ready_peers.max(known_peers)
166 };
167 total_connections_ready += ready_peers;
168 total_connections_known += state_machine_peers;
169 eprintln!("node {} has {ready_peers} peers", p2p.my_id(),);
170 }
171
172 if let Some(debugger) = runner.debugger() {
174 tokio::time::sleep(Duration::from_secs(10)).await;
175
176 let connections = debugger
177 .connections_raw(0)
178 .map(|(id, c)| (id, (c.info.addr, c.info.fd, c.info.pid, c.incoming)))
179 .collect::<HashMap<_, _>>();
180
181 for (id, cn) in &connections {
183 eprintln!("{id}: {}", serde_json::to_string(cn).unwrap());
184 }
185 for (id, msg) in debugger.messages(0, "") {
187 eprintln!("{id}: {}", serde_json::to_string(&msg).unwrap());
188 }
189 let connections = debugger
191 .connections()
192 .filter_map(|id| Some((id, connections.get(&id)?.clone())))
193 .collect::<HashMap<_, _>>();
194 let incoming = connections.iter().filter(|(_, (_, _, _, i))| *i).count();
195 let outgoing = connections.len() - incoming;
196 eprintln!(
197 "debugger seen {incoming} incoming connections and {outgoing} outgoing connections",
198 );
199 let state_machine_peers = if cfg!(feature = "p2p-webrtc") {
200 total_connections_ready
201 } else {
202 total_connections_ready.max(total_connections_known)
203 };
204 assert_eq!(
205 incoming + outgoing,
206 state_machine_peers,
207 "debugger must see the same number of connections as the state machine"
208 );
209 } else {
210 eprintln!("no debugger, run test with --use-debugger for additional check");
211 }
212
213 eprintln!("success");
214
215 return;
216 }
217 }
218
219 for node_id in &nodes {
220 let node = runner.node(*node_id).expect("node must exist");
221 let p2p: &node::p2p::P2pState = &node.state().p2p.unwrap();
222 let ready_peers = p2p.ready_peers_iter().count();
223 println!("must hold {ready_peers} >= {}", p2p.min_peers());
225 }
226
227 assert!(false);
233 }
234}