openmina_bootstrap_sandbox/
replay.rs

1use std::{
2    collections::{BTreeMap, BTreeSet},
3    fs::{self, File},
4    path::Path,
5};
6
7use binprot::BinProtRead;
8use libp2p::{futures::StreamExt, swarm::SwarmEvent};
9use libp2p_rpc_behaviour::{Event as RpcEvent, Received};
10use mina_p2p_messages::{
11    rpc::{
12        AnswerSyncLedgerQueryV2, GetAncestryV2, GetBestTipV2,
13        GetStagedLedgerAuxAndPendingCoinbasesAtHashV2, GetTransitionChainProofV1ForV2,
14        GetTransitionChainV2,
15    },
16    rpc_kernel::{QueryHeader, QueryPayload, RpcMethod, RpcResult},
17    v2,
18};
19
20use super::{
21    behaviour::{Behaviour, Event},
22    snarked_ledger::SnarkedLedger,
23};
24
25pub async fn run(mut swarm: libp2p::Swarm<Behaviour>, path_main: &Path, height: u32) {
26    let path_blocks = path_main.join("blocks");
27    let path = path_main.join(height.to_string());
28
29    let mut file = File::open(path.join("best_tip")).unwrap();
30    let best_tip = <GetBestTipV2 as RpcMethod>::Response::binprot_read(&mut file).unwrap();
31
32    let mut file = File::open(path.join("ancestry")).unwrap();
33    let ancestry = <GetAncestryV2 as RpcMethod>::Response::binprot_read(&mut file).unwrap();
34
35    let mut file = File::open(path.join("staged_ledger_aux")).unwrap();
36    type T = GetStagedLedgerAuxAndPendingCoinbasesAtHashV2;
37    let staged_ledger_aux = <T as RpcMethod>::Response::binprot_read(&mut file).unwrap();
38
39    let mut ledgers = BTreeMap::new();
40    for entry in fs::read_dir(path.join("ledgers")).unwrap() {
41        let entry = entry.unwrap();
42        let file = File::open(entry.path()).unwrap();
43        let ledger = SnarkedLedger::load_bin(file).unwrap();
44        ledgers.insert(entry.file_name().to_str().unwrap().to_string(), ledger);
45    }
46
47    let file = File::open(path_main.join("blocks").join("table.json")).unwrap();
48    let table = serde_json::from_reader::<_, BTreeMap<String, u32>>(file).unwrap();
49
50    let mut peers = BTreeSet::default();
51
52    while let Some(event) = swarm.next().await {
53        match event {
54            SwarmEvent::NewListenAddr { address, .. } => {
55                log::info!("listen on {address}");
56            }
57            SwarmEvent::ConnectionEstablished {
58                peer_id, endpoint, ..
59            } => {
60                log::info!(
61                    "new connection {peer_id}, {}",
62                    endpoint.get_remote_address()
63                );
64            }
65            SwarmEvent::Behaviour(Event::Rpc((peer_id, RpcEvent::ConnectionEstablished))) => {
66                peers.insert(peer_id);
67                log::info!("new connection {peer_id}");
68            }
69            SwarmEvent::Behaviour(Event::Rpc((peer_id, RpcEvent::ConnectionClosed))) => {
70                log::info!("connection closed {peer_id}");
71                peers.remove(&peer_id);
72            }
73            SwarmEvent::Behaviour(Event::Rpc((
74                peer_id,
75                RpcEvent::Stream {
76                    stream_id,
77                    received,
78                },
79            ))) => match received {
80                Received::HandshakeDone => {
81                    log::info!("new stream {peer_id} {stream_id:?}");
82                }
83                Received::Menu(menu) => {
84                    log::info!("menu: {menu:?}");
85                }
86                Received::Query {
87                    header: QueryHeader { tag, version, id },
88                    bytes,
89                } => {
90                    let mut bytes = bytes.as_slice();
91                    log::info!("handling {}, {}", tag.to_string_lossy(), version);
92                    let tag = tag.as_ref();
93                    match (tag, version) {
94                        (GetBestTipV2::NAME, GetBestTipV2::VERSION) => {
95                            swarm
96                                .behaviour_mut()
97                                .rpc
98                                .respond::<GetBestTipV2>(
99                                    peer_id,
100                                    stream_id,
101                                    id,
102                                    Ok(best_tip.clone()),
103                                )
104                                .unwrap();
105                        }
106                        (GetAncestryV2::NAME, GetAncestryV2::VERSION) => {
107                            swarm
108                                .behaviour_mut()
109                                .rpc
110                                .respond::<GetAncestryV2>(
111                                    peer_id,
112                                    stream_id,
113                                    id,
114                                    Ok(ancestry.clone()),
115                                )
116                                .unwrap();
117                        }
118                        (AnswerSyncLedgerQueryV2::NAME, AnswerSyncLedgerQueryV2::VERSION) => {
119                            type T = AnswerSyncLedgerQueryV2;
120                            let (hash, query) =
121                                QueryPayload::<<T as RpcMethod>::Query>::binprot_read(&mut bytes)
122                                    .unwrap()
123                                    .0;
124
125                            let hash = v2::LedgerHash::from(v2::MinaBaseLedgerHash0StableV1(hash));
126                            let hash_str = match serde_json::to_value(&hash).unwrap() {
127                                serde_json::Value::String(s) => s,
128                                _ => panic!(),
129                            };
130
131                            let ledger = ledgers.get_mut(&hash_str).unwrap();
132                            let response = ledger.serve_query(query);
133
134                            swarm
135                                .behaviour_mut()
136                                .rpc
137                                .respond::<T>(peer_id, stream_id, id, Ok(RpcResult(Ok(response))))
138                                .unwrap();
139                        }
140                        (
141                            GetStagedLedgerAuxAndPendingCoinbasesAtHashV2::NAME,
142                            GetStagedLedgerAuxAndPendingCoinbasesAtHashV2::VERSION,
143                        ) => swarm
144                            .behaviour_mut()
145                            .rpc
146                            .respond::<GetStagedLedgerAuxAndPendingCoinbasesAtHashV2>(
147                                peer_id,
148                                stream_id,
149                                id,
150                                Ok(staged_ledger_aux.clone()),
151                            )
152                            .unwrap(),
153                        (GetTransitionChainV2::NAME, GetTransitionChainV2::VERSION) => {
154                            type T = GetTransitionChainV2;
155                            let hashes =
156                                QueryPayload::<<T as RpcMethod>::Query>::binprot_read(&mut bytes)
157                                    .unwrap()
158                                    .0;
159
160                            // let mut contains_last = false;
161                            let response = hashes
162                                .into_iter()
163                                .map(|hash| {
164                                    let hash =
165                                        v2::StateHash::from(v2::DataHashLibStateHashStableV1(hash));
166                                    // if hash
167                                    //     == best_tip
168                                    //         .as_ref()
169                                    //         .unwrap()
170                                    //         .data
171                                    //         .header
172                                    //         .protocol_state
173                                    //         .previous_state_hash
174                                    // {
175                                    //     contains_last = true;
176                                    // }
177                                    let height = table.get(&hash.to_string()).unwrap();
178                                    let path =
179                                        path_blocks.join(height.to_string()).join(hash.to_string());
180                                    let mut file = File::open(path).unwrap();
181                                    binprot::BinProtRead::binprot_read(&mut file).unwrap()
182                                })
183                                .collect();
184                            swarm
185                                .behaviour_mut()
186                                .rpc
187                                .respond::<T>(peer_id, stream_id, id, Ok(Some(response)))
188                                .unwrap();
189                            // if contains_last {
190                            //     swarm.disconnect_peer_id(peer_id).unwrap();
191                            // }
192                        }
193                        (
194                            GetTransitionChainProofV1ForV2::NAME,
195                            GetTransitionChainProofV1ForV2::VERSION,
196                        ) => {
197                            type T = GetTransitionChainProofV1ForV2;
198                            let hash =
199                                QueryPayload::<<T as RpcMethod>::Query>::binprot_read(&mut bytes)
200                                    .unwrap()
201                                    .0;
202
203                            let hash = v2::StateHash::from(v2::DataHashLibStateHashStableV1(hash));
204                            let response = if let Some(height) = table.get(&hash.to_string()) {
205                                let path = path_blocks
206                                    .join(height.to_string())
207                                    .join(format!("proof_{hash}"));
208                                let mut file = File::open(path).unwrap();
209                                binprot::BinProtRead::binprot_read(&mut file).unwrap()
210                            } else {
211                                log::warn!("no proof for block {hash}");
212                                None
213                            };
214
215                            swarm
216                                .behaviour_mut()
217                                .rpc
218                                .respond::<T>(peer_id, stream_id, id, Ok(response))
219                                .unwrap();
220                        }
221                        (name, version) => {
222                            log::warn!(
223                                "TODO: unhandled {}, {version}",
224                                String::from_utf8_lossy(name)
225                            );
226                        }
227                    }
228                }
229                _ => {}
230            },
231            _ => {}
232        }
233    }
234}