mina_node_testing/scenarios/multi_node/
basic_connectivity_initial_joining.rs

1#![allow(warnings)]
2
3use std::{
4    collections::{BTreeMap, HashMap},
5    time::Duration,
6};
7
8use node::{
9    event_source::Event,
10    p2p::{P2pConnectionEvent, P2pEvent},
11};
12
13use crate::{node::RustNodeTestingConfig, scenario::ScenarioStep, scenarios::ClusterRunner};
14
15/// Global test that aims to be deterministic.
16/// Launch `TOTAL_PEERS` number of nodes with `MAX_PEERS_PER_NODE` is est as the maximum number of peers.
17/// Launch a seed node where `TOTAL_PEERS` is set as the maximum number of peers.
18/// Run the simulation until the following condition is satisfied:
19/// * Each node is connected to a number of peers determined by the `P2pState::min_peers` method.
20/// Fail the test if any node exceeds the maximum number of connections.
21/// Fail the test if the specified number of steps occur but the condition is not met.
22#[derive(documented::Documented, Default, Clone, Copy)]
23pub struct MultiNodeBasicConnectivityInitialJoining;
24
25impl MultiNodeBasicConnectivityInitialJoining {
26    pub async fn run(self, mut runner: ClusterRunner<'_>) {
27        const TOTAL_PEERS: usize = 20;
28        const STEPS_PER_PEER: usize = 10;
29        const EXTRA_STEPS: usize = 2000;
30        const MAX_PEERS_PER_NODE: usize = 12;
31        const STEP_DELAY: Duration = Duration::from_millis(200);
32
33        let seed_node =
34            runner.add_rust_node(RustNodeTestingConfig::devnet_default().max_peers(TOTAL_PEERS));
35
36        eprintln!("launch Mina seed node, id: {seed_node}");
37
38        let mut nodes = vec![seed_node];
39
40        for step in 0..(TOTAL_PEERS * STEPS_PER_PEER + EXTRA_STEPS) {
41            tokio::time::sleep(STEP_DELAY).await;
42
43            if step % STEPS_PER_PEER == 0 && nodes.len() < TOTAL_PEERS {
44                let node = runner.add_rust_node(
45                    RustNodeTestingConfig::devnet_default()
46                        .max_peers(MAX_PEERS_PER_NODE)
47                        .initial_peers(vec![seed_node.into()]),
48                );
49                eprintln!("launch Mina node, id: {node}, connects to {seed_node}");
50
51                nodes.push(node);
52            }
53
54            let mut connection_events = BTreeMap::<_, BTreeMap<_, Vec<String>>>::default();
55
56            let mut steps = vec![];
57            for node_id in &nodes {
58                let node_id = *node_id;
59                let this_id = runner.node(node_id).unwrap().state().p2p.my_id();
60
61                let node_steps = runner
62                    .node_pending_events(node_id, true)
63                    .unwrap()
64                    .1
65                    .map(|(_, event)| {
66                        match event {
67                            Event::P2p(P2pEvent::Connection(P2pConnectionEvent::Finalized(
68                                peer_id,
69                                result,
70                            ))) => connection_events
71                                .entry(this_id)
72                                .or_default()
73                                .entry(*peer_id)
74                                .or_default()
75                                .push(
76                                    result
77                                        .as_ref()
78                                        .err()
79                                        .cloned()
80                                        .unwrap_or_else(|| "ok".to_owned()),
81                                ),
82                            _ => {}
83                        }
84                        ScenarioStep::Event {
85                            node_id,
86                            event: event.to_string(),
87                        }
88                    })
89                    .collect::<Vec<_>>();
90                steps.extend(node_steps);
91            }
92
93            for step in steps {
94                runner.exec_step(step).await.unwrap();
95            }
96
97            if nodes.len() < TOTAL_PEERS {
98                continue;
99            }
100
101            let mut conditions_met = true;
102            for &node_id in &nodes {
103                runner
104                    .exec_step(ScenarioStep::AdvanceNodeTime {
105                        node_id,
106                        by_nanos: STEP_DELAY.as_nanos() as _,
107                    })
108                    .await
109                    .unwrap();
110
111                runner
112                    .exec_step(ScenarioStep::CheckTimeouts { node_id })
113                    .await
114                    .unwrap();
115
116                let node = runner.node(node_id).expect("node must exist");
117                let p2p = &node.state().p2p;
118                let ready_peers = p2p.ready_peers_iter().count();
119
120                // each node connected to some peers
121                conditions_met &=
122                    p2p.ready().is_some() && ready_peers >= node.state().p2p.unwrap().min_peers();
123
124                // maximum is not exceeded
125                let max_peers = if node_id == seed_node {
126                    TOTAL_PEERS
127                } else {
128                    MAX_PEERS_PER_NODE
129                };
130                assert!(ready_peers <= max_peers);
131            }
132
133            if conditions_met {
134                for (this_id, events) in &connection_events {
135                    for (peer_id, results) in events {
136                        for result in results {
137                            eprintln!("{this_id} <-> {peer_id}: {result}");
138                        }
139                    }
140                }
141
142                let mut total_connections_known = 0;
143                let mut total_connections_ready = 0;
144                for &node_id in &nodes {
145                    let node = runner.node(node_id).expect("node must exist");
146
147                    let p2p = &node.state().p2p;
148                    let ready_peers = p2p.ready_peers_iter().count();
149                    let my_id = p2p.my_id();
150
151                    let known_peers: usize = node
152                        .state()
153                        .p2p
154                        .ready()
155                        .and_then(|p2p| p2p.network.scheduler.discovery_state())
156                        .map_or(0, |discovery_state| {
157                            discovery_state
158                                .routing_table
159                                .closest_peers(&my_id.try_into().unwrap())
160                                .count()
161                        });
162                    let state_machine_peers = if cfg!(feature = "p2p-webrtc") {
163                        ready_peers
164                    } else {
165                        ready_peers.max(known_peers)
166                    };
167                    total_connections_ready += ready_peers;
168                    total_connections_known += state_machine_peers;
169                    eprintln!("node {} has {ready_peers} peers", p2p.my_id(),);
170                }
171
172                // TODO: calculate per peer
173                if let Some(debugger) = runner.debugger() {
174                    tokio::time::sleep(Duration::from_secs(10)).await;
175
176                    let connections = debugger
177                        .connections_raw(0)
178                        .map(|(id, c)| (id, (c.info.addr, c.info.fd, c.info.pid, c.incoming)))
179                        .collect::<HashMap<_, _>>();
180
181                    // dbg
182                    for (id, cn) in &connections {
183                        eprintln!("{id}: {}", serde_json::to_string(cn).unwrap());
184                    }
185                    // dbg
186                    for (id, msg) in debugger.messages(0, "") {
187                        eprintln!("{id}: {}", serde_json::to_string(&msg).unwrap());
188                    }
189                    // TODO: fix debugger returns timeout
190                    let connections = debugger
191                        .connections()
192                        .filter_map(|id| Some((id, connections.get(&id)?.clone())))
193                        .collect::<HashMap<_, _>>();
194                    let incoming = connections.iter().filter(|(_, (_, _, _, i))| *i).count();
195                    let outgoing = connections.len() - incoming;
196                    eprintln!(
197                        "debugger seen {incoming} incoming connections and {outgoing} outgoing connections",
198                    );
199                    let state_machine_peers = if cfg!(feature = "p2p-webrtc") {
200                        total_connections_ready
201                    } else {
202                        total_connections_ready.max(total_connections_known)
203                    };
204                    assert_eq!(
205                        incoming + outgoing,
206                        state_machine_peers,
207                        "debugger must see the same number of connections as the state machine"
208                    );
209                } else {
210                    eprintln!("no debugger, run test with --use-debugger for additional check");
211                }
212
213                eprintln!("success");
214
215                return;
216            }
217        }
218
219        for node_id in &nodes {
220            let node = runner.node(*node_id).expect("node must exist");
221            let p2p: &node::p2p::P2pState = &node.state().p2p.unwrap();
222            let ready_peers = p2p.ready_peers_iter().count();
223            // each node connected to some peers
224            println!("must hold {ready_peers} >= {}", p2p.min_peers());
225        }
226
227        // for node_id in nodes {
228        //     let node = runner.node(node_id).expect("node must exist");
229        //     println!("{node_id:?} - p2p state: {:#?}", &node.state().p2p);
230        // }
231
232        assert!(false);
233    }
234}