mina_producer_dashboard/storage/
db_sled.rs

1use sled::{Db, Tree};
2use std::path::PathBuf;
3
4use crate::{
5    archive::Block,
6    evaluator::epoch::{SlotBlockUpdate, SlotData, SlotStatus},
7    node::epoch_ledgers::Ledger,
8};
9
10#[derive(Clone)]
11pub struct Database {
12    _db: Db,
13    epoch_data: Tree,
14    seeds: Tree,
15    epoch_ledgers: Tree,
16    blocks: Tree,
17    produced_blocks_by_slot: Tree,
18    _summaries: Tree,
19}
20
21impl Database {
22    pub fn open(path: PathBuf) -> Result<Self, sled::Error> {
23        let db = sled::open(path)?;
24        let seeds = db.open_tree("seeds")?;
25        let current_epoch = db.open_tree("current_epoch")?;
26        let epoch_ledgers = db.open_tree("epoch_ledgers")?;
27        let blocks = db.open_tree("produced_blocks")?;
28        let produced_blocks_by_slot = db.open_tree("produced_blocks_by_slot")?;
29        let summaries = db.open_tree("sumaries")?;
30
31        Ok(Self {
32            _db: db,
33            epoch_data: current_epoch,
34            seeds,
35            epoch_ledgers,
36            blocks,
37            produced_blocks_by_slot,
38            _summaries: summaries,
39        })
40    }
41
42    fn store<T: serde::Serialize, K: AsRef<[u8]>>(
43        &self,
44        tree: &Tree,
45        key: K,
46        item: &T,
47    ) -> Result<(), sled::Error> {
48        let serialized = serialize_bincode(item)?;
49        tree.insert(key.as_ref(), serialized)?;
50        Ok(())
51    }
52
53    fn retrieve<T: serde::de::DeserializeOwned, K: AsRef<[u8]>>(
54        &self,
55        tree: &Tree,
56        key: K,
57    ) -> Result<Option<T>, sled::Error> {
58        match tree.get(key.as_ref())? {
59            Some(serialized_item) => {
60                let item = deserialize_bincode(&serialized_item)?;
61                Ok(Some(item))
62            }
63            None => Ok(None),
64        }
65    }
66
67    pub fn update<T, K, F>(&self, tree: &Tree, key: K, mut func: F) -> Result<(), sled::Error>
68    where
69        T: serde::de::DeserializeOwned + serde::Serialize,
70        K: AsRef<[u8]>,
71        F: FnMut(T) -> T,
72    {
73        let existing_bytes = tree.get(key.as_ref())?.ok_or_else(|| {
74            sled::Error::Io(std::io::Error::new(
75                std::io::ErrorKind::NotFound,
76                "Key not found",
77            ))
78        })?;
79
80        // Deserialize the current value
81        let current_item: T = deserialize_bincode(&existing_bytes)?;
82
83        // Apply the update function to get the new item
84        let updated_item = func(current_item);
85
86        // Serialize and store the updated item
87        let serialized = serialize_bincode(&updated_item)?;
88        tree.insert(key.as_ref(), serialized)?;
89
90        Ok(())
91    }
92
93    // Specific methods using the generic helpers
94    pub fn store_ledger(&self, epoch: u32, ledger: &Ledger) -> Result<(), sled::Error> {
95        self.store(&self.epoch_ledgers, epoch.to_be_bytes(), ledger)
96    }
97
98    pub fn get_ledger(&self, epoch: u32) -> Result<Option<Ledger>, sled::Error> {
99        self.retrieve(&self.epoch_ledgers, epoch.to_be_bytes())
100    }
101
102    pub fn has_ledger(&self, epoch: &u32) -> Result<bool, sled::Error> {
103        self.epoch_ledgers.contains_key(epoch.to_be_bytes())
104    }
105
106    pub fn store_seed(&self, epoch: u32, seed: String) -> Result<(), sled::Error> {
107        self.store(&self.seeds, epoch.to_be_bytes(), &seed)
108    }
109
110    pub fn get_seed(&self, epoch: u32) -> Result<Option<String>, sled::Error> {
111        self.retrieve(&self.seeds, epoch.to_be_bytes())
112    }
113
114    pub fn store_slot(&self, slot: u32, slot_data: &SlotData) -> Result<(), sled::Error> {
115        self.store(&self.epoch_data, slot.to_be_bytes(), slot_data)
116    }
117
118    pub fn update_slot_status(
119        &self,
120        slot: u32,
121        block_status: SlotStatus,
122    ) -> Result<(), sled::Error> {
123        self.update(
124            &self.epoch_data,
125            slot.to_be_bytes(),
126            |mut slot_entry: SlotData| {
127                slot_entry.update_block_status(block_status.clone());
128                slot_entry
129            },
130        )
131    }
132
133    pub fn update_slot_block(
134        &self,
135        slot: u32,
136        block: SlotBlockUpdate,
137        // TODO: simplify
138        produced: bool,
139        in_future: bool,
140    ) -> Result<(), sled::Error> {
141        self.update(
142            &self.epoch_data,
143            slot.to_be_bytes(),
144            |mut slot_entry: SlotData| {
145                slot_entry.add_block(block.clone());
146                if !produced {
147                    if in_future {
148                        slot_entry.update_block_status(SlotStatus::ForeignToBeProduced);
149                    } else {
150                        slot_entry.update_block_status(SlotStatus::Foreign);
151                    }
152                }
153                slot_entry
154            },
155        )
156    }
157
158    pub fn has_slot(&self, slot: u32) -> Result<bool, sled::Error> {
159        self.epoch_data.contains_key(slot.to_be_bytes())
160    }
161
162    pub fn store_block(&self, block: Block) -> Result<(), sled::Error> {
163        self.store(&self.blocks, block.state_hash.as_bytes(), &block)?;
164        self.store(
165            &self.produced_blocks_by_slot,
166            block.global_slot().to_be_bytes(),
167            &block,
168        )
169    }
170
171    pub fn get_blocks_in_range(
172        &self,
173        start_slot: u32,
174        end_slot: u32,
175    ) -> Result<Vec<Block>, sled::Error> {
176        let start_key = start_slot.to_be_bytes();
177        let end_key = end_slot.to_be_bytes();
178
179        self.blocks
180            .range(start_key..end_key)
181            .map(|entry_result| {
182                let (_key, value) = entry_result?;
183                bincode::deserialize(&value)
184                    .map_err(|e| sled::Error::Io(std::io::Error::new(std::io::ErrorKind::Other, e)))
185            })
186            .collect()
187    }
188
189    pub fn get_slots_for_epoch(&self, epoch: u32) -> Result<Vec<SlotData>, sled::Error> {
190        let start_slot = epoch * 7140;
191        let end_slot = start_slot + 7140;
192
193        // Convert slot numbers to byte arrays for the range query
194        let start_key = start_slot.to_be_bytes();
195        let end_key = end_slot.to_be_bytes();
196
197        self.epoch_data
198            .range(start_key..end_key)
199            .map(|entry_result| {
200                let (_key, value) = entry_result?;
201                deserialize_bincode(&value)
202            })
203            .collect()
204    }
205
206    pub fn get_all_slots(&self) -> Result<Vec<SlotData>, sled::Error> {
207        self.epoch_data
208            .into_iter()
209            .map(|entry_result| {
210                let (_, value) = entry_result?;
211                deserialize_bincode(&value)
212            })
213            .collect()
214    }
215
216    pub fn seen_block(&self, state_hash: String) -> Result<bool, sled::Error> {
217        self.blocks.contains_key(state_hash.as_bytes())
218    }
219
220    pub fn seen_slot(&self, slot: u32) -> Result<bool, sled::Error> {
221        self.produced_blocks_by_slot
222            .contains_key(slot.to_be_bytes())
223    }
224
225    pub fn set_current_slot(&self, old: u32, new: u32) -> Result<(), sled::Error> {
226        self.update(
227            &self.epoch_data,
228            old.to_be_bytes(),
229            |mut slot_entry: SlotData| {
230                slot_entry.unset_as_current();
231                slot_entry
232            },
233        )?;
234
235        self.update(
236            &self.epoch_data,
237            new.to_be_bytes(),
238            |mut slot_entry: SlotData| {
239                slot_entry.set_as_current();
240                slot_entry
241            },
242        )
243    }
244}
245
246fn serialize_bincode<T: serde::Serialize>(item: &T) -> Result<Vec<u8>, sled::Error> {
247    bincode::serialize(item)
248        .map_err(|e| sled::Error::Io(std::io::Error::new(std::io::ErrorKind::Other, e)))
249}
250
251fn deserialize_bincode<T: serde::de::DeserializeOwned>(bytes: &[u8]) -> Result<T, sled::Error> {
252    bincode::deserialize(bytes)
253        .map_err(|e| sled::Error::Io(std::io::Error::new(std::io::ErrorKind::Other, e)))
254}
255
256#[cfg(test)]
257mod test {
258    use std::env;
259
260    use super::*;
261
262    #[test]
263    fn test_ledger_store_and_get() {
264        const EPOCH: u32 = 4;
265
266        let db_dir = env::temp_dir();
267        let db = Database::open(db_dir).expect("Failed to open DB");
268
269        let ledger = Ledger::load_from_file("test/files/staking-epoch-ledger.json".into())
270            .expect("Failed to load ledger file");
271        db.store_ledger(EPOCH, &ledger.clone())
272            .expect("Failed to store ledger into the DB");
273
274        let retrieved = db
275            .get_ledger(EPOCH)
276            .expect("Failed to retrieve ledger from the DB");
277
278        assert_eq!(Some(ledger), retrieved);
279    }
280
281    #[test]
282    fn test_seed_store_and_get() {
283        const EPOCH: u32 = 4;
284
285        let db_dir = env::temp_dir();
286        let db = Database::open(db_dir).expect("Failed to open DB");
287
288        let seed = "2vawAhPq9RsPXhz8NvrxB5VXuge8U9vQPGCtjqLZ5idHTUtWHWF8".to_string();
289
290        db.store_seed(EPOCH, seed.clone())
291            .expect("Failed to store seed into the DB");
292
293        let retrieved = db
294            .get_seed(EPOCH)
295            .expect("Failed to retrieve seed from the DB");
296
297        assert_eq!(Some(seed), retrieved);
298    }
299}