mina_node_testing/scenarios/p2p/
basic_outgoing_connections.rs

1use std::{
2    collections::{BTreeMap, BTreeSet},
3    time::Duration,
4};
5
6use node::p2p::{
7    connection::outgoing::{P2pConnectionOutgoingInitLibp2pOpts, P2pConnectionOutgoingInitOpts},
8    identity::SecretKey,
9    P2pPeerStatus, P2pTimeouts, PeerId,
10};
11
12use crate::{
13    cluster::ClusterNodeId,
14    node::RustNodeTestingConfig,
15    scenario::ListenerNode,
16    scenarios::{
17        add_rust_nodes, add_rust_nodes_with, peer_is_ready, wait_for_connection_error,
18        wait_for_connection_established, wait_for_nodes_listening_on_localhost, ClusterRunner,
19        Driver,
20    },
21};
22
23fn custom_listener(peer_id: PeerId, port: u16) -> ListenerNode {
24    P2pConnectionOutgoingInitOpts::LibP2P(P2pConnectionOutgoingInitLibp2pOpts {
25        peer_id,
26        host: node::p2p::webrtc::Host::Ipv4([127, 0, 0, 1].into()),
27        port,
28    })
29    .into()
30}
31
32/// Node should be able to make an outgoing connection to a listening node.
33#[derive(documented::Documented, Default, Clone, Copy)]
34pub struct MakeOutgoingConnection;
35
36impl MakeOutgoingConnection {
37    pub async fn run(self, runner: ClusterRunner<'_>) {
38        let mut driver = Driver::new(runner);
39
40        let (node1, _) = driver.add_rust_node(RustNodeTestingConfig::devnet_default());
41        let (node2, peer_id2) = driver.add_rust_node(RustNodeTestingConfig::devnet_default());
42
43        // wait for the peer to listen
44        let satisfied =
45            wait_for_nodes_listening_on_localhost(&mut driver, Duration::from_secs(30), [node2])
46                .await
47                .unwrap();
48        assert!(satisfied, "the peer should be listening");
49
50        driver
51            .exec_step(crate::scenario::ScenarioStep::ConnectNodes {
52                dialer: node1,
53                listener: crate::scenario::ListenerNode::Rust(node2),
54            })
55            .await
56            .expect("connect event should be dispatched");
57
58        let connected = wait_for_connection_established(
59            &mut driver,
60            Duration::from_secs(30),
61            (node1, &peer_id2),
62        )
63        .await
64        .unwrap();
65        assert!(connected, "peer should be connected");
66
67        assert!(
68            peer_is_ready(driver.inner(), node1, &peer_id2),
69            "peer should be ready"
70        );
71    }
72}
73
74/// Node should be able to create multiple outgoing connections.
75#[derive(documented::Documented, Default, Clone, Copy)]
76pub struct MakeMultipleOutgoingConnections;
77
78impl MakeMultipleOutgoingConnections {
79    pub async fn run(self, runner: ClusterRunner<'_>) {
80        const MAX: u8 = 32;
81
82        let mut driver = Driver::new(runner);
83
84        let config =
85            RustNodeTestingConfig::devnet_default().with_timeouts(P2pTimeouts::without_rpc());
86
87        let (node_ut, _) = driver.add_rust_node(config.clone());
88        let (peers, mut peer_ids): (Vec<ClusterNodeId>, BTreeSet<PeerId>) =
89            add_rust_nodes(&mut driver, MAX, config.clone());
90
91        // wait for all peers to listen
92        let satisfied = wait_for_nodes_listening_on_localhost(
93            &mut driver,
94            Duration::from_secs(3 * 60),
95            peers.clone(),
96        )
97        .await
98        .unwrap();
99        assert!(satisfied, "all peers should be listening");
100
101        // connect node under test to all peers
102        driver
103            .run(Duration::from_secs(15))
104            .await
105            .expect("cluster should be running");
106        for peer in peers {
107            driver
108                .exec_step(crate::scenario::ScenarioStep::ConnectNodes {
109                    dialer: node_ut,
110                    listener: crate::scenario::ListenerNode::Rust(peer),
111                })
112                .await
113                .expect("connect event should be dispatched");
114        }
115
116        let connected = wait_for_connection_established(
117            &mut driver,
118            Duration::from_secs(3 * 60),
119            (node_ut, &mut peer_ids),
120        )
121        .await
122        .unwrap();
123        assert!(connected, "did not connect to peers: {:?}", peer_ids);
124    }
125}
126
127/// Node shouldn't establish connection with a node with the same peer_id.
128#[derive(documented::Documented, Default, Clone, Copy)]
129pub struct DontConnectToNodeWithSameId;
130
131impl DontConnectToNodeWithSameId {
132    pub async fn run(self, runner: ClusterRunner<'_>) {
133        let mut driver = Driver::new(runner);
134
135        let bytes: [u8; 32] = rand::random();
136
137        // start a node with the same peer_id on different port
138        let (node, _) =
139            driver.add_rust_node(RustNodeTestingConfig::devnet_default().with_peer_id(bytes));
140        // wait for it to be ready
141        assert!(
142            wait_for_nodes_listening_on_localhost(&mut driver, Duration::from_secs(30), [node])
143                .await
144                .unwrap(),
145            "node should be listening"
146        );
147
148        // start node under test with the other node as its initial peer
149        let (node_ut, _) =
150            driver.add_rust_node(RustNodeTestingConfig::devnet_default().with_peer_id(bytes));
151
152        driver
153            .exec_step(crate::scenario::ScenarioStep::ConnectNodes {
154                dialer: node_ut,
155                listener: crate::scenario::ListenerNode::Rust(node),
156            })
157            .await
158            .expect("connect event should be dispatched");
159
160        let connected =
161            wait_for_connection_established(&mut driver, Duration::from_secs(3 * 60), node_ut)
162                .await
163                .unwrap();
164        assert!(!connected, "the node sholdn't try to connect to itself");
165    }
166}
167
168/// Node shouldn't connect to itself even if its address specified in initial peers.
169#[derive(documented::Documented, Default, Clone, Copy)]
170pub struct DontConnectToSelfInitialPeer;
171
172impl DontConnectToSelfInitialPeer {
173    pub async fn run(self, runner: ClusterRunner<'_>) {
174        let mut driver = Driver::new(runner);
175
176        let bytes = [0xfe; 32];
177        let port = 13001;
178        let peer_id = SecretKey::from_bytes(bytes).public_key().peer_id();
179        let self_opts =
180            P2pConnectionOutgoingInitOpts::LibP2P(P2pConnectionOutgoingInitLibp2pOpts {
181                peer_id,
182                host: node::p2p::webrtc::Host::Ipv4([127, 0, 0, 1].into()),
183                port,
184            });
185        let (node_ut, _) = driver.add_rust_node(
186            RustNodeTestingConfig::devnet_default()
187                .with_peer_id(bytes)
188                .with_libp2p_port(port)
189                .initial_peers(vec![self_opts.into()]),
190        );
191        assert!(
192            wait_for_nodes_listening_on_localhost(&mut driver, Duration::from_secs(30), [node_ut])
193                .await
194                .unwrap(),
195            "node should be listening"
196        );
197
198        let connected =
199            wait_for_connection_established(&mut driver, Duration::from_secs(10), node_ut)
200                .await
201                .unwrap();
202        assert!(!connected, "the node sholdn't try to connect to itself");
203    }
204}
205
206/// Node shouldn't connect to a node with the same peer id even if its address specified in initial peers.
207#[derive(documented::Documented, Default, Clone, Copy)]
208pub struct DontConnectToInitialPeerWithSameId;
209
210impl DontConnectToInitialPeerWithSameId {
211    pub async fn run(self, runner: ClusterRunner<'_>) {
212        let mut driver = Driver::new(runner);
213
214        let bytes: [u8; 32] = rand::random();
215
216        // start a node with the same peer_id on different port
217        let (node, _) =
218            driver.add_rust_node(RustNodeTestingConfig::devnet_default().with_peer_id(bytes));
219        // wait for it to be ready
220        assert!(
221            wait_for_nodes_listening_on_localhost(&mut driver, Duration::from_secs(30), [node])
222                .await
223                .unwrap(),
224            "node should be listening"
225        );
226
227        // start node under test with the other node as its initial peer
228        let (node_ut, _) = driver.add_rust_node(
229            RustNodeTestingConfig::devnet_default()
230                .with_peer_id(bytes)
231                .initial_peers(vec![node.into()]),
232        );
233
234        let connected =
235            wait_for_connection_established(&mut driver, Duration::from_secs(3 * 60), node_ut)
236                .await
237                .unwrap();
238        assert!(!connected, "the node sholdn't try to connect to itself");
239    }
240}
241
242/// Node should be able to connect to all initial peers.
243#[derive(documented::Documented, Default, Clone, Copy)]
244pub struct ConnectToInitialPeers;
245
246impl ConnectToInitialPeers {
247    pub async fn run(self, runner: ClusterRunner<'_>) {
248        const MAX: u8 = 32;
249
250        let mut driver = Driver::new(runner);
251
252        let (peers, peer_ids): (Vec<ClusterNodeId>, Vec<_>) = add_rust_nodes_with(
253            &mut driver,
254            MAX,
255            RustNodeTestingConfig::devnet_default(),
256            |state| state.p2p.my_id(),
257        );
258
259        // wait for all peers to listen
260        let satisfied = wait_for_nodes_listening_on_localhost(
261            &mut driver,
262            Duration::from_secs(3 * 60),
263            peers.clone(),
264        )
265        .await
266        .unwrap();
267        assert!(satisfied, "all peers should be listening");
268
269        let initial_peers = peers.iter().map(|id| (*id).into()).collect();
270        let (node_ut, _) = driver
271            .add_rust_node(RustNodeTestingConfig::devnet_default().initial_peers(initial_peers));
272
273        // matches event "the node established connection with peer"
274        let mut peer_ids = peer_ids.into_iter().collect::<BTreeSet<_>>();
275        let connected = wait_for_connection_established(
276            &mut driver,
277            Duration::from_secs(3 * 60),
278            (node_ut, &mut peer_ids),
279        )
280        .await
281        .unwrap();
282        if !connected {
283            println!(
284                "{:#?}",
285                driver
286                    .inner()
287                    .node(node_ut)
288                    .unwrap()
289                    .state()
290                    .p2p
291                    .unwrap()
292                    .peers
293            );
294        }
295        assert!(connected, "did not connect to peers: {:?}", peer_ids);
296    }
297}
298
299/// Node should be able to connect to all initial peers after they become ready.
300#[derive(documented::Documented, Default, Clone, Copy)]
301pub struct ConnectToInitialPeersBecomeReady;
302
303impl ConnectToInitialPeersBecomeReady {
304    pub async fn run(self, runner: ClusterRunner<'_>) {
305        const MAX: u8 = 32;
306
307        let mut driver = Driver::new(runner);
308
309        let (initial_peers, peer_id_bytes_port): (Vec<_>, Vec<_>) = (0..MAX)
310            .map(|i| {
311                let bytes = [i + 1; 32];
312                let port = 12000 + i as u16;
313                let init_opts =
314                    custom_listener(SecretKey::from_bytes(bytes).public_key().peer_id(), port);
315                (init_opts, (bytes, port))
316            })
317            .unzip();
318        let (node_ut, _) = driver
319            .add_rust_node(RustNodeTestingConfig::devnet_default().initial_peers(initial_peers));
320
321        driver
322            .wait_for(Duration::from_secs(10), |_, _, _| false)
323            .await
324            .unwrap();
325
326        let (_peers, mut peer_ids): (Vec<ClusterNodeId>, BTreeSet<PeerId>) = peer_id_bytes_port
327            .into_iter()
328            .map(|(peer_id_bytes, port)| {
329                driver.add_rust_node(
330                    RustNodeTestingConfig::devnet_default()
331                        .with_libp2p_port(port)
332                        .with_peer_id(peer_id_bytes),
333                )
334            })
335            .unzip();
336
337        let connected = wait_for_connection_established(
338            &mut driver,
339            Duration::from_secs(3 * 60),
340            (node_ut, &mut peer_ids),
341        )
342        .await
343        .unwrap();
344        if !connected {
345            println!(
346                "{:#?}",
347                driver
348                    .inner()
349                    .node(node_ut)
350                    .unwrap()
351                    .state()
352                    .p2p
353                    .unwrap()
354                    .peers
355            );
356        }
357        assert!(connected, "did not connect to peers: {:?}", peer_ids);
358    }
359}
360
361/// Node should repeat connecting to unavailable initial peer.
362#[derive(documented::Documented, Default, Clone, Copy)]
363pub struct ConnectToUnavailableInitialPeers;
364
365impl ConnectToUnavailableInitialPeers {
366    pub async fn run(self, runner: ClusterRunner<'_>) {
367        const MAX: u16 = 2;
368        const RETRIES: u8 = 3;
369
370        let mut driver = Driver::new(runner);
371
372        let (initial_peers, peer_ids): (Vec<_>, Vec<_>) = (0..MAX)
373            .map(|i| {
374                let port = 11200 + i;
375                let peer_id = SecretKey::rand().public_key().peer_id();
376                let addr = ListenerNode::Custom(
377                    P2pConnectionOutgoingInitLibp2pOpts {
378                        peer_id,
379                        host: [127, 0, 0, 1].into(),
380                        port,
381                    }
382                    .into(),
383                );
384                (addr, peer_id)
385            })
386            .unzip();
387
388        let (node_ut, _) = driver.add_rust_node(
389            RustNodeTestingConfig::devnet_default()
390                .initial_peers(initial_peers)
391                .with_timeouts(P2pTimeouts {
392                    outgoing_error_reconnect_timeout: Some(Duration::from_secs(3)),
393                    ..Default::default()
394                }),
395        );
396
397        let mut peer_retries = BTreeMap::from_iter(peer_ids.into_iter().map(|port| (port, 0_u8)));
398
399        let satisfied = wait_for_connection_error(
400            &mut driver,
401            Duration::from_secs(3 * 60),
402            |node_id: ClusterNodeId, peer_id: &PeerId, peer_status: &P2pPeerStatus| {
403                if node_id != node_ut {
404                    return false;
405                }
406                assert!(
407                    peer_status.is_error(),
408                    "connection to {peer_id} shouldn't succeed"
409                );
410                let retries = peer_retries.get_mut(peer_id).unwrap();
411                *retries += 1;
412                if *retries >= RETRIES {
413                    peer_retries.remove(peer_id);
414                }
415                peer_retries.is_empty()
416            },
417        )
418        .await
419        .unwrap();
420
421        assert!(
422            satisfied,
423            "did not reach retry limit for peers: {:?}",
424            peer_retries
425        );
426    }
427}