// mina_node_testing/scenarios/solo_node/basic_connectivity_initial_joining.rs
#![allow(warnings)]
2
3use std::{collections::HashMap, time::Duration};
4
5use libp2p::Multiaddr;
6
7use node::{
8 core::log::{debug, system_time},
9 p2p::connection::outgoing::P2pConnectionOutgoingInitOpts,
10};
11
12use crate::{
13 hosts,
14 node::RustNodeTestingConfig,
15 scenario::{ListenerNode, ScenarioStep},
16 scenarios::ClusterRunner,
17};
18
19#[derive(documented::Documented, Default, Clone, Copy)]
26pub struct SoloNodeBasicConnectivityInitialJoining;
27
28impl SoloNodeBasicConnectivityInitialJoining {
29 pub async fn run(self, mut runner: ClusterRunner<'_>) {
30 const MAX_PEERS_PER_NODE: usize = 100;
31 const KNOWN_PEERS: usize = 5; const STEPS: usize = 3_000;
33 const STEP_DELAY: Duration = Duration::from_millis(200);
34
35 let initial_peers = hosts::devnet();
36 eprintln!("set max peers per node: {MAX_PEERS_PER_NODE}");
37 for seed in &initial_peers {
38 eprintln!("add initial peer: {seed:?}");
39 }
40 let config = RustNodeTestingConfig::devnet_default()
41 .max_peers(MAX_PEERS_PER_NODE)
42 .initial_peers(initial_peers);
43
44 let node_id = runner.add_rust_node(config);
45 let peer_id = libp2p::PeerId::try_from(
46 runner
47 .node(node_id)
48 .expect("must exist")
49 .state()
50 .p2p
51 .my_id(),
52 )
53 .unwrap();
54 eprintln!("launch Mina Rust node, id: {node_id}, peer_id: {peer_id}");
55
56 for step in 0..STEPS {
57 tokio::time::sleep(STEP_DELAY).await;
58
59 let steps = runner
60 .pending_events(true)
61 .map(|(node_id, _, events)| {
62 events.map(move |(_, event)| ScenarioStep::Event {
63 node_id,
64 event: event.to_string(),
65 })
66 })
67 .flatten()
68 .collect::<Vec<_>>();
69
70 for step in steps {
71 runner.exec_step(step).await.unwrap();
72 }
73
74 runner
75 .exec_step(ScenarioStep::AdvanceNodeTime {
76 node_id,
77 by_nanos: STEP_DELAY.as_nanos() as _,
78 })
79 .await
80 .unwrap();
81
82 runner
83 .exec_step(ScenarioStep::CheckTimeouts { node_id })
84 .await
85 .unwrap();
86
87 let node = runner.node(node_id).expect("must exist");
88 let ready_peers = node.state().p2p.ready_peers_iter().count();
89 let my_id = node.state().p2p.my_id();
90 let known_peers: usize = node
91 .state()
92 .p2p
93 .ready()
94 .and_then(|p2p| p2p.network.scheduler.discovery_state())
95 .map_or(0, |discovery_state| {
96 discovery_state
97 .routing_table
98 .closest_peers(&my_id.try_into().unwrap())
99 .count()
100 });
101
102 println!("step: {step}");
103 println!("known peers: {known_peers}");
104 println!("connected peers: {ready_peers}");
105
106 if ready_peers >= KNOWN_PEERS && known_peers >= KNOWN_PEERS {
108 debug!(system_time(); "Step: {}, known peers: {}, connected peers: {}, success", step, known_peers, ready_peers);
109
110 if let Some(debugger) = runner.debugger() {
111 tokio::time::sleep(Duration::from_secs(10)).await;
112 let connections = debugger
113 .connections_raw(0)
114 .map(|(id, c)| (id, (c.info.addr, c.info.fd, c.info.pid, c.incoming)))
115 .collect::<HashMap<_, _>>();
116
117 for (id, cn) in &connections {
119 eprintln!("{id}: {}", serde_json::to_string(cn).unwrap());
120 }
121 for (id, msg) in debugger.messages(0, "") {
123 eprintln!("{id}: {}", serde_json::to_string(&msg).unwrap());
124 }
125 let connections = debugger
127 .connections()
128 .filter_map(|id| Some((id, connections.get(&id)?.clone())))
129 .collect::<HashMap<_, _>>();
130 let incoming = connections.iter().filter(|(_, (_, _, _, i))| *i).count();
131 let outgoing = connections.len() - incoming;
132 eprintln!(
133 "debugger seen {incoming} incoming connections and {outgoing} outgoing connections",
134 );
135 let state_machine_peers = if cfg!(feature = "p2p-webrtc") {
136 ready_peers
137 } else {
138 ready_peers.max(known_peers)
139 };
140 assert_eq!(
141 incoming + outgoing,
142 state_machine_peers,
143 "debugger must see the same number of connections as the state machine"
144 );
145 } else {
146 eprintln!("no debugger, run test with --use-debugger for additional check");
147 }
148
149 return;
150 }
151 }
152
153 panic!("timeout");
154 }
155}