1use std::{
2 collections::{BTreeMap, BTreeSet},
3 time::Duration,
4};
5
6use node::p2p::{
7 connection::outgoing::{P2pConnectionOutgoingInitLibp2pOpts, P2pConnectionOutgoingInitOpts},
8 identity::SecretKey,
9 P2pPeerStatus, P2pTimeouts, PeerId,
10};
11
12use crate::{
13 cluster::ClusterNodeId,
14 node::RustNodeTestingConfig,
15 scenario::ListenerNode,
16 scenarios::{
17 add_rust_nodes, add_rust_nodes_with, peer_is_ready, wait_for_connection_error,
18 wait_for_connection_established, wait_for_nodes_listening_on_localhost, ClusterRunner,
19 Driver,
20 },
21};
22
23fn custom_listener(peer_id: PeerId, port: u16) -> ListenerNode {
24 P2pConnectionOutgoingInitOpts::LibP2P(P2pConnectionOutgoingInitLibp2pOpts {
25 peer_id,
26 host: node::p2p::webrtc::Host::Ipv4([127, 0, 0, 1].into()),
27 port,
28 })
29 .into()
30}
31
32#[derive(documented::Documented, Default, Clone, Copy)]
34pub struct MakeOutgoingConnection;
35
36impl MakeOutgoingConnection {
37 pub async fn run(self, runner: ClusterRunner<'_>) {
38 let mut driver = Driver::new(runner);
39
40 let (node1, _) = driver.add_rust_node(RustNodeTestingConfig::devnet_default());
41 let (node2, peer_id2) = driver.add_rust_node(RustNodeTestingConfig::devnet_default());
42
43 let satisfied =
45 wait_for_nodes_listening_on_localhost(&mut driver, Duration::from_secs(30), [node2])
46 .await
47 .unwrap();
48 assert!(satisfied, "the peer should be listening");
49
50 driver
51 .exec_step(crate::scenario::ScenarioStep::ConnectNodes {
52 dialer: node1,
53 listener: crate::scenario::ListenerNode::Rust(node2),
54 })
55 .await
56 .expect("connect event should be dispatched");
57
58 let connected = wait_for_connection_established(
59 &mut driver,
60 Duration::from_secs(30),
61 (node1, &peer_id2),
62 )
63 .await
64 .unwrap();
65 assert!(connected, "peer should be connected");
66
67 assert!(
68 peer_is_ready(driver.inner(), node1, &peer_id2),
69 "peer should be ready"
70 );
71 }
72}
73
74#[derive(documented::Documented, Default, Clone, Copy)]
76pub struct MakeMultipleOutgoingConnections;
77
78impl MakeMultipleOutgoingConnections {
79 pub async fn run(self, runner: ClusterRunner<'_>) {
80 const MAX: u8 = 32;
81
82 let mut driver = Driver::new(runner);
83
84 let config =
85 RustNodeTestingConfig::devnet_default().with_timeouts(P2pTimeouts::without_rpc());
86
87 let (node_ut, _) = driver.add_rust_node(config.clone());
88 let (peers, mut peer_ids): (Vec<ClusterNodeId>, BTreeSet<PeerId>) =
89 add_rust_nodes(&mut driver, MAX, config.clone());
90
91 let satisfied = wait_for_nodes_listening_on_localhost(
93 &mut driver,
94 Duration::from_secs(3 * 60),
95 peers.clone(),
96 )
97 .await
98 .unwrap();
99 assert!(satisfied, "all peers should be listening");
100
101 driver
103 .run(Duration::from_secs(15))
104 .await
105 .expect("cluster should be running");
106 for peer in peers {
107 driver
108 .exec_step(crate::scenario::ScenarioStep::ConnectNodes {
109 dialer: node_ut,
110 listener: crate::scenario::ListenerNode::Rust(peer),
111 })
112 .await
113 .expect("connect event should be dispatched");
114 }
115
116 let connected = wait_for_connection_established(
117 &mut driver,
118 Duration::from_secs(3 * 60),
119 (node_ut, &mut peer_ids),
120 )
121 .await
122 .unwrap();
123 assert!(connected, "did not connect to peers: {:?}", peer_ids);
124 }
125}
126
127#[derive(documented::Documented, Default, Clone, Copy)]
129pub struct DontConnectToNodeWithSameId;
130
131impl DontConnectToNodeWithSameId {
132 pub async fn run(self, runner: ClusterRunner<'_>) {
133 let mut driver = Driver::new(runner);
134
135 let bytes: [u8; 32] = rand::random();
136
137 let (node, _) =
139 driver.add_rust_node(RustNodeTestingConfig::devnet_default().with_peer_id(bytes));
140 assert!(
142 wait_for_nodes_listening_on_localhost(&mut driver, Duration::from_secs(30), [node])
143 .await
144 .unwrap(),
145 "node should be listening"
146 );
147
148 let (node_ut, _) =
150 driver.add_rust_node(RustNodeTestingConfig::devnet_default().with_peer_id(bytes));
151
152 driver
153 .exec_step(crate::scenario::ScenarioStep::ConnectNodes {
154 dialer: node_ut,
155 listener: crate::scenario::ListenerNode::Rust(node),
156 })
157 .await
158 .expect("connect event should be dispatched");
159
160 let connected =
161 wait_for_connection_established(&mut driver, Duration::from_secs(3 * 60), node_ut)
162 .await
163 .unwrap();
164 assert!(!connected, "the node sholdn't try to connect to itself");
165 }
166}
167
168#[derive(documented::Documented, Default, Clone, Copy)]
170pub struct DontConnectToSelfInitialPeer;
171
172impl DontConnectToSelfInitialPeer {
173 pub async fn run(self, runner: ClusterRunner<'_>) {
174 let mut driver = Driver::new(runner);
175
176 let bytes = [0xfe; 32];
177 let port = 13001;
178 let peer_id = SecretKey::from_bytes(bytes).public_key().peer_id();
179 let self_opts =
180 P2pConnectionOutgoingInitOpts::LibP2P(P2pConnectionOutgoingInitLibp2pOpts {
181 peer_id,
182 host: node::p2p::webrtc::Host::Ipv4([127, 0, 0, 1].into()),
183 port,
184 });
185 let (node_ut, _) = driver.add_rust_node(
186 RustNodeTestingConfig::devnet_default()
187 .with_peer_id(bytes)
188 .with_libp2p_port(port)
189 .initial_peers(vec![self_opts.into()]),
190 );
191 assert!(
192 wait_for_nodes_listening_on_localhost(&mut driver, Duration::from_secs(30), [node_ut])
193 .await
194 .unwrap(),
195 "node should be listening"
196 );
197
198 let connected =
199 wait_for_connection_established(&mut driver, Duration::from_secs(10), node_ut)
200 .await
201 .unwrap();
202 assert!(!connected, "the node sholdn't try to connect to itself");
203 }
204}
205
206#[derive(documented::Documented, Default, Clone, Copy)]
208pub struct DontConnectToInitialPeerWithSameId;
209
210impl DontConnectToInitialPeerWithSameId {
211 pub async fn run(self, runner: ClusterRunner<'_>) {
212 let mut driver = Driver::new(runner);
213
214 let bytes: [u8; 32] = rand::random();
215
216 let (node, _) =
218 driver.add_rust_node(RustNodeTestingConfig::devnet_default().with_peer_id(bytes));
219 assert!(
221 wait_for_nodes_listening_on_localhost(&mut driver, Duration::from_secs(30), [node])
222 .await
223 .unwrap(),
224 "node should be listening"
225 );
226
227 let (node_ut, _) = driver.add_rust_node(
229 RustNodeTestingConfig::devnet_default()
230 .with_peer_id(bytes)
231 .initial_peers(vec![node.into()]),
232 );
233
234 let connected =
235 wait_for_connection_established(&mut driver, Duration::from_secs(3 * 60), node_ut)
236 .await
237 .unwrap();
238 assert!(!connected, "the node sholdn't try to connect to itself");
239 }
240}
241
242#[derive(documented::Documented, Default, Clone, Copy)]
244pub struct ConnectToInitialPeers;
245
246impl ConnectToInitialPeers {
247 pub async fn run(self, runner: ClusterRunner<'_>) {
248 const MAX: u8 = 32;
249
250 let mut driver = Driver::new(runner);
251
252 let (peers, peer_ids): (Vec<ClusterNodeId>, Vec<_>) = add_rust_nodes_with(
253 &mut driver,
254 MAX,
255 RustNodeTestingConfig::devnet_default(),
256 |state| state.p2p.my_id(),
257 );
258
259 let satisfied = wait_for_nodes_listening_on_localhost(
261 &mut driver,
262 Duration::from_secs(3 * 60),
263 peers.clone(),
264 )
265 .await
266 .unwrap();
267 assert!(satisfied, "all peers should be listening");
268
269 let initial_peers = peers.iter().map(|id| (*id).into()).collect();
270 let (node_ut, _) = driver
271 .add_rust_node(RustNodeTestingConfig::devnet_default().initial_peers(initial_peers));
272
273 let mut peer_ids = peer_ids.into_iter().collect::<BTreeSet<_>>();
275 let connected = wait_for_connection_established(
276 &mut driver,
277 Duration::from_secs(3 * 60),
278 (node_ut, &mut peer_ids),
279 )
280 .await
281 .unwrap();
282 if !connected {
283 println!(
284 "{:#?}",
285 driver
286 .inner()
287 .node(node_ut)
288 .unwrap()
289 .state()
290 .p2p
291 .unwrap()
292 .peers
293 );
294 }
295 assert!(connected, "did not connect to peers: {:?}", peer_ids);
296 }
297}
298
299#[derive(documented::Documented, Default, Clone, Copy)]
301pub struct ConnectToInitialPeersBecomeReady;
302
303impl ConnectToInitialPeersBecomeReady {
304 pub async fn run(self, runner: ClusterRunner<'_>) {
305 const MAX: u8 = 32;
306
307 let mut driver = Driver::new(runner);
308
309 let (initial_peers, peer_id_bytes_port): (Vec<_>, Vec<_>) = (0..MAX)
310 .map(|i| {
311 let bytes = [i + 1; 32];
312 let port = 12000 + i as u16;
313 let init_opts =
314 custom_listener(SecretKey::from_bytes(bytes).public_key().peer_id(), port);
315 (init_opts, (bytes, port))
316 })
317 .unzip();
318 let (node_ut, _) = driver
319 .add_rust_node(RustNodeTestingConfig::devnet_default().initial_peers(initial_peers));
320
321 driver
322 .wait_for(Duration::from_secs(10), |_, _, _| false)
323 .await
324 .unwrap();
325
326 let (_peers, mut peer_ids): (Vec<ClusterNodeId>, BTreeSet<PeerId>) = peer_id_bytes_port
327 .into_iter()
328 .map(|(peer_id_bytes, port)| {
329 driver.add_rust_node(
330 RustNodeTestingConfig::devnet_default()
331 .with_libp2p_port(port)
332 .with_peer_id(peer_id_bytes),
333 )
334 })
335 .unzip();
336
337 let connected = wait_for_connection_established(
338 &mut driver,
339 Duration::from_secs(3 * 60),
340 (node_ut, &mut peer_ids),
341 )
342 .await
343 .unwrap();
344 if !connected {
345 println!(
346 "{:#?}",
347 driver
348 .inner()
349 .node(node_ut)
350 .unwrap()
351 .state()
352 .p2p
353 .unwrap()
354 .peers
355 );
356 }
357 assert!(connected, "did not connect to peers: {:?}", peer_ids);
358 }
359}
360
361#[derive(documented::Documented, Default, Clone, Copy)]
363pub struct ConnectToUnavailableInitialPeers;
364
365impl ConnectToUnavailableInitialPeers {
366 pub async fn run(self, runner: ClusterRunner<'_>) {
367 const MAX: u16 = 2;
368 const RETRIES: u8 = 3;
369
370 let mut driver = Driver::new(runner);
371
372 let (initial_peers, peer_ids): (Vec<_>, Vec<_>) = (0..MAX)
373 .map(|i| {
374 let port = 11200 + i;
375 let peer_id = SecretKey::rand().public_key().peer_id();
376 let addr = ListenerNode::Custom(
377 P2pConnectionOutgoingInitLibp2pOpts {
378 peer_id,
379 host: [127, 0, 0, 1].into(),
380 port,
381 }
382 .into(),
383 );
384 (addr, peer_id)
385 })
386 .unzip();
387
388 let (node_ut, _) = driver.add_rust_node(
389 RustNodeTestingConfig::devnet_default()
390 .initial_peers(initial_peers)
391 .with_timeouts(P2pTimeouts {
392 outgoing_error_reconnect_timeout: Some(Duration::from_secs(3)),
393 ..Default::default()
394 }),
395 );
396
397 let mut peer_retries = BTreeMap::from_iter(peer_ids.into_iter().map(|port| (port, 0_u8)));
398
399 let satisfied = wait_for_connection_error(
400 &mut driver,
401 Duration::from_secs(3 * 60),
402 |node_id: ClusterNodeId, peer_id: &PeerId, peer_status: &P2pPeerStatus| {
403 if node_id != node_ut {
404 return false;
405 }
406 assert!(
407 peer_status.is_error(),
408 "connection to {peer_id} shouldn't succeed"
409 );
410 let retries = peer_retries.get_mut(peer_id).unwrap();
411 *retries += 1;
412 if *retries >= RETRIES {
413 peer_retries.remove(peer_id);
414 }
415 peer_retries.is_empty()
416 },
417 )
418 .await
419 .unwrap();
420
421 assert!(
422 satisfied,
423 "did not reach retry limit for peers: {:?}",
424 peer_retries
425 );
426 }
427}