node/stats/
stats_block_producer.rs

1use std::collections::{BTreeMap, VecDeque};
2
3use ledger::AccountIndex;
4use mina_p2p_messages::v2;
5use openmina_core::block::{AppliedBlock, ArcBlockWithHash};
6use serde::{Deserialize, Serialize};
7
8use crate::{
9    block_producer::{BlockProducerWonSlot, BlockProducerWonSlotDiscardReason, BlockWithoutProof},
10    core::block::BlockHash,
11};
12
/// Maximum number of block production attempts retained in
/// `BlockProducerStats::attempts`; once this many are stored,
/// `BlockProducerStats::scheduled` evicts the oldest before adding a new one.
const MAX_HISTORY: usize = 2_048;
14
15#[derive(Serialize, Deserialize, Debug, Default, Clone)]
16pub struct BlockProducerStats {
17    pub(super) attempts: VecDeque<BlockProductionAttempt>,
18    pub vrf_evaluator: BTreeMap<u32, VrfEvaluatorStats>,
19    pub last_produced_block: Option<ArcBlockWithHash>,
20}
21
22#[derive(Serialize, Deserialize, Debug, Clone)]
23pub struct BlockProductionAttempt {
24    pub won_slot: BlockProductionAttemptWonSlot,
25    pub block: Option<ProducedBlock>,
26    pub times: BlockProductionTimes,
27    #[serde(flatten)]
28    pub status: BlockProductionStatus,
29}
30
31#[derive(Serialize, Deserialize, Debug, Clone)]
32pub struct BlockProductionAttemptWonSlot {
33    pub slot_time: redux::Timestamp,
34    pub global_slot: u32,
35    pub epoch: u32,
36    pub delegator: (v2::NonZeroCurvePoint, AccountIndex),
37    pub value_with_threshold: Option<(f64, f64)>,
38}
39
40#[derive(Serialize, Deserialize, Debug, Clone)]
41pub struct BlockProductionTimes {
42    pub scheduled: redux::Timestamp,
43    pub staged_ledger_diff_create_start: Option<redux::Timestamp>,
44    pub staged_ledger_diff_create_end: Option<redux::Timestamp>,
45    pub produced: Option<redux::Timestamp>,
46    pub proof_create_start: Option<redux::Timestamp>,
47    pub proof_create_end: Option<redux::Timestamp>,
48    pub block_apply_start: Option<redux::Timestamp>,
49    pub block_apply_end: Option<redux::Timestamp>,
50    pub committed: Option<redux::Timestamp>,
51    pub discarded: Option<redux::Timestamp>,
52}
53
54#[derive(Serialize, Deserialize, Debug, Clone)]
55#[serde(tag = "status")]
56pub enum BlockProductionStatus {
57    Scheduled,
58    StagedLedgerDiffCreatePending,
59    StagedLedgerDiffCreateSuccess,
60    Produced,
61    ProofCreatePending,
62    ProofCreateSuccess,
63    BlockApplyPending,
64    BlockApplySuccess,
65    Committed,
66    Canonical {
67        last_observed_confirmations: u32,
68    },
69    Orphaned {
70        orphaned_by: BlockHash,
71    },
72    Discarded {
73        discard_reason: BlockProducerWonSlotDiscardReason,
74    },
75}
76
77#[derive(Serialize, Deserialize, Debug, Clone)]
78pub struct ProducedBlock {
79    pub hash: BlockHash,
80    pub height: u32,
81    pub transactions: ProducedBlockTransactions,
82    pub completed_works_count: usize,
83    pub coinbase: u64,
84    pub fees: u64,
85    pub snark_fees: u64,
86}
87
88#[derive(Serialize, Deserialize, Debug, Default, Clone)]
89pub struct ProducedBlockTransactions {
90    pub payments: u16,
91    pub delegations: u16,
92    pub zkapps: u16,
93}
94
95#[derive(Serialize, Deserialize, Debug, Clone)]
96pub struct VrfEvaluatorStats {
97    pub total_slots: u32,
98    pub evaluated_slots: u32,
99}
100
101impl Default for VrfEvaluatorStats {
102    fn default() -> Self {
103        Self {
104            total_slots: 7140,
105            evaluated_slots: 0,
106        }
107    }
108}
109
110impl BlockProducerStats {
111    fn latest_attempt_block_hash_matches(&self, hash: &BlockHash) -> bool {
112        self.attempts
113            .back()
114            .and_then(|v| v.block.as_ref())
115            .is_some_and(|b| &b.hash == hash)
116    }
117
118    pub fn collect_attempts(&self) -> Vec<BlockProductionAttempt> {
119        self.attempts.iter().cloned().collect()
120    }
121
122    pub fn new_best_chain(&mut self, time: redux::Timestamp, chain: &[AppliedBlock]) {
123        let (best_tip, chain) = chain.split_last().unwrap();
124        let root_block = chain.first().unwrap_or(best_tip);
125
126        self.committed(time, best_tip.hash());
127
128        self.attempts
129            .iter_mut()
130            .rev()
131            .take_while(|v| v.won_slot.global_slot >= root_block.global_slot())
132            .filter(|attempt| {
133                matches!(
134                    attempt.status,
135                    BlockProductionStatus::Committed
136                        | BlockProductionStatus::Canonical { .. }
137                        | BlockProductionStatus::Orphaned { .. }
138                )
139            })
140            .for_each(|attempt| {
141                let Some(block) = attempt.block.as_ref() else {
142                    return;
143                };
144                let Some(i) = block.height.checked_sub(root_block.height()) else {
145                    return;
146                };
147
148                match chain.get(i as usize) {
149                    Some(b) if b.hash() == &block.hash => {
150                        attempt.status = BlockProductionStatus::Canonical {
151                            last_observed_confirmations: best_tip
152                                .height()
153                                .saturating_sub(block.height),
154                        };
155                    }
156                    Some(b) => {
157                        attempt.status = BlockProductionStatus::Orphaned {
158                            orphaned_by: b.hash().clone(),
159                        };
160                    }
161                    None => {}
162                }
163            });
164    }
165
166    fn update<F>(&mut self, kind: &'static str, with: F)
167    where
168        F: FnOnce(&mut BlockProductionAttempt) -> bool,
169    {
170        match self.attempts.pop_back() {
171            None => {
172                openmina_core::log::error!(openmina_core::log::system_time();
173                    kind = "BlockProducerStatsAttemptsEmpty",
174                    summary = "attempts are empty when they aren't expected to be",
175                    update_kind = kind);
176            }
177            Some(mut attempt) => {
178                let was_correct_state = with(&mut attempt);
179
180                if !was_correct_state {
181                    openmina_core::log::error!(openmina_core::log::system_time();
182                        kind = "BlockProducerStatsAttemptUnexpectedState",
183                        summary = format!("update kind `{kind}` is not applicable to state: {attempt:?}"));
184                }
185                self.attempts.push_back(attempt);
186            }
187        }
188    }
189
190    pub fn scheduled(&mut self, time: redux::Timestamp, won_slot: &BlockProducerWonSlot) {
191        if self.attempts.len() >= MAX_HISTORY {
192            self.attempts.pop_front();
193        }
194        self.attempts.push_back(BlockProductionAttempt {
195            won_slot: won_slot.into(),
196            block: None,
197            times: BlockProductionTimes {
198                scheduled: time,
199                staged_ledger_diff_create_start: None,
200                staged_ledger_diff_create_end: None,
201                produced: None,
202                proof_create_start: None,
203                proof_create_end: None,
204                block_apply_start: None,
205                block_apply_end: None,
206                committed: None,
207                discarded: None,
208            },
209            status: BlockProductionStatus::Scheduled,
210        });
211    }
212
213    pub fn staged_ledger_diff_create_start(&mut self, time: redux::Timestamp) {
214        self.update(
215            "staged_ledger_diff_create_start",
216            move |attempt| match attempt.status {
217                BlockProductionStatus::Scheduled => {
218                    attempt.status = BlockProductionStatus::StagedLedgerDiffCreatePending;
219                    attempt.times.staged_ledger_diff_create_start = Some(time);
220                    true
221                }
222                _ => false,
223            },
224        );
225    }
226
227    pub fn staged_ledger_diff_create_end(&mut self, time: redux::Timestamp) {
228        self.update(
229            "staged_ledger_diff_create_end",
230            move |attempt| match attempt.status {
231                BlockProductionStatus::StagedLedgerDiffCreatePending => {
232                    attempt.status = BlockProductionStatus::StagedLedgerDiffCreateSuccess;
233                    attempt.times.staged_ledger_diff_create_end = Some(time);
234                    true
235                }
236                _ => false,
237            },
238        );
239    }
240
241    pub fn produced(
242        &mut self,
243        time: redux::Timestamp,
244        block_hash: &BlockHash,
245        block: &BlockWithoutProof,
246    ) {
247        self.update("produced", move |attempt| match attempt.status {
248            BlockProductionStatus::StagedLedgerDiffCreateSuccess => {
249                attempt.status = BlockProductionStatus::Produced;
250                attempt.times.produced = Some(time);
251                attempt.block = Some((block_hash, block).into());
252                true
253            }
254            _ => false,
255        });
256    }
257
258    pub fn proof_create_start(&mut self, time: redux::Timestamp) {
259        self.update("proof_create_start", move |attempt| match attempt.status {
260            BlockProductionStatus::Produced => {
261                attempt.status = BlockProductionStatus::ProofCreatePending;
262                attempt.times.proof_create_start = Some(time);
263                true
264            }
265            _ => false,
266        });
267    }
268
269    pub fn proof_create_end(&mut self, time: redux::Timestamp) {
270        self.update("proof_create_end", move |attempt| match attempt.status {
271            BlockProductionStatus::ProofCreatePending => {
272                attempt.status = BlockProductionStatus::ProofCreateSuccess;
273                attempt.times.proof_create_end = Some(time);
274                true
275            }
276            _ => false,
277        });
278    }
279
280    pub fn block_apply_start(&mut self, time: redux::Timestamp, hash: &BlockHash) {
281        if !self.is_our_just_produced_block(hash) {
282            return;
283        }
284
285        self.update("block_apply_start", move |attempt| match attempt.status {
286            BlockProductionStatus::ProofCreateSuccess => {
287                attempt.status = BlockProductionStatus::BlockApplyPending;
288                attempt.times.block_apply_start = Some(time);
289                true
290            }
291            _ => false,
292        });
293    }
294
295    pub fn block_apply_end(&mut self, time: redux::Timestamp, hash: &BlockHash) {
296        if !self.latest_attempt_block_hash_matches(hash) {
297            return;
298        }
299
300        self.update("block_apply_end", move |attempt| match attempt.status {
301            BlockProductionStatus::BlockApplyPending => {
302                attempt.status = BlockProductionStatus::BlockApplySuccess;
303                attempt.times.block_apply_end = Some(time);
304                true
305            }
306            _ => false,
307        });
308    }
309
310    pub fn committed(&mut self, time: redux::Timestamp, hash: &BlockHash) {
311        if !self.latest_attempt_block_hash_matches(hash) {
312            return;
313        }
314
315        self.update("committed", move |attempt| match attempt.status {
316            BlockProductionStatus::BlockApplySuccess => {
317                attempt.status = BlockProductionStatus::Committed;
318                attempt.times.committed = Some(time);
319                true
320            }
321            _ => false,
322        });
323    }
324
325    pub fn discarded(&mut self, time: redux::Timestamp, reason: BlockProducerWonSlotDiscardReason) {
326        self.update("discarded", move |attempt| {
327            attempt.status = BlockProductionStatus::Discarded {
328                discard_reason: reason,
329            };
330            attempt.times.discarded = Some(time);
331            true
332        });
333    }
334
335    /// Returns `true` if this is a block we just produced
336    pub fn is_our_just_produced_block(&self, hash: &BlockHash) -> bool {
337        // For the block to be ours:
338        // - we must have an attempt to produce a block
339        // - we must have just produced the proof for that block
340        // - the hash must match
341        if let Some(attempt) = self.attempts.back() {
342            match (&attempt.status, attempt.block.as_ref()) {
343                (BlockProductionStatus::ProofCreateSuccess, Some(block)) => &block.hash == hash,
344                _ => false,
345            }
346        } else {
347            false
348        }
349    }
350
351    /// In case a new run, when the current epoch has less than `slots_per_epoch` slots to evaluate.
352    pub fn new_epoch_evaluation(&mut self, epoch: u32, remaining_slots: u32) {
353        self.vrf_evaluator.insert(
354            epoch,
355            VrfEvaluatorStats {
356                total_slots: remaining_slots,
357                evaluated_slots: 0,
358            },
359        );
360    }
361
362    pub fn increment_slot_evaluated(&mut self, epoch: u32) {
363        self.vrf_evaluator
364            .entry(epoch)
365            .and_modify(|v| v.evaluated_slots = v.evaluated_slots.checked_add(1).expect("overflow"))
366            .or_insert_with(|| VrfEvaluatorStats {
367                evaluated_slots: 1,
368                ..Default::default()
369            });
370    }
371}
372
373impl From<&BlockProducerWonSlot> for BlockProductionAttemptWonSlot {
374    fn from(won_slot: &BlockProducerWonSlot) -> Self {
375        Self {
376            slot_time: won_slot.slot_time,
377            global_slot: won_slot.global_slot(),
378            epoch: won_slot.epoch(),
379            delegator: won_slot.delegator.clone(),
380            value_with_threshold: won_slot.value_with_threshold,
381        }
382    }
383}
384
385impl From<(&BlockHash, &BlockWithoutProof)> for ProducedBlock {
386    fn from((block_hash, block): (&BlockHash, &BlockWithoutProof)) -> Self {
387        Self {
388            hash: block_hash.clone(),
389            height: block
390                .protocol_state
391                .body
392                .consensus_state
393                .blockchain_length
394                .as_u32(),
395            transactions: block.into(),
396            completed_works_count: block.body.completed_works_count(),
397            coinbase: if block.body.has_coinbase() {
398                openmina_core::constants::constraint_constants().coinbase_amount
399            } else {
400                0
401            },
402            fees: block.body.fees_sum(),
403            snark_fees: block.body.snark_fees_sum(),
404        }
405    }
406}
407
408impl From<&BlockWithoutProof> for ProducedBlockTransactions {
409    fn from(block: &BlockWithoutProof) -> Self {
410        block
411            .body
412            .commands_iter()
413            .fold(Self::default(), |mut res, cmd| {
414                match &cmd.data {
415                    v2::MinaBaseUserCommandStableV2::SignedCommand(v) => match &v.payload.body {
416                        v2::MinaBaseSignedCommandPayloadBodyStableV2::Payment(_) => {
417                            res.payments = res.payments.checked_add(1).expect("overflow")
418                        }
419                        v2::MinaBaseSignedCommandPayloadBodyStableV2::StakeDelegation(_) => {
420                            res.delegations = res.delegations.checked_add(1).expect("overflow")
421                        }
422                    },
423                    v2::MinaBaseUserCommandStableV2::ZkappCommand(_) => {
424                        res.zkapps = res.zkapps.checked_add(1).expect("overflow")
425                    }
426                }
427                res
428            })
429    }
430}