mina_node_testing/scenarios/solo_node/
sync_root_snarked_ledger.rs

1use std::{cmp::Ordering, time::Duration};
2
3use mina_p2p_messages::v2::MinaLedgerSyncLedgerQueryStableV1;
4use node::{
5    event_source::Event,
6    ledger::LedgerAddress,
7    p2p::{
8        channels::{
9            rpc::{P2pRpcRequest, RpcChannelMsg},
10            ChannelMsg,
11        },
12        P2pChannelEvent, P2pEvent,
13    },
14    ActionKind, State,
15};
16
17use crate::{
18    cluster::ClusterNodeId,
19    node::RustNodeTestingConfig,
20    scenario::{ListenerNode, ScenarioStep},
21    scenarios::{ClusterRunner, RunCfg, RunDecision},
22};
23
24/// Set up single Rust node and sync up root snarked ledger.
25///
26/// 1. Node will connect to 2 peers (replayers).
27/// 2. At some chosen height, node will receive all hashes before receiving the first one.
28/// 3. At next height, node will receive all hashes before receiving the last hash on that height.
29/// 4. At next height we will do same above 2 steps, except those first and last hash requests will timeout instead of being received at the end.
30/// 5. Continue till we are done syncing root snarked ledger.
31#[derive(documented::Documented, Default, Clone, Copy)]
32pub struct SoloNodeSyncRootSnarkedLedger;
33
34impl SoloNodeSyncRootSnarkedLedger {
35    pub async fn run(self, mut runner: ClusterRunner<'_>) {
36        let node_id = runner.add_rust_node(RustNodeTestingConfig::devnet_default());
37        eprintln!("launch Rust node with default configuration, id: {node_id}");
38
39        const REPLAYER_1: &str =
40            "/dns4/web-node-1/tcp/18302/p2p/12D3KooWD8jSyPFXNdAcMBHyHjRBcK1AW9t3xvnpfCFSRKMweVKi";
41        const REPLAYER_2: &str =
42            "/dns4/web-node-1/tcp/18303/p2p/12D3KooWBxbfeaxGHxdxP3U5jRKpNK5wQmbjKywGJEqTCNpVPxqk";
43
44        // Initiate connection to 2 replayers.
45        runner
46            .exec_step(ScenarioStep::ConnectNodes {
47                dialer: node_id,
48                listener: ListenerNode::Custom(REPLAYER_1.parse().unwrap()),
49            })
50            .await
51            .unwrap();
52        eprintln!("node({node_id}) dialing to replayer: {REPLAYER_1}");
53        runner
54            .exec_step(ScenarioStep::ConnectNodes {
55                dialer: node_id,
56                listener: ListenerNode::Custom(REPLAYER_2.parse().unwrap()),
57            })
58            .await
59            .unwrap();
60        eprintln!("node({node_id}) dialing to replayer: {REPLAYER_2}");
61
62        // Wait for both peers to be connected, hiding p2p ledger query
63        // responses for now, as we want to control their order.
64        runner
65            .run(
66                RunCfg::default()
67                    .timeout(Duration::from_secs(10))
68                    .event_handler(|_, state, event| {
69                        if self.event_ledger_query_addr(state, event).is_some() {
70                            // skip/hide ledger query events.
71                            return RunDecision::Skip;
72                        }
73                        RunDecision::ContinueExec
74                    })
75                    .action_handler(|_, state, _, _| {
76                        let connected_peer_count = state
77                            .p2p
78                            .ready_peers_iter()
79                            .filter(|(_, p)| p.channels.rpc.is_ready())
80                            .count();
81
82                        // exit if both peers ready.
83                        connected_peer_count >= 2
84                    }),
85            )
86            .await
87            .expect("waiting for 2 replayer peers to be connected timed out");
88
89        eprintln!("2 replayers are now connected");
90
91        // Exec ledger query responses until we are deep enough for there
92        // to be more than 1 hash in the same height.
93        eprintln!("exec ledger query responses until we are deep enough for there to be more than 1 hash in the same height");
94        runner
95            .run(
96                RunCfg::default()
97                    .timeout(Duration::from_secs(10))
98                    .action_handler(move |_, state, _, action| {
99                        matches!(action.action().kind(), ActionKind::CheckTimeouts)
100                            && self.fetch_pending_count(state) >= 2
101                    }),
102            )
103            .await
104            .expect("time out");
105
106        eprintln!("receive all hashes before first...");
107        self.receive_all_hashes_before_first(&mut runner, node_id)
108            .await;
109        eprintln!("receive all hashes before last...");
110        self.receive_all_hashes_before_last(&mut runner, node_id)
111            .await;
112        eprintln!("success");
113    }
114
115    fn fetch_pending_count(self, state: &State) -> usize {
116        None.or_else(|| {
117            let snarked_state = state.transition_frontier.sync.ledger()?.snarked()?;
118            Some(snarked_state.fetch_pending().unwrap().len())
119        })
120        .unwrap_or(0)
121    }
122
123    async fn receive_single_hash(self, runner: &mut ClusterRunner<'_>, node_id: ClusterNodeId) {
124        runner
125            .run(
126                RunCfg::default()
127                    .timeout(Duration::from_secs(5))
128                    .event_handler(|cur_node_id, state, event| {
129                        if cur_node_id == node_id
130                            && self.event_ledger_query_addr(state, event).is_some()
131                        {
132                            return RunDecision::StopExec;
133                        }
134                        RunDecision::Skip
135                    }),
136            )
137            .await
138            .expect("timeout");
139    }
140
141    async fn receive_all_hashes_before_first(
142        self,
143        runner: &mut ClusterRunner<'_>,
144        node_id: ClusterNodeId,
145    ) {
146        self.receive_all_hashes_except_first(runner, node_id).await;
147        self.receive_single_hash(runner, node_id).await;
148    }
149
150    async fn receive_all_hashes_except_first(
151        self,
152        runner: &mut ClusterRunner<'_>,
153        _node_id: ClusterNodeId,
154    ) {
155        runner
156            .run(
157                RunCfg::default()
158                    .timeout(Duration::from_secs(10))
159                    .event_handler(|_, state, event| {
160                        if self.is_event_first_ledger_query(state, event) {
161                            return RunDecision::Skip;
162                        }
163                        RunDecision::ContinueExec
164                    })
165                    .action_handler(move |_, state, _, action| {
166                        matches!(action.action().kind(), ActionKind::CheckTimeouts)
167                            && self.fetch_pending_count(state) == 1
168                    }),
169            )
170            .await
171            .expect("timeout");
172    }
173
174    async fn receive_all_hashes_before_last(
175        self,
176        runner: &mut ClusterRunner<'_>,
177        node_id: ClusterNodeId,
178    ) {
179        self.receive_all_hashes_except_last(runner, node_id).await;
180        self.receive_single_hash(runner, node_id).await;
181    }
182
183    async fn receive_all_hashes_except_last(
184        self,
185        runner: &mut ClusterRunner<'_>,
186        node_id: ClusterNodeId,
187    ) {
188        let mut biggest_addr = None;
189        while self.fetch_pending_count(runner.node(node_id).unwrap().state()) > 1 {
190            runner
191                .run(
192                    RunCfg::default()
193                        .timeout(Duration::from_secs(10))
194                        .event_handler(|_, state, event| {
195                            let Some(addr) = self.event_ledger_query_addr(state, event) else {
196                                return RunDecision::Skip;
197                            };
198                            match biggest_addr.as_mut() {
199                                None => {
200                                    biggest_addr = Some(addr);
201                                    RunDecision::Skip
202                                }
203                                Some(biggest_addr) => match addr.cmp(biggest_addr) {
204                                    Ordering::Less => RunDecision::ContinueExec,
205                                    Ordering::Equal => RunDecision::Skip,
206                                    Ordering::Greater => {
207                                        *biggest_addr = addr;
208                                        RunDecision::Stop
209                                    }
210                                },
211                            }
212                        })
213                        .action_handler(move |_, state, _, action| {
214                            matches!(action.action().kind(), ActionKind::CheckTimeouts)
215                                && self.fetch_pending_count(state) == 1
216                        }),
217                )
218                .await
219                .expect("timeout");
220        }
221    }
222
223    fn event_ledger_query_addr(self, state: &State, event: &Event) -> Option<LedgerAddress> {
224        let Event::P2p(P2pEvent::Channel(P2pChannelEvent::Received(
225            peer_id,
226            Ok(ChannelMsg::Rpc(RpcChannelMsg::Response(_, _))),
227        ))) = event
228        else {
229            return None;
230        };
231        let rpc = state
232            .p2p
233            .get_ready_peer(peer_id)
234            .unwrap()
235            .channels
236            .rpc
237            .pending_local_rpc()
238            .unwrap();
239        let P2pRpcRequest::LedgerQuery(_, MinaLedgerSyncLedgerQueryStableV1::WhatChildHashes(addr)) =
240            rpc
241        else {
242            return None;
243        };
244        Some(addr.into())
245    }
246
247    fn is_event_first_ledger_query(self, state: &State, event: &Event) -> bool {
248        self.event_ledger_query_addr(state, event)
249            .is_some_and(|addr| addr == LedgerAddress::first(addr.length()))
250    }
251}