openmina_bootstrap_sandbox/
replay.rs1use std::{
2 collections::{BTreeMap, BTreeSet},
3 fs::{self, File},
4 path::Path,
5};
6
7use binprot::BinProtRead;
8use libp2p::{futures::StreamExt, swarm::SwarmEvent};
9use libp2p_rpc_behaviour::{Event as RpcEvent, Received};
10use mina_p2p_messages::{
11 rpc::{
12 AnswerSyncLedgerQueryV2, GetAncestryV2, GetBestTipV2,
13 GetStagedLedgerAuxAndPendingCoinbasesAtHashV2, GetTransitionChainProofV1ForV2,
14 GetTransitionChainV2,
15 },
16 rpc_kernel::{QueryHeader, QueryPayload, RpcMethod, RpcResult},
17 v2,
18};
19
20use super::{
21 behaviour::{Behaviour, Event},
22 snarked_ledger::SnarkedLedger,
23};
24
25pub async fn run(mut swarm: libp2p::Swarm<Behaviour>, path_main: &Path, height: u32) {
26 let path_blocks = path_main.join("blocks");
27 let path = path_main.join(height.to_string());
28
29 let mut file = File::open(path.join("best_tip")).unwrap();
30 let best_tip = <GetBestTipV2 as RpcMethod>::Response::binprot_read(&mut file).unwrap();
31
32 let mut file = File::open(path.join("ancestry")).unwrap();
33 let ancestry = <GetAncestryV2 as RpcMethod>::Response::binprot_read(&mut file).unwrap();
34
35 let mut file = File::open(path.join("staged_ledger_aux")).unwrap();
36 type T = GetStagedLedgerAuxAndPendingCoinbasesAtHashV2;
37 let staged_ledger_aux = <T as RpcMethod>::Response::binprot_read(&mut file).unwrap();
38
39 let mut ledgers = BTreeMap::new();
40 for entry in fs::read_dir(path.join("ledgers")).unwrap() {
41 let entry = entry.unwrap();
42 let file = File::open(entry.path()).unwrap();
43 let ledger = SnarkedLedger::load_bin(file).unwrap();
44 ledgers.insert(entry.file_name().to_str().unwrap().to_string(), ledger);
45 }
46
47 let file = File::open(path_main.join("blocks").join("table.json")).unwrap();
48 let table = serde_json::from_reader::<_, BTreeMap<String, u32>>(file).unwrap();
49
50 let mut peers = BTreeSet::default();
51
52 while let Some(event) = swarm.next().await {
53 match event {
54 SwarmEvent::NewListenAddr { address, .. } => {
55 log::info!("listen on {address}");
56 }
57 SwarmEvent::ConnectionEstablished {
58 peer_id, endpoint, ..
59 } => {
60 log::info!(
61 "new connection {peer_id}, {}",
62 endpoint.get_remote_address()
63 );
64 }
65 SwarmEvent::Behaviour(Event::Rpc((peer_id, RpcEvent::ConnectionEstablished))) => {
66 peers.insert(peer_id);
67 log::info!("new connection {peer_id}");
68 }
69 SwarmEvent::Behaviour(Event::Rpc((peer_id, RpcEvent::ConnectionClosed))) => {
70 log::info!("connection closed {peer_id}");
71 peers.remove(&peer_id);
72 }
73 SwarmEvent::Behaviour(Event::Rpc((
74 peer_id,
75 RpcEvent::Stream {
76 stream_id,
77 received,
78 },
79 ))) => match received {
80 Received::HandshakeDone => {
81 log::info!("new stream {peer_id} {stream_id:?}");
82 }
83 Received::Menu(menu) => {
84 log::info!("menu: {menu:?}");
85 }
86 Received::Query {
87 header: QueryHeader { tag, version, id },
88 bytes,
89 } => {
90 let mut bytes = bytes.as_slice();
91 log::info!("handling {}, {}", tag.to_string_lossy(), version);
92 let tag = tag.as_ref();
93 match (tag, version) {
94 (GetBestTipV2::NAME, GetBestTipV2::VERSION) => {
95 swarm
96 .behaviour_mut()
97 .rpc
98 .respond::<GetBestTipV2>(
99 peer_id,
100 stream_id,
101 id,
102 Ok(best_tip.clone()),
103 )
104 .unwrap();
105 }
106 (GetAncestryV2::NAME, GetAncestryV2::VERSION) => {
107 swarm
108 .behaviour_mut()
109 .rpc
110 .respond::<GetAncestryV2>(
111 peer_id,
112 stream_id,
113 id,
114 Ok(ancestry.clone()),
115 )
116 .unwrap();
117 }
118 (AnswerSyncLedgerQueryV2::NAME, AnswerSyncLedgerQueryV2::VERSION) => {
119 type T = AnswerSyncLedgerQueryV2;
120 let (hash, query) =
121 QueryPayload::<<T as RpcMethod>::Query>::binprot_read(&mut bytes)
122 .unwrap()
123 .0;
124
125 let hash = v2::LedgerHash::from(v2::MinaBaseLedgerHash0StableV1(hash));
126 let hash_str = match serde_json::to_value(&hash).unwrap() {
127 serde_json::Value::String(s) => s,
128 _ => panic!(),
129 };
130
131 let ledger = ledgers.get_mut(&hash_str).unwrap();
132 let response = ledger.serve_query(query);
133
134 swarm
135 .behaviour_mut()
136 .rpc
137 .respond::<T>(peer_id, stream_id, id, Ok(RpcResult(Ok(response))))
138 .unwrap();
139 }
140 (
141 GetStagedLedgerAuxAndPendingCoinbasesAtHashV2::NAME,
142 GetStagedLedgerAuxAndPendingCoinbasesAtHashV2::VERSION,
143 ) => swarm
144 .behaviour_mut()
145 .rpc
146 .respond::<GetStagedLedgerAuxAndPendingCoinbasesAtHashV2>(
147 peer_id,
148 stream_id,
149 id,
150 Ok(staged_ledger_aux.clone()),
151 )
152 .unwrap(),
153 (GetTransitionChainV2::NAME, GetTransitionChainV2::VERSION) => {
154 type T = GetTransitionChainV2;
155 let hashes =
156 QueryPayload::<<T as RpcMethod>::Query>::binprot_read(&mut bytes)
157 .unwrap()
158 .0;
159
160 let response = hashes
162 .into_iter()
163 .map(|hash| {
164 let hash =
165 v2::StateHash::from(v2::DataHashLibStateHashStableV1(hash));
166 let height = table.get(&hash.to_string()).unwrap();
178 let path =
179 path_blocks.join(height.to_string()).join(hash.to_string());
180 let mut file = File::open(path).unwrap();
181 binprot::BinProtRead::binprot_read(&mut file).unwrap()
182 })
183 .collect();
184 swarm
185 .behaviour_mut()
186 .rpc
187 .respond::<T>(peer_id, stream_id, id, Ok(Some(response)))
188 .unwrap();
189 }
193 (
194 GetTransitionChainProofV1ForV2::NAME,
195 GetTransitionChainProofV1ForV2::VERSION,
196 ) => {
197 type T = GetTransitionChainProofV1ForV2;
198 let hash =
199 QueryPayload::<<T as RpcMethod>::Query>::binprot_read(&mut bytes)
200 .unwrap()
201 .0;
202
203 let hash = v2::StateHash::from(v2::DataHashLibStateHashStableV1(hash));
204 let response = if let Some(height) = table.get(&hash.to_string()) {
205 let path = path_blocks
206 .join(height.to_string())
207 .join(format!("proof_{hash}"));
208 let mut file = File::open(path).unwrap();
209 binprot::BinProtRead::binprot_read(&mut file).unwrap()
210 } else {
211 log::warn!("no proof for block {hash}");
212 None
213 };
214
215 swarm
216 .behaviour_mut()
217 .rpc
218 .respond::<T>(peer_id, stream_id, id, Ok(response))
219 .unwrap();
220 }
221 (name, version) => {
222 log::warn!(
223 "TODO: unhandled {}, {version}",
224 String::from_utf8_lossy(name)
225 );
226 }
227 }
228 }
229 _ => {}
230 },
231 _ => {}
232 }
233 }
234}