mina_node_testing/scenarios/multi_node/
connection_discovery.rs

1use std::time::Duration;
2
3use crate::{
4    cluster::{ClusterNodeId, ClusterOcamlNodeId},
5    node::{OcamlNodeTestingConfig, RustNodeTestingConfig},
6    scenario::{ListenerNode, ScenarioStep},
7    scenarios::{get_peers_iter, ClusterRunner, RunCfg, PEERS_QUERY},
8};
9use anyhow::Context;
10use node::{
11    p2p::{identify::P2pIdentifyAction, peer::P2pPeerAction, PeerId},
12    ActionKind, P2pAction,
13};
14use tokio::time::sleep;
15
16/// Ensure that Rust node can pass information about peers when used as a seed node.
17/// 1. Create rust seed node and wait for it to be ready
18/// 2. Create 2 Ocaml nodes with rust seed node as initial peer
19/// 3. Check that Ocaml nodes know each other via rust seed node
20#[derive(documented::Documented, Default, Clone, Copy)]
21pub struct RustNodeAsSeed;
22
23impl RustNodeAsSeed {
24    pub async fn run(self, mut runner: ClusterRunner<'_>) {
25        std::env::set_var("MINA_DISCOVERY_FILTER_ADDR", "false");
26        let rust_node_id = runner.add_rust_node(RustNodeTestingConfig::devnet_default());
27        let rust_node_dial_addr = runner.node(rust_node_id).unwrap().dial_addr();
28        let rust_peer_id = *rust_node_dial_addr.peer_id();
29        wait_for_node_ready(&mut runner, rust_node_id).await;
30
31        let ocaml_node_config = OcamlNodeTestingConfig {
32            initial_peers: vec![rust_node_dial_addr],
33            ..Default::default()
34        };
35
36        let ocaml_node0 = runner.add_ocaml_node(ocaml_node_config.clone());
37        let ocaml_peer_id0 = runner.ocaml_node(ocaml_node0).unwrap().peer_id();
38
39        // this is needed, to make sure that `ocaml_node0` connects before `ocaml_node1`
40        sleep(Duration::from_secs(30)).await;
41
42        let ocaml_node1 = runner.add_ocaml_node(ocaml_node_config.clone());
43        let ocaml_peer_id1 = runner.ocaml_node(ocaml_node1).unwrap().peer_id();
44
45        wait_for_ready_connection(
46            &mut runner,
47            rust_node_id,
48            ocaml_peer_id0,
49            true,
50            Some(Duration::from_secs(500)),
51        )
52        .await;
53        wait_for_ready_connection(
54            &mut runner,
55            rust_node_id,
56            ocaml_peer_id1,
57            true,
58            Some(Duration::from_secs(300)),
59        )
60        .await;
61
62        let _ = runner
63            .run(RunCfg::default().timeout(Duration::from_secs(120)))
64            .await;
65
66        let ocaml_node0_check =
67            check_ocaml_peers(&mut runner, ocaml_node0, [rust_peer_id, ocaml_peer_id1])
68                .await
69                .unwrap_or_default();
70
71        let ocaml_node1_check =
72            check_ocaml_peers(&mut runner, ocaml_node1, [rust_peer_id, ocaml_peer_id0])
73                .await
74                .unwrap_or_default();
75
76        assert!(ocaml_node0_check, "OCaml node 0 doesn't have valid peers");
77        assert!(ocaml_node1_check, "OCaml node 1 doesn't have valid peers");
78
79        let has_peer_in_routing_table =
80            check_kademlia_entries(&mut runner, rust_node_id, [ocaml_peer_id0, ocaml_peer_id1])
81                .unwrap_or_default();
82
83        assert!(
84            has_peer_in_routing_table,
85            "Peers not found in rust node's routing table"
86        );
87    }
88}
89
90/// Test Rust node peer discovery when OCaml node connects to it
91/// 1. Create rust node and wait for it to be ready
92/// 2. Create OCaml node with rust node as initial peer
93/// 3. Check that OCaml node connects to rust node
94#[derive(documented::Documented, Default, Clone, Copy)]
95pub struct OCamlToRust;
96
97impl OCamlToRust {
98    pub async fn run(self, mut runner: ClusterRunner<'_>) {
99        std::env::set_var("MINA_DISCOVERY_FILTER_ADDR", "false");
100        let rust_node_id = runner.add_rust_node(RustNodeTestingConfig::devnet_default());
101        let rust_node_dial_addr = runner.node(rust_node_id).unwrap().dial_addr();
102        let rust_peer_id = *rust_node_dial_addr.peer_id();
103        wait_for_node_ready(&mut runner, rust_node_id).await;
104
105        let ocaml_node_config = OcamlNodeTestingConfig {
106            initial_peers: vec![rust_node_dial_addr],
107            ..Default::default()
108        };
109
110        let ocaml_node = runner.add_ocaml_node(ocaml_node_config.clone());
111        let ocaml_peer_id = runner.ocaml_node(ocaml_node).unwrap().peer_id();
112
113        wait_for_ready_connection(
114            &mut runner,
115            rust_node_id,
116            ocaml_peer_id,
117            true,
118            Some(Duration::from_secs(300)),
119        )
120        .await;
121
122        wait_for_identify(
123            &mut runner,
124            rust_node_id,
125            ocaml_peer_id,
126            "github.com/codaprotocol/coda/tree/master/src/app/libp2p_helper",
127        )
128        .await;
129
130        let ocaml_check = check_ocaml_peers(&mut runner, ocaml_node, [rust_peer_id])
131            .await
132            .expect("Error querying graphql");
133
134        assert!(ocaml_check, "OCaml node doesn't have rust as peer");
135    }
136}
137
138/// Tests Rust node peer discovery when it connects to OCaml node
139/// 1. Create Rust node and wait for it to be ready
140/// 2. Create OCaml node and wait for it to be ready
141/// 3. Connect rust node to Ocaml node
142/// 4. Check that it is connected
143/// 5. Check for kademlia and identify
144#[derive(documented::Documented, Default, Clone, Copy)]
145pub struct RustToOCaml;
146
147impl RustToOCaml {
148    pub async fn run(self, mut runner: ClusterRunner<'_>) {
149        std::env::set_var("MINA_DISCOVERY_FILTER_ADDR", "false");
150        let rust_node_id = runner.add_rust_node(RustNodeTestingConfig::devnet_default());
151        let rust_peer_id = runner.node(rust_node_id).expect("Node not found").peer_id();
152        wait_for_node_ready(&mut runner, rust_node_id).await;
153
154        let ocaml_seed_config = OcamlNodeTestingConfig::default();
155
156        let seed_node = runner.add_ocaml_node(ocaml_seed_config);
157        let seed_peer_id = runner.ocaml_node(seed_node).unwrap().peer_id();
158
159        runner.wait_for_ocaml(seed_node).await;
160
161        runner
162            .exec_step(ScenarioStep::ConnectNodes {
163                dialer: rust_node_id,
164                listener: ListenerNode::Ocaml(seed_node),
165            })
166            .await
167            .expect("Error connecting nodes");
168
169        wait_for_ready_connection(&mut runner, rust_node_id, seed_peer_id, false, None).await;
170        wait_for_identify(
171            &mut runner,
172            rust_node_id,
173            seed_peer_id,
174            "github.com/codaprotocol/coda/tree/master/src/app/libp2p_helper",
175        )
176        .await;
177
178        let ocaml_has_rust_peer = check_ocaml_peers(&mut runner, seed_node, [rust_peer_id])
179            .await
180            .unwrap_or_default();
181        assert!(ocaml_has_rust_peer, "Ocaml doesn't have rust node");
182    }
183}
184
185/// Tests Rust node peer discovery when OCaml node is connected to it via an OCaml seed node.
186/// 1. Create Rust node and wait for it to be ready
187/// 2. Create OCaml seed node and wait for it to be ready
188/// 3. Connect rust node to OCaml seed node
189/// 4. Create OCaml node and connect to OCaml seed node
190/// 5. Check that OCaml node connects to rust node via seed
191#[derive(documented::Documented, Default, Clone, Copy)]
192pub struct OCamlToRustViaSeed;
193
194impl OCamlToRustViaSeed {
195    pub async fn run(self, mut runner: ClusterRunner<'_>) {
196        std::env::set_var("MINA_DISCOVERY_FILTER_ADDR", "false");
197        let rust_node_id = runner.add_rust_node(RustNodeTestingConfig::devnet_default());
198        wait_for_node_ready(&mut runner, rust_node_id).await;
199
200        let ocaml_seed_config = OcamlNodeTestingConfig::default();
201        let seed_node = runner.add_ocaml_node(ocaml_seed_config.clone());
202        let (seed_peer_id, seed_addr) = runner
203            .ocaml_node(seed_node)
204            .map(|node| (node.peer_id(), node.dial_addr()))
205            .unwrap();
206
207        runner.wait_for_ocaml(seed_node).await;
208
209        runner
210            .exec_step(ScenarioStep::ConnectNodes {
211                dialer: rust_node_id,
212                listener: ListenerNode::Ocaml(seed_node),
213            })
214            .await
215            .unwrap();
216
217        wait_for_ready_connection(&mut runner, rust_node_id, seed_peer_id, false, None).await;
218
219        let ocaml_node = runner.add_ocaml_node(OcamlNodeTestingConfig {
220            initial_peers: vec![seed_addr],
221            ..ocaml_seed_config
222        });
223        let ocaml_peer_id = runner.ocaml_node(ocaml_node).unwrap().peer_id();
224
225        runner.wait_for_ocaml(ocaml_node).await;
226        wait_for_ready_connection(&mut runner, rust_node_id, ocaml_peer_id, true, None).await;
227    }
228}
229
230/// Tests Rust node peer discovery when it connects to OCaml node via an OCaml seed node.
231/// 1. Create rust node and wait for it to be ready
232/// 2. Create OCaml seed node and wait for it to be ready
233/// 3. Create OCaml node with OCaml seed as initial peer
234/// 4. Wait for OCaml node to be ready
235/// 5. Connect rust node to OCaml seed node
236/// 6. Check that rust node connects to OCaml node via OCaml seed node
237#[derive(documented::Documented, Default, Clone, Copy)]
238pub struct RustToOCamlViaSeed;
239
240impl RustToOCamlViaSeed {
241    pub async fn run(self, mut runner: ClusterRunner<'_>) {
242        std::env::set_var("MINA_DISCOVERY_FILTER_ADDR", "false");
243        let rust_node_id = runner.add_rust_node(RustNodeTestingConfig::devnet_default());
244        wait_for_node_ready(&mut runner, rust_node_id).await;
245
246        let ocaml_seed_config = OcamlNodeTestingConfig::default();
247
248        let seed_node = runner.add_ocaml_node(ocaml_seed_config.clone());
249        runner.wait_for_ocaml(seed_node).await;
250
251        let (seed_peer_id, seed_addr) = runner
252            .ocaml_node(seed_node)
253            .map(|node| (node.peer_id(), node.dial_addr()))
254            .unwrap();
255
256        let ocaml_node = runner.add_ocaml_node(OcamlNodeTestingConfig {
257            initial_peers: vec![seed_addr],
258            ..ocaml_seed_config
259        });
260
261        let ocaml_peer_id = runner.ocaml_node(ocaml_node).unwrap().peer_id();
262        runner.wait_for_ocaml(ocaml_node).await;
263
264        runner
265            .exec_step(ScenarioStep::ConnectNodes {
266                dialer: rust_node_id,
267                listener: ListenerNode::Ocaml(seed_node),
268            })
269            .await
270            .unwrap();
271
272        wait_for_ready_connection(&mut runner, rust_node_id, seed_peer_id, false, None).await;
273        wait_for_ready_connection(&mut runner, rust_node_id, ocaml_peer_id, false, None).await;
274    }
275}
276
277pub async fn wait_for_node_ready(runner: &mut ClusterRunner<'_>, node_id: ClusterNodeId) {
278    runner
279        .run(RunCfg::default().action_handler(move |id, _, _, action| {
280            node_id == id && matches!(action.action().kind(), ActionKind::P2pInitializeInitialize)
281        }))
282        .await
283        .expect("Node not ready")
284}
285
286pub async fn wait_for_identify(
287    runner: &mut ClusterRunner<'_>,
288    node_id: ClusterNodeId,
289    connecting_peer_id: PeerId,
290    agent_version: &str,
291) {
292    let agent_version = agent_version.to_owned();
293    runner
294        .run(
295            RunCfg::default()
296                .action_handler(move |id, _, _, action| {
297                    id == node_id
298                        && matches!(
299                            action.action(),
300                            node::Action::P2p(P2pAction::Identify(P2pIdentifyAction::UpdatePeerInformation {
301                                peer_id,
302                                info,
303                                ..
304                            })) if peer_id == &connecting_peer_id && info.agent_version == Some(agent_version.to_string())
305                        )
306                }),
307        )
308        .await
309        .expect("Identify not exchanged");
310}
311
312async fn wait_for_ready_connection(
313    runner: &mut ClusterRunner<'_>,
314    node_id: ClusterNodeId,
315    connecting_peer_id: PeerId,
316    incoming_: bool,
317    duration: Option<Duration>,
318) {
319    runner
320        .run(
321            RunCfg::default()
322                .timeout(duration.unwrap_or(Duration::from_secs(60)))
323                .action_handler(move |id, _, _, action| {
324                    id == node_id
325                        && matches!(
326                            action.action(),
327                            &node::Action::P2p(P2pAction::Peer(P2pPeerAction::Ready {
328                                peer_id,
329                                incoming
330                            })) if peer_id == connecting_peer_id && incoming == incoming_
331                        )
332                }),
333        )
334        .await
335        .expect("Nodes not connected");
336}
337
338async fn check_ocaml_peers<A>(
339    runner: &mut ClusterRunner<'_>,
340    node_id: ClusterOcamlNodeId,
341    peer_ids: A,
342) -> anyhow::Result<bool>
343where
344    A: IntoIterator<Item = PeerId>,
345{
346    let data = runner
347        .ocaml_node(node_id)
348        .expect("OCaml node not found")
349        .grapql_query(PEERS_QUERY)
350        .await?;
351
352    let peers = get_peers_iter(&data)
353        .with_context(|| "Failed to convert graphql response")?
354        .flatten()
355        .map(|peer| peer.2.to_owned())
356        .collect::<Vec<_>>();
357
358    Ok(peer_ids
359        .into_iter()
360        .all(|peer_id| peers.contains(&peer_id.to_libp2p_string())))
361}
362
363pub fn check_kademlia_entries<A>(
364    runner: &mut ClusterRunner<'_>,
365    node_id: ClusterNodeId,
366    peer_ids: A,
367) -> anyhow::Result<bool>
368where
369    A: IntoIterator<Item = PeerId>,
370{
371    let table = &runner
372        .node(node_id)
373        .with_context(|| "Node not found")?
374        .state()
375        .p2p
376        .ready()
377        .with_context(|| "P2p state not ready")?
378        .network
379        .scheduler
380        .discovery_state()
381        .with_context(|| "Discovery state not ready")?
382        .routing_table;
383
384    Ok(peer_ids.into_iter().all(|peer_id| {
385        table
386            .look_up(&peer_id.try_into().unwrap())
387            .map(|entry| entry.peer_id == peer_id)
388            .unwrap_or_default()
389    }))
390}