mina_node_testing/scenarios/multi_node/
pubsub_advanced.rs1use std::{
2 str,
3 sync::{Arc, Mutex},
4 time::Duration,
5};
6
7use mina_p2p_messages::{binprot::BinProtRead, gossip, v2};
8use node::{
9 p2p::{P2pNetworkAction, P2pNetworkPubsubAction, PeerId},
10 transition_frontier::genesis::{GenesisConfig, NonStakers},
11 Action, ActionWithMeta, P2pAction,
12};
13
14use crate::{
15 node::Recorder,
16 scenarios::{ClusterRunner, RunCfgAdvanceTime},
17 service::NodeTestingService,
18 simulator::{Simulator, SimulatorConfig, SimulatorRunUntil},
19};
20
21#[derive(documented::Documented, Default, Clone, Copy)]
29pub struct MultiNodePubsubPropagateBlock;
30
31impl MultiNodePubsubPropagateBlock {
32 const WORKERS: usize = 10;
33
34 pub async fn run(self, mut runner: ClusterRunner<'_>) {
35 let graph = Arc::new(Mutex::new("digraph {\n".to_owned()));
36 let factory = || {
37 let graph = graph.clone();
38 move |_id,
39 state: &node::State,
40 _service: &NodeTestingService,
41 action: &ActionWithMeta| {
42 let this = state.p2p.my_id();
43
44 let cut = |peer_id: &PeerId| {
45 let st = peer_id.to_string();
46 let len = st.len();
47 st[(len - 6)..len].to_owned()
48 };
49 let this = cut(&this);
50
51 match action.action() {
52 Action::P2p(P2pAction::Network(P2pNetworkAction::Pubsub(
53 P2pNetworkPubsubAction::OutgoingMessage { peer_id },
54 ))) => {
55 let pubsub_state =
56 &state.p2p.ready().unwrap().network.scheduler.broadcast_state;
57 let msg = &pubsub_state.clients.get(peer_id).unwrap().message;
58
59 for publish_message in &msg.publish {
60 let mut slice = &publish_message.data()[8..];
61 if let Ok(gossip::GossipNetMessageV2::NewState(block)) =
62 gossip::GossipNetMessageV2::binprot_read(&mut slice)
63 {
64 let height = block
65 .header
66 .protocol_state
67 .body
68 .consensus_state
69 .global_slot();
70 let mut lock = graph.lock().unwrap();
71 *lock = format!(
72 "{lock} \"{this}\" -> \"{}\" [label=\"{height}\"];\n",
73 cut(peer_id)
74 );
75 }
76 }
77 false
78 }
79 _ => false,
80 }
81 }
82 };
83
84 let initial_time = redux::Timestamp::global_now();
85 let mut constants = v2::PROTOCOL_CONSTANTS.clone();
86 constants.genesis_state_timestamp =
87 v2::BlockTimeTimeStableV1((u64::from(initial_time) / 1_000_000).into());
88 let genesis_cfg = GenesisConfig::Counts {
89 whales: 1,
90 fish: 0,
91 non_stakers: NonStakers::None,
92 constants,
93 };
94 let config = SimulatorConfig {
95 genesis: genesis_cfg.into(),
96 seed_nodes: 1,
97 normal_nodes: Self::WORKERS,
98 snark_workers: 1,
99 block_producers: 1,
100 advance_time: RunCfgAdvanceTime::Rand(1..=200),
101 run_until: SimulatorRunUntil::BlockchainLength(4),
102 run_until_timeout: Duration::from_secs(10 * 60),
103 recorder: Recorder::StateWithInputActions,
104 };
105 let mut simulator = Simulator::new(initial_time, config);
106 simulator
107 .setup_and_run_with_listener(&mut runner, factory)
108 .await;
109
110 println!("{}}}\n", graph.lock().unwrap());
111 }
112}