mina_producer_dashboard/node/
mod.rs

1use ::reqwest::Client;
2use graphql_client::{reqwest::post_graphql, GraphQLQuery};
3use num_bigint::BigInt;
4use serde::{Deserialize, Deserializer, Serialize};
5use std::{collections::BTreeSet, process::Command, str::FromStr};
6use time::{format_description::well_known::Rfc3339, OffsetDateTime};
7
8use crate::{
9    evaluator::epoch::{RawGlobalSlot, RawSlot},
10    StakingToolError,
11};
12
13pub mod epoch_ledgers;
14pub mod watchdog;
15
16use self::{daemon_status::SyncStatus, epoch_ledgers::Ledger};
17
18type PublicKey = String;
19type StateHash = String;
20type FeeTransferType = String;
21type UserCommandKind = String;
22type Amount = StringNumber;
23type Fee = StringNumber;
24type Epoch = String;
25type Length = String;
26type EpochSeed = String;
27type Slot = String;
28type Globalslot = String;
29
/// Returns the unix timestamp (in seconds) at which `global_slot` starts.
///
/// Every slot lasts three minutes, so the result is `genesis_timestamp`
/// plus `global_slot * 180` seconds.
pub fn calc_slot_timestamp(genesis_timestamp: i64, global_slot: u32) -> i64 {
    const SLOT_DURATION_SECONDS: i64 = 60 * 3;

    let offset_from_genesis = i64::from(global_slot) * SLOT_DURATION_SECONDS;
    genesis_timestamp + offset_from_genesis
}
33
34#[derive(Debug, Clone, Serialize, Deserialize)]
35pub struct Node {
36    url: String,
37}
38
39impl Node {
40    pub fn new(url: String) -> Self {
41        Self { url }
42    }
43
44    pub async fn wait_for_graphql(&self) -> Result<(), StakingToolError> {
45        let client = Client::builder()
46            .user_agent("graphql-rust/0.10.0")
47            .build()
48            .unwrap();
49
50        let timeout_duration = tokio::time::Duration::from_secs(120); // 2 minutes
51        let start_time = tokio::time::Instant::now();
52
53        while tokio::time::Instant::now() - start_time < timeout_duration {
54            match client.get(&self.url).send().await {
55                Ok(response) => {
56                    println!("[wait_for_graphql] Response status: {}", response.status());
57                    if response.status().is_client_error() {
58                        return Ok(()); // URL is reachable and returns a successful status
59                    }
60                }
61                Err(_) => {
62                    println!("Waiting for node...");
63                }
64            }
65            // Wait for some time before the next retry
66            tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
67        }
68
69        Err(StakingToolError::NodeOffline)
70    }
71
72    pub async fn sync_status(&self) -> SyncStatus {
73        let client = Client::builder()
74            .user_agent("graphql-rust/0.10.0")
75            .build()
76            .unwrap();
77
78        let variables = daemon_status::Variables {};
79
80        let response_body = post_graphql::<DaemonStatus, _>(&client, &self.url, variables)
81            .await
82            .unwrap();
83
84        let response_data: daemon_status::ResponseData = response_body
85            .data
86            .ok_or(StakingToolError::EmptyGraphqlResponse)
87            .unwrap();
88
89        response_data.daemon_status.sync_status
90    }
91
92    pub async fn get_genesis_timestmap(&self) -> Result<i64, StakingToolError> {
93        let client = Client::builder()
94            .user_agent("graphql-rust/0.10.0")
95            .build()
96            .unwrap();
97
98        let variables = genesis_timestamp::Variables {};
99
100        let response_body = post_graphql::<GenesisTimestamp, _>(&client, &self.url, variables)
101            .await
102            .unwrap();
103        let response_data: genesis_timestamp::ResponseData = response_body
104            .data
105            .ok_or(StakingToolError::EmptyGraphqlResponse)?;
106
107        let timestamp_formatted = response_data.genesis_constants.genesis_timestamp;
108        let datetime = OffsetDateTime::parse(&timestamp_formatted, &Rfc3339).unwrap();
109        Ok(datetime.unix_timestamp())
110    }
111
112    #[allow(dead_code)]
113    pub async fn get_best_chain(&self) -> Result<Vec<(u32, String)>, StakingToolError> {
114        let client = Client::builder()
115            .user_agent("graphql-rust/0.10.0")
116            .build()
117            .unwrap();
118
119        let variables = best_chain::Variables { max_length: 290 };
120        let response_body = post_graphql::<BestChain, _>(&client, &self.url, variables)
121            .await
122            .unwrap();
123
124        let response_data: best_chain::ResponseData = response_body
125            .data
126            .ok_or(StakingToolError::EmptyGraphqlResponse)?;
127
128        response_data
129            .best_chain
130            .ok_or(StakingToolError::EmptyGraphqlResponse)
131            .map(|v| {
132                v.iter()
133                    .map(|bc| {
134                        (
135                            bc.protocol_state
136                                .consensus_state
137                                .slot_since_genesis
138                                .parse()
139                                .unwrap(),
140                            bc.state_hash.clone(),
141                        )
142                    })
143                    .collect()
144            })
145    }
146
147    pub async fn get_best_tip(&self) -> Result<BestTip, StakingToolError> {
148        let client = Client::builder()
149            .user_agent("graphql-rust/0.10.0")
150            .build()
151            .unwrap();
152
153        let variables = best_chain::Variables { max_length: 1 };
154        let response_body = post_graphql::<BestChain, _>(&client, &self.url, variables)
155            .await
156            .unwrap();
157
158        let response_data: best_chain::ResponseData = response_body
159            .data
160            .ok_or(StakingToolError::EmptyGraphqlResponse)?;
161
162        response_data
163            .best_chain
164            .map(|res| res.first().cloned().unwrap().into())
165            .ok_or(StakingToolError::EmptyGraphqlResponse)
166    }
167
168    fn dump_current_staking_ledger() -> impl AsRef<[u8]> {
169        let output = Command::new("mina")
170            .args([
171                "ledger",
172                "export",
173                "--daemon-port",
174                "mina:8301",
175                "staking-epoch-ledger",
176            ])
177            .output()
178            .expect("Failed to execute command");
179
180        if !output.status.success() {
181            let error_message = String::from_utf8_lossy(&output.stderr);
182            panic!("Command execution failed with error: {}", error_message);
183        }
184
185        output.stdout
186    }
187
188    pub fn get_staking_ledger(_epoch_number: u32) -> Ledger {
189        let raw = Self::dump_current_staking_ledger();
190        let inner = serde_json::from_slice(raw.as_ref()).unwrap();
191        Ledger::new(inner)
192    }
193}
194
195#[derive(Debug, Clone, Serialize, Deserialize)]
196pub struct NodeData {
197    best_tip: Option<BestTip>,
198    best_chain: Vec<(u32, String)>,
199    sync_status: SyncStatus,
200    dumped_ledgers: BTreeSet<u32>,
201    // TODO: make sure it's available, make it an option
202    genesis_timestamp: i64,
203}
204
205impl Default for NodeData {
206    fn default() -> Self {
207        Self {
208            best_tip: Default::default(),
209            best_chain: Default::default(),
210            sync_status: SyncStatus::OFFLINE,
211            dumped_ledgers: Default::default(),
212            genesis_timestamp: 0,
213        }
214    }
215}
216
217impl NodeData {
218    // TODO(adonagy): Hydrate from db
219    // pub fn new()
220
221    pub fn transition_frontier_root(&self) -> Option<u32> {
222        self.best_chain.first().map(|v| v.0)
223    }
224
225    pub fn best_tip(&self) -> Option<BestTip> {
226        // TODO
227        self.best_tip.clone()
228    }
229
230    pub fn current_slot(&self) -> CurrentSlot {
231        let now = OffsetDateTime::now_utc().unix_timestamp();
232
233        let elapsed = now - self.genesis_timestamp;
234
235        let slot = (elapsed / (3 * 60)) as u32;
236        CurrentSlot::new(slot)
237    }
238
239    pub fn best_chain(&self) -> &[(u32, String)] {
240        self.best_chain.as_slice()
241    }
242}
243
244#[derive(Debug, Clone, Serialize, Deserialize)]
245pub struct CurrentSlot {
246    slot: RawSlot,
247    global_slot: RawGlobalSlot,
248}
249
250impl CurrentSlot {
251    pub fn new(global_slot: u32) -> Self {
252        let global_slot: RawGlobalSlot = global_slot.into();
253
254        Self {
255            global_slot: global_slot.clone(),
256            slot: global_slot.into(),
257        }
258    }
259
260    pub fn global_slot(&self) -> RawGlobalSlot {
261        self.global_slot.clone()
262    }
263}
264
265#[allow(dead_code)]
266#[derive(Debug, Clone)]
267struct StringNumber(BigInt);
268
269#[allow(dead_code)]
270impl StringNumber {
271    pub fn to_bigint(&self) -> BigInt {
272        self.0.clone()
273    }
274}
275
276impl From<BigInt> for StringNumber {
277    fn from(value: BigInt) -> Self {
278        Self(value)
279    }
280}
281
282impl<'de> Deserialize<'de> for StringNumber {
283    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
284    where
285        D: Deserializer<'de>,
286    {
287        let s = String::deserialize(deserializer)?;
288        BigInt::from_str(&s)
289            .map(StringNumber)
290            .map_err(serde::de::Error::custom)
291    }
292}
293
294impl Serialize for StringNumber {
295    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
296    where
297        S: serde::Serializer,
298    {
299        let as_string = self.0.to_string();
300        serializer.serialize_str(&as_string)
301    }
302}
303
304#[derive(GraphQLQuery)]
305#[graphql(
306    schema_path = "src/graphql/schema.json",
307    query_path = "src/graphql/daemon_status.graphql",
308    response_derives = "Debug, Clone"
309)]
310struct DaemonStatus;
311
312#[derive(GraphQLQuery)]
313#[graphql(
314    schema_path = "src/graphql/schema.json",
315    query_path = "src/graphql/genesis_timestamp.graphql",
316    response_derives = "Debug, Clone"
317)]
318struct GenesisTimestamp;
319
320#[derive(GraphQLQuery)]
321#[graphql(
322    schema_path = "src/graphql/schema.json",
323    query_path = "src/graphql/best_chain.graphql",
324    response_derives = "Debug, Clone, Serialize"
325)]
326struct BestChain;
327
328#[derive(Debug, Clone, Serialize, Deserialize)]
329pub struct BestTip(best_chain::BestChainBestChain);
330
331impl BestTip {
332    pub fn consensus_state(&self) -> &best_chain::BestChainBestChainProtocolStateConsensusState {
333        &self.0.protocol_state.consensus_state
334    }
335
336    pub fn epoch_bounds(&self) -> ((u32, u32), (u32, u32)) {
337        // TODO(adonagy): get the data from the node + unwrap
338        const SLOTS_PER_EPOCH: u32 = 7140;
339        let current_epoch = self.consensus_state().epoch.parse::<u32>().unwrap();
340        let current_start = current_epoch * SLOTS_PER_EPOCH;
341        let current_end = current_epoch * SLOTS_PER_EPOCH + SLOTS_PER_EPOCH - 1;
342
343        let next_epoch = current_epoch + 1;
344        let next_start = next_epoch * SLOTS_PER_EPOCH;
345        let next_end = next_start + SLOTS_PER_EPOCH - 1;
346
347        ((current_start, current_end), (next_start, next_end))
348    }
349
350    pub fn height(&self) -> u32 {
351        self.consensus_state().block_height.parse().unwrap()
352    }
353
354    pub fn epoch(&self) -> u32 {
355        self.consensus_state().epoch.parse().unwrap()
356    }
357
358    pub fn state_hash(&self) -> String {
359        self.0.state_hash.clone()
360    }
361}
362
363impl From<best_chain::BestChainBestChain> for BestTip {
364    fn from(value: best_chain::BestChainBestChain) -> Self {
365        BestTip(value)
366    }
367}
368
369impl From<BestTip> for best_chain::BestChainBestChain {
370    fn from(value: BestTip) -> Self {
371        value.0
372    }
373}