mina_node_testing/scenarios/multi_node/
connection_discovery.rs1use std::time::Duration;
2
3use crate::{
4 cluster::{ClusterNodeId, ClusterOcamlNodeId},
5 node::{OcamlNodeTestingConfig, RustNodeTestingConfig},
6 scenario::{ListenerNode, ScenarioStep},
7 scenarios::{get_peers_iter, ClusterRunner, RunCfg, PEERS_QUERY},
8};
9use anyhow::Context;
10use node::{
11 p2p::{identify::P2pIdentifyAction, peer::P2pPeerAction, PeerId},
12 ActionKind, P2pAction,
13};
14use tokio::time::sleep;
15
16#[derive(documented::Documented, Default, Clone, Copy)]
21pub struct RustNodeAsSeed;
22
23impl RustNodeAsSeed {
24 pub async fn run(self, mut runner: ClusterRunner<'_>) {
25 std::env::set_var("MINA_DISCOVERY_FILTER_ADDR", "false");
26 let rust_node_id = runner.add_rust_node(RustNodeTestingConfig::devnet_default());
27 let rust_node_dial_addr = runner.node(rust_node_id).unwrap().dial_addr();
28 let rust_peer_id = *rust_node_dial_addr.peer_id();
29 wait_for_node_ready(&mut runner, rust_node_id).await;
30
31 let ocaml_node_config = OcamlNodeTestingConfig {
32 initial_peers: vec![rust_node_dial_addr],
33 ..Default::default()
34 };
35
36 let ocaml_node0 = runner.add_ocaml_node(ocaml_node_config.clone());
37 let ocaml_peer_id0 = runner.ocaml_node(ocaml_node0).unwrap().peer_id();
38
39 sleep(Duration::from_secs(30)).await;
41
42 let ocaml_node1 = runner.add_ocaml_node(ocaml_node_config.clone());
43 let ocaml_peer_id1 = runner.ocaml_node(ocaml_node1).unwrap().peer_id();
44
45 wait_for_ready_connection(
46 &mut runner,
47 rust_node_id,
48 ocaml_peer_id0,
49 true,
50 Some(Duration::from_secs(500)),
51 )
52 .await;
53 wait_for_ready_connection(
54 &mut runner,
55 rust_node_id,
56 ocaml_peer_id1,
57 true,
58 Some(Duration::from_secs(300)),
59 )
60 .await;
61
62 let _ = runner
63 .run(RunCfg::default().timeout(Duration::from_secs(120)))
64 .await;
65
66 let ocaml_node0_check =
67 check_ocaml_peers(&mut runner, ocaml_node0, [rust_peer_id, ocaml_peer_id1])
68 .await
69 .unwrap_or_default();
70
71 let ocaml_node1_check =
72 check_ocaml_peers(&mut runner, ocaml_node1, [rust_peer_id, ocaml_peer_id0])
73 .await
74 .unwrap_or_default();
75
76 assert!(ocaml_node0_check, "OCaml node 0 doesn't have valid peers");
77 assert!(ocaml_node1_check, "OCaml node 1 doesn't have valid peers");
78
79 let has_peer_in_routing_table =
80 check_kademlia_entries(&mut runner, rust_node_id, [ocaml_peer_id0, ocaml_peer_id1])
81 .unwrap_or_default();
82
83 assert!(
84 has_peer_in_routing_table,
85 "Peers not found in rust node's routing table"
86 );
87 }
88}
89
90#[derive(documented::Documented, Default, Clone, Copy)]
95pub struct OCamlToRust;
96
97impl OCamlToRust {
98 pub async fn run(self, mut runner: ClusterRunner<'_>) {
99 std::env::set_var("MINA_DISCOVERY_FILTER_ADDR", "false");
100 let rust_node_id = runner.add_rust_node(RustNodeTestingConfig::devnet_default());
101 let rust_node_dial_addr = runner.node(rust_node_id).unwrap().dial_addr();
102 let rust_peer_id = *rust_node_dial_addr.peer_id();
103 wait_for_node_ready(&mut runner, rust_node_id).await;
104
105 let ocaml_node_config = OcamlNodeTestingConfig {
106 initial_peers: vec![rust_node_dial_addr],
107 ..Default::default()
108 };
109
110 let ocaml_node = runner.add_ocaml_node(ocaml_node_config.clone());
111 let ocaml_peer_id = runner.ocaml_node(ocaml_node).unwrap().peer_id();
112
113 wait_for_ready_connection(
114 &mut runner,
115 rust_node_id,
116 ocaml_peer_id,
117 true,
118 Some(Duration::from_secs(300)),
119 )
120 .await;
121
122 wait_for_identify(
123 &mut runner,
124 rust_node_id,
125 ocaml_peer_id,
126 "github.com/codaprotocol/coda/tree/master/src/app/libp2p_helper",
127 )
128 .await;
129
130 let ocaml_check = check_ocaml_peers(&mut runner, ocaml_node, [rust_peer_id])
131 .await
132 .expect("Error querying graphql");
133
134 assert!(ocaml_check, "OCaml node doesn't have rust as peer");
135 }
136}
137
138#[derive(documented::Documented, Default, Clone, Copy)]
145pub struct RustToOCaml;
146
147impl RustToOCaml {
148 pub async fn run(self, mut runner: ClusterRunner<'_>) {
149 std::env::set_var("MINA_DISCOVERY_FILTER_ADDR", "false");
150 let rust_node_id = runner.add_rust_node(RustNodeTestingConfig::devnet_default());
151 let rust_peer_id = runner.node(rust_node_id).expect("Node not found").peer_id();
152 wait_for_node_ready(&mut runner, rust_node_id).await;
153
154 let ocaml_seed_config = OcamlNodeTestingConfig::default();
155
156 let seed_node = runner.add_ocaml_node(ocaml_seed_config);
157 let seed_peer_id = runner.ocaml_node(seed_node).unwrap().peer_id();
158
159 runner.wait_for_ocaml(seed_node).await;
160
161 runner
162 .exec_step(ScenarioStep::ConnectNodes {
163 dialer: rust_node_id,
164 listener: ListenerNode::Ocaml(seed_node),
165 })
166 .await
167 .expect("Error connecting nodes");
168
169 wait_for_ready_connection(&mut runner, rust_node_id, seed_peer_id, false, None).await;
170 wait_for_identify(
171 &mut runner,
172 rust_node_id,
173 seed_peer_id,
174 "github.com/codaprotocol/coda/tree/master/src/app/libp2p_helper",
175 )
176 .await;
177
178 let ocaml_has_rust_peer = check_ocaml_peers(&mut runner, seed_node, [rust_peer_id])
179 .await
180 .unwrap_or_default();
181 assert!(ocaml_has_rust_peer, "Ocaml doesn't have rust node");
182 }
183}
184
185#[derive(documented::Documented, Default, Clone, Copy)]
192pub struct OCamlToRustViaSeed;
193
194impl OCamlToRustViaSeed {
195 pub async fn run(self, mut runner: ClusterRunner<'_>) {
196 std::env::set_var("MINA_DISCOVERY_FILTER_ADDR", "false");
197 let rust_node_id = runner.add_rust_node(RustNodeTestingConfig::devnet_default());
198 wait_for_node_ready(&mut runner, rust_node_id).await;
199
200 let ocaml_seed_config = OcamlNodeTestingConfig::default();
201 let seed_node = runner.add_ocaml_node(ocaml_seed_config.clone());
202 let (seed_peer_id, seed_addr) = runner
203 .ocaml_node(seed_node)
204 .map(|node| (node.peer_id(), node.dial_addr()))
205 .unwrap();
206
207 runner.wait_for_ocaml(seed_node).await;
208
209 runner
210 .exec_step(ScenarioStep::ConnectNodes {
211 dialer: rust_node_id,
212 listener: ListenerNode::Ocaml(seed_node),
213 })
214 .await
215 .unwrap();
216
217 wait_for_ready_connection(&mut runner, rust_node_id, seed_peer_id, false, None).await;
218
219 let ocaml_node = runner.add_ocaml_node(OcamlNodeTestingConfig {
220 initial_peers: vec![seed_addr],
221 ..ocaml_seed_config
222 });
223 let ocaml_peer_id = runner.ocaml_node(ocaml_node).unwrap().peer_id();
224
225 runner.wait_for_ocaml(ocaml_node).await;
226 wait_for_ready_connection(&mut runner, rust_node_id, ocaml_peer_id, true, None).await;
227 }
228}
229
230#[derive(documented::Documented, Default, Clone, Copy)]
238pub struct RustToOCamlViaSeed;
239
240impl RustToOCamlViaSeed {
241 pub async fn run(self, mut runner: ClusterRunner<'_>) {
242 std::env::set_var("MINA_DISCOVERY_FILTER_ADDR", "false");
243 let rust_node_id = runner.add_rust_node(RustNodeTestingConfig::devnet_default());
244 wait_for_node_ready(&mut runner, rust_node_id).await;
245
246 let ocaml_seed_config = OcamlNodeTestingConfig::default();
247
248 let seed_node = runner.add_ocaml_node(ocaml_seed_config.clone());
249 runner.wait_for_ocaml(seed_node).await;
250
251 let (seed_peer_id, seed_addr) = runner
252 .ocaml_node(seed_node)
253 .map(|node| (node.peer_id(), node.dial_addr()))
254 .unwrap();
255
256 let ocaml_node = runner.add_ocaml_node(OcamlNodeTestingConfig {
257 initial_peers: vec![seed_addr],
258 ..ocaml_seed_config
259 });
260
261 let ocaml_peer_id = runner.ocaml_node(ocaml_node).unwrap().peer_id();
262 runner.wait_for_ocaml(ocaml_node).await;
263
264 runner
265 .exec_step(ScenarioStep::ConnectNodes {
266 dialer: rust_node_id,
267 listener: ListenerNode::Ocaml(seed_node),
268 })
269 .await
270 .unwrap();
271
272 wait_for_ready_connection(&mut runner, rust_node_id, seed_peer_id, false, None).await;
273 wait_for_ready_connection(&mut runner, rust_node_id, ocaml_peer_id, false, None).await;
274 }
275}
276
277pub async fn wait_for_node_ready(runner: &mut ClusterRunner<'_>, node_id: ClusterNodeId) {
278 runner
279 .run(RunCfg::default().action_handler(move |id, _, _, action| {
280 node_id == id && matches!(action.action().kind(), ActionKind::P2pInitializeInitialize)
281 }))
282 .await
283 .expect("Node not ready")
284}
285
286pub async fn wait_for_identify(
287 runner: &mut ClusterRunner<'_>,
288 node_id: ClusterNodeId,
289 connecting_peer_id: PeerId,
290 agent_version: &str,
291) {
292 let agent_version = agent_version.to_owned();
293 runner
294 .run(
295 RunCfg::default()
296 .action_handler(move |id, _, _, action| {
297 id == node_id
298 && matches!(
299 action.action(),
300 node::Action::P2p(P2pAction::Identify(P2pIdentifyAction::UpdatePeerInformation {
301 peer_id,
302 info,
303 ..
304 })) if peer_id == &connecting_peer_id && info.agent_version == Some(agent_version.to_string())
305 )
306 }),
307 )
308 .await
309 .expect("Identify not exchanged");
310}
311
312async fn wait_for_ready_connection(
313 runner: &mut ClusterRunner<'_>,
314 node_id: ClusterNodeId,
315 connecting_peer_id: PeerId,
316 incoming_: bool,
317 duration: Option<Duration>,
318) {
319 runner
320 .run(
321 RunCfg::default()
322 .timeout(duration.unwrap_or(Duration::from_secs(60)))
323 .action_handler(move |id, _, _, action| {
324 id == node_id
325 && matches!(
326 action.action(),
327 &node::Action::P2p(P2pAction::Peer(P2pPeerAction::Ready {
328 peer_id,
329 incoming
330 })) if peer_id == connecting_peer_id && incoming == incoming_
331 )
332 }),
333 )
334 .await
335 .expect("Nodes not connected");
336}
337
338async fn check_ocaml_peers<A>(
339 runner: &mut ClusterRunner<'_>,
340 node_id: ClusterOcamlNodeId,
341 peer_ids: A,
342) -> anyhow::Result<bool>
343where
344 A: IntoIterator<Item = PeerId>,
345{
346 let data = runner
347 .ocaml_node(node_id)
348 .expect("OCaml node not found")
349 .grapql_query(PEERS_QUERY)
350 .await?;
351
352 let peers = get_peers_iter(&data)
353 .with_context(|| "Failed to convert graphql response")?
354 .flatten()
355 .map(|peer| peer.2.to_owned())
356 .collect::<Vec<_>>();
357
358 Ok(peer_ids
359 .into_iter()
360 .all(|peer_id| peers.contains(&peer_id.to_libp2p_string())))
361}
362
363pub fn check_kademlia_entries<A>(
364 runner: &mut ClusterRunner<'_>,
365 node_id: ClusterNodeId,
366 peer_ids: A,
367) -> anyhow::Result<bool>
368where
369 A: IntoIterator<Item = PeerId>,
370{
371 let table = &runner
372 .node(node_id)
373 .with_context(|| "Node not found")?
374 .state()
375 .p2p
376 .ready()
377 .with_context(|| "P2p state not ready")?
378 .network
379 .scheduler
380 .discovery_state()
381 .with_context(|| "Discovery state not ready")?
382 .routing_table;
383
384 Ok(peer_ids.into_iter().all(|peer_id| {
385 table
386 .look_up(&peer_id.try_into().unwrap())
387 .map(|entry| entry.peer_id == peer_id)
388 .unwrap_or_default()
389 }))
390}