Skip to main content

mina_node/snark_pool/
snark_pool_state.rs

1use std::{fmt, ops::RangeBounds, time::Duration};
2
3use ledger::scan_state::scan_state::{transaction_snark::OneOrTwo, AvailableJobMessage};
4use mina_core::snark::{Snark, SnarkInfo, SnarkJobCommitment, SnarkJobId};
5use redux::Timestamp;
6use serde::{Deserialize, Serialize};
7
8use crate::{core::distributed_pool::DistributedPool, p2p::PeerId};
9
10use super::{candidate::SnarkPoolCandidatesState, SnarkPoolConfig};
11
12#[derive(Serialize, Deserialize, Clone)]
13pub struct SnarkPoolState {
14    config: SnarkPoolConfig,
15    pool: DistributedPool<JobState, SnarkJobId>,
16    pub candidates: SnarkPoolCandidatesState,
17    pub(super) last_check_timeouts: Timestamp,
18}
19
20#[derive(Serialize, Deserialize, Debug, Clone)]
21pub struct JobState {
22    pub time: Timestamp,
23    pub id: SnarkJobId,
24    pub job: OneOrTwo<AvailableJobMessage>,
25    pub commitment: Option<JobCommitment>,
26    pub snark: Option<SnarkWork>,
27    /// Lower order has higher priority to be done as it represents older job.
28    pub order: usize,
29}
30
31#[derive(Serialize, Deserialize, Debug, Clone)]
32#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
33pub struct JobCommitment {
34    pub commitment: SnarkJobCommitment,
35    pub received_t: Timestamp,
36    pub sender: PeerId,
37}
38
39#[derive(Serialize, Deserialize, Debug, Clone)]
40pub struct SnarkWork {
41    pub work: Snark,
42    pub received_t: Timestamp,
43    pub sender: PeerId,
44}
45
46/// Whether the job is a merge proof job, or a transaction proof job, with particular number of account updates.
47#[derive(Debug, Clone, Serialize, Deserialize)]
48#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
49pub enum JobSummary {
50    Tx(usize),
51    Merge(usize),
52}
53
54impl Default for SnarkPoolState {
55    fn default() -> Self {
56        Self::new()
57    }
58}
59
60impl SnarkPoolState {
61    pub fn new() -> Self {
62        Self {
63            config: SnarkPoolConfig {},
64            pool: Default::default(),
65            candidates: SnarkPoolCandidatesState::new(),
66            last_check_timeouts: Timestamp::ZERO,
67        }
68    }
69
70    pub fn is_empty(&self) -> bool {
71        self.pool.is_empty()
72    }
73
74    pub fn last_index(&self) -> u64 {
75        self.pool.last_index()
76    }
77
78    pub fn contains(&self, id: &SnarkJobId) -> bool {
79        self.pool.contains(id)
80    }
81
82    pub fn get(&self, id: &SnarkJobId) -> Option<&JobState> {
83        self.pool.get(id)
84    }
85
86    pub fn insert(&mut self, job: JobState) {
87        self.pool.insert(job)
88    }
89
90    pub fn remove(&mut self, id: &SnarkJobId) -> Option<JobState> {
91        self.pool.remove(id)
92    }
93
94    pub fn set_snark_work(&mut self, snark: SnarkWork) -> Option<SnarkWork> {
95        self.pool
96            .update(&snark.work.job_id(), move |job| job.snark.replace(snark))?
97    }
98
99    pub fn set_commitment(&mut self, commitment: JobCommitment) -> Option<JobCommitment> {
100        let job_id = commitment.commitment.job_id.clone();
101        self.pool
102            .update(&job_id, move |job| job.commitment.replace(commitment))?
103    }
104
105    pub fn remove_commitment(&mut self, id: &SnarkJobId) -> Option<JobCommitment> {
106        self.pool
107            .silent_update(id, |job_state| job_state.commitment.take())?
108    }
109
110    pub fn retain<F>(&mut self, mut get_new_job_order: F)
111    where
112        F: FnMut(&SnarkJobId) -> Option<usize>,
113    {
114        self.pool
115            .retain_and_update(|id, job| match get_new_job_order(id) {
116                None => false,
117                Some(order) => {
118                    job.order = order;
119                    true
120                }
121            });
122    }
123
124    pub fn range<R>(&self, range: R) -> impl '_ + DoubleEndedIterator<Item = (u64, &'_ JobState)>
125    where
126        R: RangeBounds<u64>,
127    {
128        self.pool.range(range)
129    }
130
131    pub fn should_create_commitment(&self, job_id: &SnarkJobId) -> bool {
132        self.get(job_id).is_some_and(|s| s.is_available())
133    }
134
135    pub fn is_commitment_timed_out(&self, id: &SnarkJobId, time_now: Timestamp) -> bool {
136        self.get(id)
137            .is_some_and(|job| is_job_commitment_timed_out(job, time_now))
138    }
139
140    pub fn timed_out_commitments_iter(
141        &self,
142        time_now: Timestamp,
143    ) -> impl Iterator<Item = &SnarkJobId> {
144        self.jobs_iter()
145            .filter(move |job| is_job_commitment_timed_out(job, time_now))
146            .map(|job| &job.id)
147    }
148
149    pub fn jobs_iter(&self) -> impl Iterator<Item = &JobState> {
150        self.pool.states()
151    }
152
153    pub fn available_jobs_iter(&self) -> impl Iterator<Item = &JobState> {
154        self.jobs_iter().filter(|job| job.is_available())
155    }
156
157    pub fn available_jobs_with_highest_priority(&self, n: usize) -> Vec<&JobState> {
158        // find `n` jobs with lowest order (highest priority).
159        self.available_jobs_iter()
160            .fold(Vec::with_capacity(n.saturating_add(1)), |mut jobs, job| {
161                jobs.push(job);
162                if jobs.len() > n {
163                    jobs.sort_by_key(|job| job.order);
164                    jobs.pop();
165                }
166                jobs
167            })
168    }
169
170    pub fn completed_snarks_iter(&self) -> impl '_ + Iterator<Item = &'_ Snark> {
171        self.jobs_iter()
172            .filter_map(|job| job.snark.as_ref())
173            .map(|snark| &snark.work)
174    }
175
176    pub(super) fn job_summary(&self, id: &SnarkJobId) -> Option<JobSummary> {
177        self.get(id).map(|job| job.summary())
178    }
179
180    pub fn candidates_prune(&mut self) {
181        self.candidates.retain(|id| {
182            let job = self.pool.get(id);
183            move |candidate| match job {
184                None => false,
185                Some(job) => match job.snark.as_ref() {
186                    None => true,
187                    Some(snark) => &snark.work < candidate,
188                },
189            }
190        });
191    }
192
193    pub fn next_commitments_to_send(
194        &self,
195        index_and_limit: (u64, u8),
196    ) -> (Vec<SnarkJobCommitment>, u64, u64) {
197        self.pool
198            .next_messages_to_send(index_and_limit, |job| job.commitment_msg().cloned())
199    }
200
201    pub fn next_snarks_to_send(&self, index_and_limit: (u64, u8)) -> (Vec<SnarkInfo>, u64, u64) {
202        self.pool
203            .next_messages_to_send(index_and_limit, |job| job.snark_msg())
204    }
205
206    pub fn resources_usage(&self) -> serde_json::Value {
207        let (size, inconsistency) = self.candidates.check();
208
209        serde_json::json!({
210            "pool_size": self.pool.len(),
211            "candidates_size": size,
212            "candidates_inconsistency": inconsistency,
213        })
214    }
215}
216
217fn is_job_commitment_timed_out(job: &JobState, time_now: Timestamp) -> bool {
218    let Some(commitment) = job.commitment.as_ref() else {
219        return false;
220    };
221
222    let timeout = job.estimated_duration();
223    let passed_time = time_now.checked_sub(commitment.commitment.timestamp());
224    let is_timed_out = passed_time.is_some_and(|dur| dur >= timeout);
225    let didnt_deliver = job
226        .snark
227        .as_ref()
228        .is_none_or(|snark| snark.work < commitment.commitment);
229
230    is_timed_out && didnt_deliver
231}
232
233impl fmt::Debug for SnarkPoolState {
234    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
235        f.debug_struct("SnarkPoolState")
236            .field("pool", &self.pool)
237            .finish()
238    }
239}
240
241impl JobState {
242    pub fn is_available(&self) -> bool {
243        self.commitment.is_none() && self.snark.is_none()
244    }
245
246    pub fn commitment_msg(&self) -> Option<&SnarkJobCommitment> {
247        self.commitment.as_ref().map(|v| &v.commitment)
248    }
249
250    pub fn snark_msg(&self) -> Option<SnarkInfo> {
251        self.snark.as_ref().map(|v| v.work.info())
252    }
253
254    pub fn summary(&self) -> JobSummary {
255        use mina_p2p_messages::v2::{
256            MinaTransactionLogicTransactionAppliedCommandAppliedStableV2 as CommandApplied,
257            MinaTransactionLogicTransactionAppliedVaryingStableV2 as Varying,
258        };
259        let account_updates = |job: &_| match job {
260            AvailableJobMessage::Base(base) => match &base.transaction_with_info.varying {
261                Varying::Command(CommandApplied::ZkappCommand(zkapp)) => {
262                    zkapp.command.data.account_updates.len()
263                }
264                _ => 1,
265            },
266            AvailableJobMessage::Merge { .. } => 1,
267        };
268        let account_updates = match &self.job {
269            OneOrTwo::One(job) => account_updates(job),
270            OneOrTwo::Two((job1, job2)) => account_updates(job1)
271                .checked_add(account_updates(job2))
272                .expect("overflow"),
273        };
274
275        if matches!(
276            self.job,
277            OneOrTwo::One(AvailableJobMessage::Base(_))
278                | OneOrTwo::Two((AvailableJobMessage::Base(_), _))
279        ) {
280            JobSummary::Tx(account_updates)
281        } else {
282            JobSummary::Merge(account_updates)
283        }
284    }
285
286    pub fn estimated_duration(&self) -> Duration {
287        self.summary().estimated_duration()
288    }
289}
290
291impl AsRef<SnarkJobId> for JobState {
292    fn as_ref(&self) -> &SnarkJobId {
293        &self.id
294    }
295}
296
297impl JobSummary {
298    pub fn estimated_duration(&self) -> Duration {
299        const BASE: Duration = Duration::from_secs(10);
300        const MAX_LATENCY: Duration = Duration::from_secs(10);
301
302        let (JobSummary::Tx(n) | JobSummary::Merge(n)) = self;
303        BASE.saturating_mul(*n as u32).saturating_add(MAX_LATENCY)
304    }
305}