mina_producer_dashboard/archive/
watchdog.rs1use crate::{archive::Block, evaluator::epoch::SlotStatus, storage::db_sled::Database, NodeStatus};
2
3use super::ArchiveConnector;
4use tokio::{task::JoinHandle, time::Duration};
5
6pub struct ArchiveWatchdog {
7 producer_pk: String,
8 archive_connector: ArchiveConnector,
9 db: Database,
10 node_status: NodeStatus,
11}
12
13impl ArchiveWatchdog {
14 pub fn spawn_new(db: Database, producer_pk: String, node_status: NodeStatus) -> JoinHandle<()> {
15 tokio::spawn(async move {
16 Self {
17 producer_pk,
18 archive_connector: ArchiveConnector::connect(super::ArchiveUrl::Env).await,
19 db,
20 node_status,
21 }
22 .run()
23 .await;
24 })
25 }
26
27 async fn run(&self) {
29 let mut interval = tokio::time::interval(Duration::from_secs(5));
30
31 loop {
32 interval.tick().await;
33
34 println!("[archive-watchdog] Tick");
35 let node_status = self.node_status.read().await.clone();
36
37 let current_slot = node_status.current_slot();
38
39 if self
40 .db
41 .has_slot(current_slot.global_slot().to_u32())
42 .unwrap()
43 {
44 let old = current_slot.global_slot().to_u32() - 1;
45 self.db
46 .set_current_slot(old, current_slot.global_slot().to_u32())
47 .unwrap();
48 }
49
50 if let Some(best_tip) = node_status.best_tip() {
51 let (start, end) = best_tip.epoch_bounds().0;
52
53 let cannonical_chain = match self
54 .archive_connector
55 .clone()
56 .get_canonical_chain(start.into(), end.into(), best_tip.state_hash())
57 .await
58 {
59 Ok(blocks) => blocks,
60 Err(e) => {
61 eprintln!("{e}");
62 continue;
63 }
64 };
65
66 let (canonical_pending, canonical): (Vec<Block>, Vec<Block>) = cannonical_chain
67 .into_iter()
68 .partition(|block| block.height >= (best_tip.height() - 290) as i64);
69
70 let blocks = match self
72 .archive_connector
73 .get_blocks_in_slot_range(start.into(), end.into())
74 .await
75 {
76 Ok(blocks) => blocks,
77 Err(e) => {
78 eprintln!("{e}");
79 continue;
80 }
81 };
82
83 let (our_blocks, other_blocks): (Vec<Block>, Vec<Block>) = blocks
84 .into_iter()
85 .partition(|block| block.creator_key == self.producer_pk);
86
87 our_blocks.iter().for_each(|block| {
88 let slot = block.global_slot_since_hard_fork as u32;
89 if self
90 .db
91 .seen_block(block.state_hash.clone())
92 .ok()
93 .unwrap_or_default()
94 {
95 if canonical.contains(block) {
96 self.db
97 .update_slot_status(slot, SlotStatus::Canonical)
98 .unwrap();
99 } else if canonical_pending.contains(block) {
100 self.db
101 .update_slot_status(slot, SlotStatus::CanonicalPending)
102 .unwrap();
103 } else if block.height >= (best_tip.height() - 290) as i64 {
104 self.db
105 .update_slot_status(slot, SlotStatus::OrphanedPending)
106 .unwrap();
107 } else {
108 self.db
109 .update_slot_status(slot, SlotStatus::Orphaned)
110 .unwrap();
111 }
112 } else if self.db.has_slot(slot).unwrap_or_default() {
113 println!("[archive] saw produced block: {}", block.state_hash);
114 self.db.store_block(block.clone()).unwrap();
115 self.db
116 .update_slot_block(slot, block.into(), true, false)
117 .unwrap();
118 }
119 });
120
121 other_blocks.iter().for_each(|block| {
122 let slot = block.global_slot();
123 if self.db.has_slot(slot).ok().unwrap_or_default()
124 && !self.db.seen_slot(slot).ok().unwrap_or_default()
125 {
126 if slot < current_slot.global_slot().to_u32() {
127 self.db
128 .update_slot_block(slot, block.into(), false, false)
129 .unwrap();
130 } else {
131 self.db
132 .update_slot_block(slot, block.into(), false, true)
133 .unwrap();
134 }
135 }
136 });
137 }
138 }
139 }
140}