mina_node_testing/scenarios/p2p/
basic_connection_handling.rs1use std::time::Duration;
2
3use node::{
4 event_source::Event,
5 p2p::{P2pConnectionEvent, P2pEvent, P2pState, P2pTimeouts, PeerId},
6};
7
8use crate::{
9 node::RustNodeTestingConfig,
10 scenarios::{
11 add_rust_nodes1, connect_rust_nodes, get_peer_state, peer_is_ready, run_until_no_events,
12 wait_for_connection_event, wait_for_nodes_listening_on_localhost, ClusterRunner,
13 ConnectionPredicates, Driver,
14 },
15};
16
17fn has_active_peer(p2p_state: &P2pState, peer_id: &PeerId) -> bool {
18 p2p_state.ready_peers_iter().any(|(id, _)| id == peer_id)
19}
20
21#[derive(documented::Documented, Default, Clone, Copy)]
23pub struct SimultaneousConnections;
24
25impl SimultaneousConnections {
26 pub async fn run(self, runner: ClusterRunner<'_>) {
27 let mut driver = Driver::new(runner);
28
29 let testing_config = RustNodeTestingConfig::devnet_default().with_timeouts(P2pTimeouts {
30 best_tip_with_proof: None,
32 ..Default::default()
33 });
34 let (node1, peer_id1) = driver.add_rust_node(testing_config.clone());
35 let (node2, peer_id2) = driver.add_rust_node(testing_config);
36
37 assert!(
38 wait_for_nodes_listening_on_localhost(
39 &mut driver,
40 Duration::from_secs(30),
41 [node1, node2]
42 )
43 .await
44 .unwrap(),
45 "nodes should be listening"
46 );
47
48 driver
49 .exec_step(crate::scenario::ScenarioStep::ConnectNodes {
50 dialer: node1,
51 listener: crate::scenario::ListenerNode::Rust(node2),
52 })
53 .await
54 .expect("connect event should be dispatched");
55 driver
56 .exec_step(crate::scenario::ScenarioStep::ConnectNodes {
57 dialer: node2,
58 listener: crate::scenario::ListenerNode::Rust(node1),
59 })
60 .await
61 .expect("connect event should be dispatched");
62
63 let quiet =
65 run_until_no_events(&mut driver, Duration::from_secs(5), Duration::from_secs(20))
66 .await
67 .unwrap();
68 assert!(
69 quiet,
70 "no quiet period with no events since nodes are connected"
71 );
72
73 assert!(
74 peer_is_ready(driver.inner(), node1, &peer_id2),
75 "node2 should be a ready peer of node1, but it is {:?}",
76 get_peer_state(driver.inner(), node1, &peer_id2)
77 );
78 assert!(
79 peer_is_ready(driver.inner(), node2, &peer_id1),
80 "node2 should be a ready peer of node1, but it is {:?}",
81 get_peer_state(driver.inner(), node2, &peer_id1)
82 );
83 }
84}
85
86#[derive(documented::Documented, Default, Clone, Copy)]
88pub struct AllNodesConnectionsAreSymmetric;
89
90impl AllNodesConnectionsAreSymmetric {
91 pub async fn run(self, runner: ClusterRunner<'_>) {
92 const MAX: u16 = 32;
93
94 let mut driver = Driver::new(runner);
95
96 let testing_config = RustNodeTestingConfig::devnet_default().with_timeouts(P2pTimeouts {
97 best_tip_with_proof: None,
99 ..Default::default()
100 });
101
102 let (seed_id, _) = driver.add_rust_node(testing_config.clone());
103
104 let peers: Vec<_> = (0..MAX)
105 .map(|_| {
106 driver.add_rust_node(testing_config.clone().initial_peers(vec![seed_id.into()]))
107 })
108 .collect();
109
110 let quiet = run_until_no_events(
112 &mut driver,
113 Duration::from_secs(5),
114 Duration::from_secs(2 * 60),
115 )
116 .await
117 .unwrap();
118 assert!(
119 quiet,
120 "no quiet period with no events since nodes are connected"
121 );
122
123 for (peer1, peer_id1) in &peers {
125 let peer1_p2p_state = &driver.inner().node(*peer1).unwrap().state().p2p.unwrap();
126 for (peer2, peer_id2) in &peers {
127 if peer2 == peer1 {
128 continue;
129 }
130 let peer2_p2p_state = &driver.inner().node(*peer2).unwrap().state().p2p.unwrap();
131
132 if has_active_peer(peer2_p2p_state, peer_id1) {
133 assert!(
134 has_active_peer(peer1_p2p_state, peer_id2),
135 "node {peer2} should be an active peer of the node {peer1}, but it is {:?}",
136 peer1_p2p_state.peers.get(peer_id2)
137 );
138 } else {
139 assert!(
140 !has_active_peer(peer1_p2p_state, peer_id2),
141 "node {peer2} should not be an active peer of the node {peer1}, but it is"
142 );
143 }
144 }
145 }
146 }
147}
148
149#[derive(documented::Documented, Default, Clone, Copy)]
151pub struct SeedConnectionsAreSymmetric;
152
153impl SeedConnectionsAreSymmetric {
154 pub async fn run(self, runner: ClusterRunner<'_>) {
155 const MAX: u16 = 32;
156
157 let mut driver = Driver::new(runner);
158
159 let (node_ut, node_ut_peer_id) =
160 driver.add_rust_node(RustNodeTestingConfig::devnet_default_no_rpc_timeouts());
161
162 let peers: Vec<_> = (0..MAX)
163 .map(|_| {
164 driver.add_rust_node(
165 RustNodeTestingConfig::devnet_default_no_rpc_timeouts()
166 .initial_peers(vec![node_ut.into()]),
167 )
168 })
169 .collect();
170
171 driver
173 .run_until(Duration::from_secs(2 * 60), |_, _, _| false)
174 .await
175 .unwrap();
176
177 for (peer, peer_id) in peers {
179 if peer_is_ready(driver.inner(), peer, &node_ut_peer_id) {
180 assert!(
181 peer_is_ready(driver.inner(), node_ut, &peer_id),
182 "node {peer} should be in the node's peer list"
183 );
184 } else {
185 assert!(
186 !peer_is_ready(driver.inner(), node_ut, &peer_id),
187 "node {peer} should't be in the node's peer list"
188 );
189 }
190 }
191 }
192}
193
194#[derive(documented::Documented, Default, Clone, Copy)]
196pub struct MaxNumberOfPeersIncoming;
197
198impl MaxNumberOfPeersIncoming {
199 pub async fn run(self, runner: ClusterRunner<'_>) {
200 const TOTAL: u16 = 32;
201 const MAX: u16 = 16;
202
203 let mut driver = Driver::new(runner);
204
205 let (node_ut, nut_peer_id) = driver.add_rust_node(
206 RustNodeTestingConfig::devnet_default_no_rpc_timeouts().max_peers(MAX.into()),
207 );
208
209 let config = RustNodeTestingConfig::devnet_default().with_timeouts(P2pTimeouts {
210 reconnect_timeout: None,
212 ..P2pTimeouts::without_rpc()
213 });
214 let peers: Vec<_> = add_rust_nodes1(&mut driver, TOTAL, config);
215
216 let satisfied = wait_for_nodes_listening_on_localhost(
218 &mut driver,
219 Duration::from_secs(3 * 60),
220 [node_ut],
221 )
222 .await
223 .unwrap();
224 assert!(satisfied, "all peers should be listening");
225
226 println!("connecting nodes....");
227
228 for (peer, _peer_id) in &peers {
229 connect_rust_nodes(driver.inner_mut(), *peer, node_ut).await;
230 let connected = wait_for_connection_event(
231 &mut driver,
232 Duration::from_secs(60),
233 (*peer, ConnectionPredicates::PeerFinalized(nut_peer_id)),
234 )
235 .await
236 .unwrap();
237 assert!(connected, "connection to node {peer} is not finalized");
238 }
239
240 println!("running cluster...");
241 driver.run(Duration::from_secs(60)).await.unwrap();
242 println!("checking assertions...");
243
244 let state = driver.inner().node(node_ut).unwrap().state();
246 let count = state.p2p.ready_peers_iter().count();
247 assert!(
248 count <= usize::from(MAX),
249 "max number of peers exceeded: {count}"
250 );
251
252 let peers_connected = || {
254 peers
255 .iter()
256 .filter_map(|(peer, _)| driver.inner().node(*peer))
257 .filter(|peer| {
258 peer.state()
259 .p2p
260 .get_peer(&nut_peer_id)
261 .and_then(|peer| peer.status.as_ready())
262 .is_some()
263 })
264 };
265 assert!(
266 peers_connected().count() <= usize::from(MAX),
267 "peers connections to the node exceed the max number of connections: {}",
268 peers_connected().count()
269 );
270 }
271}
272
273#[derive(documented::Documented, Default, Clone, Copy)]
275pub struct MaxNumberOfPeersIs1;
276
277impl MaxNumberOfPeersIs1 {
278 pub async fn run(self, runner: ClusterRunner<'_>) {
279 const CONNECTED_TIME_SEC: u64 = 10;
280 let mut driver = Driver::new(runner);
281
282 let (node1, _) = driver.add_rust_node(RustNodeTestingConfig::devnet_default().max_peers(1));
283 let (node2, _) = driver.add_rust_node(RustNodeTestingConfig::devnet_default().max_peers(1));
284
285 assert!(
286 wait_for_nodes_listening_on_localhost(&mut driver, Duration::from_secs(30), [node2])
287 .await
288 .unwrap(),
289 "nodes should be listening"
290 );
291
292 driver
293 .exec_step(crate::scenario::ScenarioStep::ConnectNodes {
294 dialer: node1,
295 listener: crate::scenario::ListenerNode::Rust(node2),
296 })
297 .await
298 .expect("connect event should be dispatched");
299
300 let disconnected = driver
302 .run_until(Duration::from_secs(CONNECTED_TIME_SEC), |_, event, _| {
303 matches!(
304 event,
305 Event::P2p(P2pEvent::Connection(P2pConnectionEvent::Closed(_)))
306 )
307 })
308 .await
309 .unwrap();
310
311 assert!(!disconnected, "there shouldn't be a disconnection");
312 }
313}
314
315#[derive(documented::Documented, Default, Clone, Copy)]
319pub struct ConnectionStability;
320
321impl ConnectionStability {
322 pub async fn run(self, runner: ClusterRunner<'_>) {
323 const CONNECTED_TIME_SEC: u64 = 60;
324 let mut driver = Driver::new(runner);
325
326 let (node1, _) = driver.add_rust_node(RustNodeTestingConfig::devnet_default().max_peers(1));
327 let (node2, _) = driver.add_rust_node(RustNodeTestingConfig::devnet_default().max_peers(1));
328
329 assert!(
330 wait_for_nodes_listening_on_localhost(&mut driver, Duration::from_secs(30), [node2])
331 .await
332 .unwrap(),
333 "nodes should be listening"
334 );
335
336 driver
337 .exec_step(crate::scenario::ScenarioStep::ConnectNodes {
338 dialer: node1,
339 listener: crate::scenario::ListenerNode::Rust(node2),
340 })
341 .await
342 .expect("connect event should be dispatched");
343
344 let disconnected = driver
346 .run_until(Duration::from_secs(CONNECTED_TIME_SEC), |_, event, _| {
347 matches!(
348 event,
349 Event::P2p(P2pEvent::Connection(P2pConnectionEvent::Closed(_)))
350 )
351 })
352 .await
353 .unwrap();
354
355 assert!(!disconnected, "there shouldn't be a disconnection");
356 }
357}