mina_producer_dashboard/storage/
db_sled.rs1use sled::{Db, Tree};
2use std::path::PathBuf;
3
4use crate::{
5 archive::Block,
6 evaluator::epoch::{SlotBlockUpdate, SlotData, SlotStatus},
7 node::epoch_ledgers::Ledger,
8};
9
10#[derive(Clone)]
11pub struct Database {
12 _db: Db,
13 epoch_data: Tree,
14 seeds: Tree,
15 epoch_ledgers: Tree,
16 blocks: Tree,
17 produced_blocks_by_slot: Tree,
18 _summaries: Tree,
19}
20
21impl Database {
22 pub fn open(path: PathBuf) -> Result<Self, sled::Error> {
23 let db = sled::open(path)?;
24 let seeds = db.open_tree("seeds")?;
25 let current_epoch = db.open_tree("current_epoch")?;
26 let epoch_ledgers = db.open_tree("epoch_ledgers")?;
27 let blocks = db.open_tree("produced_blocks")?;
28 let produced_blocks_by_slot = db.open_tree("produced_blocks_by_slot")?;
29 let summaries = db.open_tree("sumaries")?;
30
31 Ok(Self {
32 _db: db,
33 epoch_data: current_epoch,
34 seeds,
35 epoch_ledgers,
36 blocks,
37 produced_blocks_by_slot,
38 _summaries: summaries,
39 })
40 }
41
42 fn store<T: serde::Serialize, K: AsRef<[u8]>>(
43 &self,
44 tree: &Tree,
45 key: K,
46 item: &T,
47 ) -> Result<(), sled::Error> {
48 let serialized = serialize_bincode(item)?;
49 tree.insert(key.as_ref(), serialized)?;
50 Ok(())
51 }
52
53 fn retrieve<T: serde::de::DeserializeOwned, K: AsRef<[u8]>>(
54 &self,
55 tree: &Tree,
56 key: K,
57 ) -> Result<Option<T>, sled::Error> {
58 match tree.get(key.as_ref())? {
59 Some(serialized_item) => {
60 let item = deserialize_bincode(&serialized_item)?;
61 Ok(Some(item))
62 }
63 None => Ok(None),
64 }
65 }
66
67 pub fn update<T, K, F>(&self, tree: &Tree, key: K, mut func: F) -> Result<(), sled::Error>
68 where
69 T: serde::de::DeserializeOwned + serde::Serialize,
70 K: AsRef<[u8]>,
71 F: FnMut(T) -> T,
72 {
73 let existing_bytes = tree.get(key.as_ref())?.ok_or_else(|| {
74 sled::Error::Io(std::io::Error::new(
75 std::io::ErrorKind::NotFound,
76 "Key not found",
77 ))
78 })?;
79
80 let current_item: T = deserialize_bincode(&existing_bytes)?;
82
83 let updated_item = func(current_item);
85
86 let serialized = serialize_bincode(&updated_item)?;
88 tree.insert(key.as_ref(), serialized)?;
89
90 Ok(())
91 }
92
93 pub fn store_ledger(&self, epoch: u32, ledger: &Ledger) -> Result<(), sled::Error> {
95 self.store(&self.epoch_ledgers, epoch.to_be_bytes(), ledger)
96 }
97
98 pub fn get_ledger(&self, epoch: u32) -> Result<Option<Ledger>, sled::Error> {
99 self.retrieve(&self.epoch_ledgers, epoch.to_be_bytes())
100 }
101
102 pub fn has_ledger(&self, epoch: &u32) -> Result<bool, sled::Error> {
103 self.epoch_ledgers.contains_key(epoch.to_be_bytes())
104 }
105
106 pub fn store_seed(&self, epoch: u32, seed: String) -> Result<(), sled::Error> {
107 self.store(&self.seeds, epoch.to_be_bytes(), &seed)
108 }
109
110 pub fn get_seed(&self, epoch: u32) -> Result<Option<String>, sled::Error> {
111 self.retrieve(&self.seeds, epoch.to_be_bytes())
112 }
113
114 pub fn store_slot(&self, slot: u32, slot_data: &SlotData) -> Result<(), sled::Error> {
115 self.store(&self.epoch_data, slot.to_be_bytes(), slot_data)
116 }
117
118 pub fn update_slot_status(
119 &self,
120 slot: u32,
121 block_status: SlotStatus,
122 ) -> Result<(), sled::Error> {
123 self.update(
124 &self.epoch_data,
125 slot.to_be_bytes(),
126 |mut slot_entry: SlotData| {
127 slot_entry.update_block_status(block_status.clone());
128 slot_entry
129 },
130 )
131 }
132
133 pub fn update_slot_block(
134 &self,
135 slot: u32,
136 block: SlotBlockUpdate,
137 produced: bool,
139 in_future: bool,
140 ) -> Result<(), sled::Error> {
141 self.update(
142 &self.epoch_data,
143 slot.to_be_bytes(),
144 |mut slot_entry: SlotData| {
145 slot_entry.add_block(block.clone());
146 if !produced {
147 if in_future {
148 slot_entry.update_block_status(SlotStatus::ForeignToBeProduced);
149 } else {
150 slot_entry.update_block_status(SlotStatus::Foreign);
151 }
152 }
153 slot_entry
154 },
155 )
156 }
157
158 pub fn has_slot(&self, slot: u32) -> Result<bool, sled::Error> {
159 self.epoch_data.contains_key(slot.to_be_bytes())
160 }
161
162 pub fn store_block(&self, block: Block) -> Result<(), sled::Error> {
163 self.store(&self.blocks, block.state_hash.as_bytes(), &block)?;
164 self.store(
165 &self.produced_blocks_by_slot,
166 block.global_slot().to_be_bytes(),
167 &block,
168 )
169 }
170
171 pub fn get_blocks_in_range(
172 &self,
173 start_slot: u32,
174 end_slot: u32,
175 ) -> Result<Vec<Block>, sled::Error> {
176 let start_key = start_slot.to_be_bytes();
177 let end_key = end_slot.to_be_bytes();
178
179 self.blocks
180 .range(start_key..end_key)
181 .map(|entry_result| {
182 let (_key, value) = entry_result?;
183 bincode::deserialize(&value)
184 .map_err(|e| sled::Error::Io(std::io::Error::new(std::io::ErrorKind::Other, e)))
185 })
186 .collect()
187 }
188
189 pub fn get_slots_for_epoch(&self, epoch: u32) -> Result<Vec<SlotData>, sled::Error> {
190 let start_slot = epoch * 7140;
191 let end_slot = start_slot + 7140;
192
193 let start_key = start_slot.to_be_bytes();
195 let end_key = end_slot.to_be_bytes();
196
197 self.epoch_data
198 .range(start_key..end_key)
199 .map(|entry_result| {
200 let (_key, value) = entry_result?;
201 deserialize_bincode(&value)
202 })
203 .collect()
204 }
205
206 pub fn get_all_slots(&self) -> Result<Vec<SlotData>, sled::Error> {
207 self.epoch_data
208 .into_iter()
209 .map(|entry_result| {
210 let (_, value) = entry_result?;
211 deserialize_bincode(&value)
212 })
213 .collect()
214 }
215
216 pub fn seen_block(&self, state_hash: String) -> Result<bool, sled::Error> {
217 self.blocks.contains_key(state_hash.as_bytes())
218 }
219
220 pub fn seen_slot(&self, slot: u32) -> Result<bool, sled::Error> {
221 self.produced_blocks_by_slot
222 .contains_key(slot.to_be_bytes())
223 }
224
225 pub fn set_current_slot(&self, old: u32, new: u32) -> Result<(), sled::Error> {
226 self.update(
227 &self.epoch_data,
228 old.to_be_bytes(),
229 |mut slot_entry: SlotData| {
230 slot_entry.unset_as_current();
231 slot_entry
232 },
233 )?;
234
235 self.update(
236 &self.epoch_data,
237 new.to_be_bytes(),
238 |mut slot_entry: SlotData| {
239 slot_entry.set_as_current();
240 slot_entry
241 },
242 )
243 }
244}
245
246fn serialize_bincode<T: serde::Serialize>(item: &T) -> Result<Vec<u8>, sled::Error> {
247 bincode::serialize(item)
248 .map_err(|e| sled::Error::Io(std::io::Error::new(std::io::ErrorKind::Other, e)))
249}
250
251fn deserialize_bincode<T: serde::de::DeserializeOwned>(bytes: &[u8]) -> Result<T, sled::Error> {
252 bincode::deserialize(bytes)
253 .map_err(|e| sled::Error::Io(std::io::Error::new(std::io::ErrorKind::Other, e)))
254}
255
256#[cfg(test)]
257mod test {
258 use std::env;
259
260 use super::*;
261
262 #[test]
263 fn test_ledger_store_and_get() {
264 const EPOCH: u32 = 4;
265
266 let db_dir = env::temp_dir();
267 let db = Database::open(db_dir).expect("Failed to open DB");
268
269 let ledger = Ledger::load_from_file("test/files/staking-epoch-ledger.json".into())
270 .expect("Failed to load ledger file");
271 db.store_ledger(EPOCH, &ledger.clone())
272 .expect("Failed to store ledger into the DB");
273
274 let retrieved = db
275 .get_ledger(EPOCH)
276 .expect("Failed to retrieve ledger from the DB");
277
278 assert_eq!(Some(ledger), retrieved);
279 }
280
281 #[test]
282 fn test_seed_store_and_get() {
283 const EPOCH: u32 = 4;
284
285 let db_dir = env::temp_dir();
286 let db = Database::open(db_dir).expect("Failed to open DB");
287
288 let seed = "2vawAhPq9RsPXhz8NvrxB5VXuge8U9vQPGCtjqLZ5idHTUtWHWF8".to_string();
289
290 db.store_seed(EPOCH, seed.clone())
291 .expect("Failed to store seed into the DB");
292
293 let retrieved = db
294 .get_seed(EPOCH)
295 .expect("Failed to retrieve seed from the DB");
296
297 assert_eq!(Some(seed), retrieved);
298 }
299}