mina_node_testing/scenarios/multi_node/
pubsub_advanced.rs

1use std::{
2    str,
3    sync::{Arc, Mutex},
4    time::Duration,
5};
6
7use mina_p2p_messages::{binprot::BinProtRead, gossip, v2};
8use node::{
9    p2p::{P2pNetworkAction, P2pNetworkPubsubAction, PeerId},
10    transition_frontier::genesis::{GenesisConfig, NonStakers},
11    Action, ActionWithMeta, P2pAction,
12};
13
14use crate::{
15    node::Recorder,
16    scenarios::{ClusterRunner, RunCfgAdvanceTime},
17    service::NodeTestingService,
18    simulator::{Simulator, SimulatorConfig, SimulatorRunUntil},
19};
20
21/// Create and Sync up 50 nodes, one amoung them is block producer.
22///
23/// 1. Create the nodes.
24/// 2. Connect them to each other.
25/// 3. Wait kademlia bootstrap is done, observe the connection graph.
26/// 4. Wait pubsub mesh construction is done, observe the mesh.
27/// 5. Wait block is produced and observe the propagation.
28#[derive(documented::Documented, Default, Clone, Copy)]
29pub struct MultiNodePubsubPropagateBlock;
30
31impl MultiNodePubsubPropagateBlock {
32    const WORKERS: usize = 10;
33
34    pub async fn run(self, mut runner: ClusterRunner<'_>) {
35        let graph = Arc::new(Mutex::new("digraph {\n".to_owned()));
36        let factory = || {
37            let graph = graph.clone();
38            move |_id,
39                  state: &node::State,
40                  _service: &NodeTestingService,
41                  action: &ActionWithMeta| {
42                let this = state.p2p.my_id();
43
44                let cut = |peer_id: &PeerId| {
45                    let st = peer_id.to_string();
46                    let len = st.len();
47                    st[(len - 6)..len].to_owned()
48                };
49                let this = cut(&this);
50
51                match action.action() {
52                    Action::P2p(P2pAction::Network(P2pNetworkAction::Pubsub(
53                        P2pNetworkPubsubAction::OutgoingMessage { peer_id },
54                    ))) => {
55                        let pubsub_state =
56                            &state.p2p.ready().unwrap().network.scheduler.broadcast_state;
57                        let msg = &pubsub_state.clients.get(peer_id).unwrap().message;
58
59                        for publish_message in &msg.publish {
60                            let mut slice = &publish_message.data()[8..];
61                            if let Ok(gossip::GossipNetMessageV2::NewState(block)) =
62                                gossip::GossipNetMessageV2::binprot_read(&mut slice)
63                            {
64                                let height = block
65                                    .header
66                                    .protocol_state
67                                    .body
68                                    .consensus_state
69                                    .global_slot();
70                                let mut lock = graph.lock().unwrap();
71                                *lock = format!(
72                                    "{lock}  \"{this}\" -> \"{}\" [label=\"{height}\"];\n",
73                                    cut(peer_id)
74                                );
75                            }
76                        }
77                        false
78                    }
79                    _ => false,
80                }
81            }
82        };
83
84        let initial_time = redux::Timestamp::global_now();
85        let mut constants = v2::PROTOCOL_CONSTANTS.clone();
86        constants.genesis_state_timestamp =
87            v2::BlockTimeTimeStableV1((u64::from(initial_time) / 1_000_000).into());
88        let genesis_cfg = GenesisConfig::Counts {
89            whales: 1,
90            fish: 0,
91            non_stakers: NonStakers::None,
92            constants,
93        };
94        let config = SimulatorConfig {
95            genesis: genesis_cfg.into(),
96            seed_nodes: 1,
97            normal_nodes: Self::WORKERS,
98            snark_workers: 1,
99            block_producers: 1,
100            advance_time: RunCfgAdvanceTime::Rand(1..=200),
101            run_until: SimulatorRunUntil::BlockchainLength(4),
102            run_until_timeout: Duration::from_secs(10 * 60),
103            recorder: Recorder::StateWithInputActions,
104        };
105        let mut simulator = Simulator::new(initial_time, config);
106        simulator
107            .setup_and_run_with_listener(&mut runner, factory)
108            .await;
109
110        println!("{}}}\n", graph.lock().unwrap());
111    }
112}