mina_node_testing/scenarios/p2p/
basic_connection_handling.rs

1use std::time::Duration;
2
3use node::{
4    event_source::Event,
5    p2p::{P2pConnectionEvent, P2pEvent, P2pState, P2pTimeouts, PeerId},
6};
7
8use crate::{
9    node::RustNodeTestingConfig,
10    scenarios::{
11        add_rust_nodes1, connect_rust_nodes, get_peer_state, peer_is_ready, run_until_no_events,
12        wait_for_connection_event, wait_for_nodes_listening_on_localhost, ClusterRunner,
13        ConnectionPredicates, Driver,
14    },
15};
16
17fn has_active_peer(p2p_state: &P2pState, peer_id: &PeerId) -> bool {
18    p2p_state.ready_peers_iter().any(|(id, _)| id == peer_id)
19}
20
21/// Two nodes should properly handle a situation when they are connecting to each other simultaneously.
22#[derive(documented::Documented, Default, Clone, Copy)]
23pub struct SimultaneousConnections;
24
25impl SimultaneousConnections {
26    pub async fn run(self, runner: ClusterRunner<'_>) {
27        let mut driver = Driver::new(runner);
28
29        let testing_config = RustNodeTestingConfig::devnet_default().with_timeouts(P2pTimeouts {
30            // test might be failing because of best tip RPC timeout...
31            best_tip_with_proof: None,
32            ..Default::default()
33        });
34        let (node1, peer_id1) = driver.add_rust_node(testing_config.clone());
35        let (node2, peer_id2) = driver.add_rust_node(testing_config);
36
37        assert!(
38            wait_for_nodes_listening_on_localhost(
39                &mut driver,
40                Duration::from_secs(30),
41                [node1, node2]
42            )
43            .await
44            .unwrap(),
45            "nodes should be listening"
46        );
47
48        driver
49            .exec_step(crate::scenario::ScenarioStep::ConnectNodes {
50                dialer: node1,
51                listener: crate::scenario::ListenerNode::Rust(node2),
52            })
53            .await
54            .expect("connect event should be dispatched");
55        driver
56            .exec_step(crate::scenario::ScenarioStep::ConnectNodes {
57                dialer: node2,
58                listener: crate::scenario::ListenerNode::Rust(node1),
59            })
60            .await
61            .expect("connect event should be dispatched");
62
63        // Run the cluster while there are events
64        let quiet =
65            run_until_no_events(&mut driver, Duration::from_secs(5), Duration::from_secs(20))
66                .await
67                .unwrap();
68        assert!(
69            quiet,
70            "no quiet period with no events since nodes are connected"
71        );
72
73        assert!(
74            peer_is_ready(driver.inner(), node1, &peer_id2),
75            "node2 should be a ready peer of node1, but it is {:?}",
76            get_peer_state(driver.inner(), node1, &peer_id2)
77        );
78        assert!(
79            peer_is_ready(driver.inner(), node2, &peer_id1),
80            "node2 should be a ready peer of node1, but it is {:?}",
81            get_peer_state(driver.inner(), node2, &peer_id1)
82        );
83    }
84}
85
86/// Connections between all peers are symmetric, i.e. iff the node1 has the node2 among its active peers, then the node2 should have the node1 as its active peers.
87#[derive(documented::Documented, Default, Clone, Copy)]
88pub struct AllNodesConnectionsAreSymmetric;
89
90impl AllNodesConnectionsAreSymmetric {
91    pub async fn run(self, runner: ClusterRunner<'_>) {
92        const MAX: u16 = 32;
93
94        let mut driver = Driver::new(runner);
95
96        let testing_config = RustNodeTestingConfig::devnet_default().with_timeouts(P2pTimeouts {
97            // test might be failing because of best tip RPC timeout...
98            best_tip_with_proof: None,
99            ..Default::default()
100        });
101
102        let (seed_id, _) = driver.add_rust_node(testing_config.clone());
103
104        let peers: Vec<_> = (0..MAX)
105            .map(|_| {
106                driver.add_rust_node(testing_config.clone().initial_peers(vec![seed_id.into()]))
107            })
108            .collect();
109
110        // Run the cluster while there are events
111        let quiet = run_until_no_events(
112            &mut driver,
113            Duration::from_secs(5),
114            Duration::from_secs(2 * 60),
115        )
116        .await
117        .unwrap();
118        assert!(
119            quiet,
120            "no quiet period with no events since nodes are connected"
121        );
122
123        // Check that for each peer, if it is in the node's peer list, then the node is in the peer's peer list
124        for (peer1, peer_id1) in &peers {
125            let peer1_p2p_state = &driver.inner().node(*peer1).unwrap().state().p2p.unwrap();
126            for (peer2, peer_id2) in &peers {
127                if peer2 == peer1 {
128                    continue;
129                }
130                let peer2_p2p_state = &driver.inner().node(*peer2).unwrap().state().p2p.unwrap();
131
132                if has_active_peer(peer2_p2p_state, peer_id1) {
133                    assert!(
134                        has_active_peer(peer1_p2p_state, peer_id2),
135                        "node {peer2} should be an active peer of the node {peer1}, but it is {:?}",
136                        peer1_p2p_state.peers.get(peer_id2)
137                    );
138                } else {
139                    assert!(
140                        !has_active_peer(peer1_p2p_state, peer_id2),
141                        "node {peer2} should not be an active peer of the node {peer1}, but it is"
142                    );
143                }
144            }
145        }
146    }
147}
148
149/// Connections with other peers are symmetric for seed node, i.e. iff a node is the seed's peer, then it has the node among its peers.
150#[derive(documented::Documented, Default, Clone, Copy)]
151pub struct SeedConnectionsAreSymmetric;
152
153impl SeedConnectionsAreSymmetric {
154    pub async fn run(self, runner: ClusterRunner<'_>) {
155        const MAX: u16 = 32;
156
157        let mut driver = Driver::new(runner);
158
159        let (node_ut, node_ut_peer_id) =
160            driver.add_rust_node(RustNodeTestingConfig::devnet_default_no_rpc_timeouts());
161
162        let peers: Vec<_> = (0..MAX)
163            .map(|_| {
164                driver.add_rust_node(
165                    RustNodeTestingConfig::devnet_default_no_rpc_timeouts()
166                        .initial_peers(vec![node_ut.into()]),
167                )
168            })
169            .collect();
170
171        // Run the cluster for a while
172        driver
173            .run_until(Duration::from_secs(2 * 60), |_, _, _| false)
174            .await
175            .unwrap();
176
177        // Check that for each peer, if it is in the node's peer list, then the node is in the peer's peer list
178        for (peer, peer_id) in peers {
179            if peer_is_ready(driver.inner(), peer, &node_ut_peer_id) {
180                assert!(
181                    peer_is_ready(driver.inner(), node_ut, &peer_id),
182                    "node {peer} should be in the node's peer list"
183                );
184            } else {
185                assert!(
186                    !peer_is_ready(driver.inner(), node_ut, &peer_id),
187                    "node {peer} should't be in the node's peer list"
188                );
189            }
190        }
191    }
192}
193
194/// A Rust node's incoming connections should be limited.
195#[derive(documented::Documented, Default, Clone, Copy)]
196pub struct MaxNumberOfPeersIncoming;
197
198impl MaxNumberOfPeersIncoming {
199    pub async fn run(self, runner: ClusterRunner<'_>) {
200        const TOTAL: u16 = 32;
201        const MAX: u16 = 16;
202
203        let mut driver = Driver::new(runner);
204
205        let (node_ut, nut_peer_id) = driver.add_rust_node(
206            RustNodeTestingConfig::devnet_default_no_rpc_timeouts().max_peers(MAX.into()),
207        );
208
209        let config = RustNodeTestingConfig::devnet_default().with_timeouts(P2pTimeouts {
210            // don't reconnect to the node under test
211            reconnect_timeout: None,
212            ..P2pTimeouts::without_rpc()
213        });
214        let peers: Vec<_> = add_rust_nodes1(&mut driver, TOTAL, config);
215
216        // wait for all peers to listen
217        let satisfied = wait_for_nodes_listening_on_localhost(
218            &mut driver,
219            Duration::from_secs(3 * 60),
220            [node_ut],
221        )
222        .await
223        .unwrap();
224        assert!(satisfied, "all peers should be listening");
225
226        println!("connecting nodes....");
227
228        for (peer, _peer_id) in &peers {
229            connect_rust_nodes(driver.inner_mut(), *peer, node_ut).await;
230            let connected = wait_for_connection_event(
231                &mut driver,
232                Duration::from_secs(60),
233                (*peer, ConnectionPredicates::PeerFinalized(nut_peer_id)),
234            )
235            .await
236            .unwrap();
237            assert!(connected, "connection to node {peer} is not finalized");
238        }
239
240        println!("running cluster...");
241        driver.run(Duration::from_secs(60)).await.unwrap();
242        println!("checking assertions...");
243
244        // check that the number of ready peers does not exceed the maximal allowed number
245        let state = driver.inner().node(node_ut).unwrap().state();
246        let count = state.p2p.ready_peers_iter().count();
247        assert!(
248            count <= usize::from(MAX),
249            "max number of peers exceeded: {count}"
250        );
251
252        // check that the number of nodes with the node as their peer does not exceed the maximal allowed number
253        let peers_connected = || {
254            peers
255                .iter()
256                .filter_map(|(peer, _)| driver.inner().node(*peer))
257                .filter(|peer| {
258                    peer.state()
259                        .p2p
260                        .get_peer(&nut_peer_id)
261                        .and_then(|peer| peer.status.as_ready())
262                        .is_some()
263                })
264        };
265        assert!(
266            peers_connected().count() <= usize::from(MAX),
267            "peers connections to the node exceed the max number of connections: {}",
268            peers_connected().count()
269        );
270    }
271}
272
273/// Two nodes with max peers = 1 can connect to each other.
274#[derive(documented::Documented, Default, Clone, Copy)]
275pub struct MaxNumberOfPeersIs1;
276
277impl MaxNumberOfPeersIs1 {
278    pub async fn run(self, runner: ClusterRunner<'_>) {
279        const CONNECTED_TIME_SEC: u64 = 10;
280        let mut driver = Driver::new(runner);
281
282        let (node1, _) = driver.add_rust_node(RustNodeTestingConfig::devnet_default().max_peers(1));
283        let (node2, _) = driver.add_rust_node(RustNodeTestingConfig::devnet_default().max_peers(1));
284
285        assert!(
286            wait_for_nodes_listening_on_localhost(&mut driver, Duration::from_secs(30), [node2])
287                .await
288                .unwrap(),
289            "nodes should be listening"
290        );
291
292        driver
293            .exec_step(crate::scenario::ScenarioStep::ConnectNodes {
294                dialer: node1,
295                listener: crate::scenario::ListenerNode::Rust(node2),
296            })
297            .await
298            .expect("connect event should be dispatched");
299
300        // Run the cluster while there are events
301        let disconnected = driver
302            .run_until(Duration::from_secs(CONNECTED_TIME_SEC), |_, event, _| {
303                matches!(
304                    event,
305                    Event::P2p(P2pEvent::Connection(P2pConnectionEvent::Closed(_)))
306                )
307            })
308            .await
309            .unwrap();
310
311        assert!(!disconnected, "there shouldn't be a disconnection");
312    }
313}
314
315/// Two nodes should stay connected for a long period of time.
316///
317/// TODO: this is worth to make it slightly more sophisticated...
318#[derive(documented::Documented, Default, Clone, Copy)]
319pub struct ConnectionStability;
320
321impl ConnectionStability {
322    pub async fn run(self, runner: ClusterRunner<'_>) {
323        const CONNECTED_TIME_SEC: u64 = 60;
324        let mut driver = Driver::new(runner);
325
326        let (node1, _) = driver.add_rust_node(RustNodeTestingConfig::devnet_default().max_peers(1));
327        let (node2, _) = driver.add_rust_node(RustNodeTestingConfig::devnet_default().max_peers(1));
328
329        assert!(
330            wait_for_nodes_listening_on_localhost(&mut driver, Duration::from_secs(30), [node2])
331                .await
332                .unwrap(),
333            "nodes should be listening"
334        );
335
336        driver
337            .exec_step(crate::scenario::ScenarioStep::ConnectNodes {
338                dialer: node1,
339                listener: crate::scenario::ListenerNode::Rust(node2),
340            })
341            .await
342            .expect("connect event should be dispatched");
343
344        // Run the cluster while there are events
345        let disconnected = driver
346            .run_until(Duration::from_secs(CONNECTED_TIME_SEC), |_, event, _| {
347                matches!(
348                    event,
349                    Event::P2p(P2pEvent::Connection(P2pConnectionEvent::Closed(_)))
350                )
351            })
352            .await
353            .unwrap();
354
355        assert!(!disconnected, "there shouldn't be a disconnection");
356    }
357}