mina_node_testing/scenarios/solo_node/
sync_root_snarked_ledger.rs1use std::{cmp::Ordering, time::Duration};
2
3use mina_p2p_messages::v2::MinaLedgerSyncLedgerQueryStableV1;
4use node::{
5 event_source::Event,
6 ledger::LedgerAddress,
7 p2p::{
8 channels::{
9 rpc::{P2pRpcRequest, RpcChannelMsg},
10 ChannelMsg,
11 },
12 P2pChannelEvent, P2pEvent,
13 },
14 ActionKind, State,
15};
16
17use crate::{
18 cluster::ClusterNodeId,
19 node::RustNodeTestingConfig,
20 scenario::{ListenerNode, ScenarioStep},
21 scenarios::{ClusterRunner, RunCfg, RunDecision},
22};
23
24#[derive(documented::Documented, Default, Clone, Copy)]
32pub struct SoloNodeSyncRootSnarkedLedger;
33
34impl SoloNodeSyncRootSnarkedLedger {
35 pub async fn run(self, mut runner: ClusterRunner<'_>) {
36 let node_id = runner.add_rust_node(RustNodeTestingConfig::devnet_default());
37 eprintln!("launch Rust node with default configuration, id: {node_id}");
38
39 const REPLAYER_1: &str =
40 "/dns4/web-node-1/tcp/18302/p2p/12D3KooWD8jSyPFXNdAcMBHyHjRBcK1AW9t3xvnpfCFSRKMweVKi";
41 const REPLAYER_2: &str =
42 "/dns4/web-node-1/tcp/18303/p2p/12D3KooWBxbfeaxGHxdxP3U5jRKpNK5wQmbjKywGJEqTCNpVPxqk";
43
44 runner
46 .exec_step(ScenarioStep::ConnectNodes {
47 dialer: node_id,
48 listener: ListenerNode::Custom(REPLAYER_1.parse().unwrap()),
49 })
50 .await
51 .unwrap();
52 eprintln!("node({node_id}) dialing to replayer: {REPLAYER_1}");
53 runner
54 .exec_step(ScenarioStep::ConnectNodes {
55 dialer: node_id,
56 listener: ListenerNode::Custom(REPLAYER_2.parse().unwrap()),
57 })
58 .await
59 .unwrap();
60 eprintln!("node({node_id}) dialing to replayer: {REPLAYER_2}");
61
62 runner
65 .run(
66 RunCfg::default()
67 .timeout(Duration::from_secs(10))
68 .event_handler(|_, state, event| {
69 if self.event_ledger_query_addr(state, event).is_some() {
70 return RunDecision::Skip;
72 }
73 RunDecision::ContinueExec
74 })
75 .action_handler(|_, state, _, _| {
76 let connected_peer_count = state
77 .p2p
78 .ready_peers_iter()
79 .filter(|(_, p)| p.channels.rpc.is_ready())
80 .count();
81
82 connected_peer_count >= 2
84 }),
85 )
86 .await
87 .expect("waiting for 2 replayer peers to be connected timed out");
88
89 eprintln!("2 replayers are now connected");
90
91 eprintln!("exec ledger query responses until we are deep enough for there to be more than 1 hash in the same height");
94 runner
95 .run(
96 RunCfg::default()
97 .timeout(Duration::from_secs(10))
98 .action_handler(move |_, state, _, action| {
99 matches!(action.action().kind(), ActionKind::CheckTimeouts)
100 && self.fetch_pending_count(state) >= 2
101 }),
102 )
103 .await
104 .expect("time out");
105
106 eprintln!("receive all hashes before first...");
107 self.receive_all_hashes_before_first(&mut runner, node_id)
108 .await;
109 eprintln!("receive all hashes before last...");
110 self.receive_all_hashes_before_last(&mut runner, node_id)
111 .await;
112 eprintln!("success");
113 }
114
115 fn fetch_pending_count(self, state: &State) -> usize {
116 None.or_else(|| {
117 let snarked_state = state.transition_frontier.sync.ledger()?.snarked()?;
118 Some(snarked_state.fetch_pending().unwrap().len())
119 })
120 .unwrap_or(0)
121 }
122
123 async fn receive_single_hash(self, runner: &mut ClusterRunner<'_>, node_id: ClusterNodeId) {
124 runner
125 .run(
126 RunCfg::default()
127 .timeout(Duration::from_secs(5))
128 .event_handler(|cur_node_id, state, event| {
129 if cur_node_id == node_id
130 && self.event_ledger_query_addr(state, event).is_some()
131 {
132 return RunDecision::StopExec;
133 }
134 RunDecision::Skip
135 }),
136 )
137 .await
138 .expect("timeout");
139 }
140
141 async fn receive_all_hashes_before_first(
142 self,
143 runner: &mut ClusterRunner<'_>,
144 node_id: ClusterNodeId,
145 ) {
146 self.receive_all_hashes_except_first(runner, node_id).await;
147 self.receive_single_hash(runner, node_id).await;
148 }
149
150 async fn receive_all_hashes_except_first(
151 self,
152 runner: &mut ClusterRunner<'_>,
153 _node_id: ClusterNodeId,
154 ) {
155 runner
156 .run(
157 RunCfg::default()
158 .timeout(Duration::from_secs(10))
159 .event_handler(|_, state, event| {
160 if self.is_event_first_ledger_query(state, event) {
161 return RunDecision::Skip;
162 }
163 RunDecision::ContinueExec
164 })
165 .action_handler(move |_, state, _, action| {
166 matches!(action.action().kind(), ActionKind::CheckTimeouts)
167 && self.fetch_pending_count(state) == 1
168 }),
169 )
170 .await
171 .expect("timeout");
172 }
173
174 async fn receive_all_hashes_before_last(
175 self,
176 runner: &mut ClusterRunner<'_>,
177 node_id: ClusterNodeId,
178 ) {
179 self.receive_all_hashes_except_last(runner, node_id).await;
180 self.receive_single_hash(runner, node_id).await;
181 }
182
183 async fn receive_all_hashes_except_last(
184 self,
185 runner: &mut ClusterRunner<'_>,
186 node_id: ClusterNodeId,
187 ) {
188 let mut biggest_addr = None;
189 while self.fetch_pending_count(runner.node(node_id).unwrap().state()) > 1 {
190 runner
191 .run(
192 RunCfg::default()
193 .timeout(Duration::from_secs(10))
194 .event_handler(|_, state, event| {
195 let Some(addr) = self.event_ledger_query_addr(state, event) else {
196 return RunDecision::Skip;
197 };
198 match biggest_addr.as_mut() {
199 None => {
200 biggest_addr = Some(addr);
201 RunDecision::Skip
202 }
203 Some(biggest_addr) => match addr.cmp(biggest_addr) {
204 Ordering::Less => RunDecision::ContinueExec,
205 Ordering::Equal => RunDecision::Skip,
206 Ordering::Greater => {
207 *biggest_addr = addr;
208 RunDecision::Stop
209 }
210 },
211 }
212 })
213 .action_handler(move |_, state, _, action| {
214 matches!(action.action().kind(), ActionKind::CheckTimeouts)
215 && self.fetch_pending_count(state) == 1
216 }),
217 )
218 .await
219 .expect("timeout");
220 }
221 }
222
223 fn event_ledger_query_addr(self, state: &State, event: &Event) -> Option<LedgerAddress> {
224 let Event::P2p(P2pEvent::Channel(P2pChannelEvent::Received(
225 peer_id,
226 Ok(ChannelMsg::Rpc(RpcChannelMsg::Response(_, _))),
227 ))) = event
228 else {
229 return None;
230 };
231 let rpc = state
232 .p2p
233 .get_ready_peer(peer_id)
234 .unwrap()
235 .channels
236 .rpc
237 .pending_local_rpc()
238 .unwrap();
239 let P2pRpcRequest::LedgerQuery(_, MinaLedgerSyncLedgerQueryStableV1::WhatChildHashes(addr)) =
240 rpc
241 else {
242 return None;
243 };
244 Some(addr.into())
245 }
246
247 fn is_event_first_ledger_query(self, state: &State, event: &Event) -> bool {
248 self.event_ledger_query_addr(state, event)
249 .is_some_and(|addr| addr == LedgerAddress::first(addr.length()))
250 }
251}