// mina_producer_dashboard/archive/watchdog.rs

1use crate::{archive::Block, evaluator::epoch::SlotStatus, storage::db_sled::Database, NodeStatus};
2
3use super::ArchiveConnector;
4use tokio::{task::JoinHandle, time::Duration};
5
6pub struct ArchiveWatchdog {
7    producer_pk: String,
8    archive_connector: ArchiveConnector,
9    db: Database,
10    node_status: NodeStatus,
11}
12
13impl ArchiveWatchdog {
14    pub fn spawn_new(db: Database, producer_pk: String, node_status: NodeStatus) -> JoinHandle<()> {
15        tokio::spawn(async move {
16            Self {
17                producer_pk,
18                archive_connector: ArchiveConnector::connect(super::ArchiveUrl::Env).await,
19                db,
20                node_status,
21            }
22            .run()
23            .await;
24        })
25    }
26
27    // TODO(adonagy): cleanup this mess...
28    async fn run(&self) {
29        let mut interval = tokio::time::interval(Duration::from_secs(5));
30
31        loop {
32            interval.tick().await;
33
34            println!("[archive-watchdog] Tick");
35            let node_status = self.node_status.read().await.clone();
36
37            let current_slot = node_status.current_slot();
38
39            if self
40                .db
41                .has_slot(current_slot.global_slot().to_u32())
42                .unwrap()
43            {
44                let old = current_slot.global_slot().to_u32() - 1;
45                self.db
46                    .set_current_slot(old, current_slot.global_slot().to_u32())
47                    .unwrap();
48            }
49
50            if let Some(best_tip) = node_status.best_tip() {
51                let (start, end) = best_tip.epoch_bounds().0;
52
53                let cannonical_chain = match self
54                    .archive_connector
55                    .clone()
56                    .get_canonical_chain(start.into(), end.into(), best_tip.state_hash())
57                    .await
58                {
59                    Ok(blocks) => blocks,
60                    Err(e) => {
61                        eprintln!("{e}");
62                        continue;
63                    }
64                };
65
66                let (canonical_pending, canonical): (Vec<Block>, Vec<Block>) = cannonical_chain
67                    .into_iter()
68                    .partition(|block| block.height >= (best_tip.height() - 290) as i64);
69
70                // get blocks
71                let blocks = match self
72                    .archive_connector
73                    .get_blocks_in_slot_range(start.into(), end.into())
74                    .await
75                {
76                    Ok(blocks) => blocks,
77                    Err(e) => {
78                        eprintln!("{e}");
79                        continue;
80                    }
81                };
82
83                let (our_blocks, other_blocks): (Vec<Block>, Vec<Block>) = blocks
84                    .into_iter()
85                    .partition(|block| block.creator_key == self.producer_pk);
86
87                our_blocks.iter().for_each(|block| {
88                    let slot = block.global_slot_since_hard_fork as u32;
89                    if self
90                        .db
91                        .seen_block(block.state_hash.clone())
92                        .ok()
93                        .unwrap_or_default()
94                    {
95                        if canonical.contains(block) {
96                            self.db
97                                .update_slot_status(slot, SlotStatus::Canonical)
98                                .unwrap();
99                        } else if canonical_pending.contains(block) {
100                            self.db
101                                .update_slot_status(slot, SlotStatus::CanonicalPending)
102                                .unwrap();
103                        } else if block.height >= (best_tip.height() - 290) as i64 {
104                            self.db
105                                .update_slot_status(slot, SlotStatus::OrphanedPending)
106                                .unwrap();
107                        } else {
108                            self.db
109                                .update_slot_status(slot, SlotStatus::Orphaned)
110                                .unwrap();
111                        }
112                    } else if self.db.has_slot(slot).unwrap_or_default() {
113                        println!("[archive] saw produced block: {}", block.state_hash);
114                        self.db.store_block(block.clone()).unwrap();
115                        self.db
116                            .update_slot_block(slot, block.into(), true, false)
117                            .unwrap();
118                    }
119                });
120
121                other_blocks.iter().for_each(|block| {
122                    let slot = block.global_slot();
123                    if self.db.has_slot(slot).ok().unwrap_or_default()
124                        && !self.db.seen_slot(slot).ok().unwrap_or_default()
125                    {
126                        if slot < current_slot.global_slot().to_u32() {
127                            self.db
128                                .update_slot_block(slot, block.into(), false, false)
129                                .unwrap();
130                        } else {
131                            self.db
132                                .update_slot_block(slot, block.into(), false, true)
133                                .unwrap();
134                        }
135                    }
136                });
137            }
138        }
139    }
140}