node/snark_pool/
snark_pool_state.rs

1use std::{fmt, ops::RangeBounds, time::Duration};
2
3use ledger::scan_state::scan_state::{transaction_snark::OneOrTwo, AvailableJobMessage};
4use openmina_core::snark::{Snark, SnarkInfo, SnarkJobCommitment, SnarkJobId};
5use redux::Timestamp;
6use serde::{Deserialize, Serialize};
7
8use crate::{core::distributed_pool::DistributedPool, p2p::PeerId};
9
10use super::{candidate::SnarkPoolCandidatesState, SnarkPoolConfig};
11
12#[derive(Serialize, Deserialize, Clone)]
13pub struct SnarkPoolState {
14    config: SnarkPoolConfig,
15    pool: DistributedPool<JobState, SnarkJobId>,
16    pub candidates: SnarkPoolCandidatesState,
17    pub(super) last_check_timeouts: Timestamp,
18}
19
20#[derive(Serialize, Deserialize, Debug, Clone)]
21pub struct JobState {
22    pub time: Timestamp,
23    pub id: SnarkJobId,
24    pub job: OneOrTwo<AvailableJobMessage>,
25    pub commitment: Option<JobCommitment>,
26    pub snark: Option<SnarkWork>,
27    /// Lower order has higher priority to be done as it represents older job.
28    pub order: usize,
29}
30
31#[derive(Serialize, Deserialize, Debug, Clone)]
32pub struct JobCommitment {
33    pub commitment: SnarkJobCommitment,
34    pub received_t: Timestamp,
35    pub sender: PeerId,
36}
37
38#[derive(Serialize, Deserialize, Debug, Clone)]
39pub struct SnarkWork {
40    pub work: Snark,
41    pub received_t: Timestamp,
42    pub sender: PeerId,
43}
44
45/// Whether the job is a merge proof job, or a transaction proof job, with particular number of account updates.
46#[derive(Debug, Clone, Serialize, Deserialize)]
47pub enum JobSummary {
48    Tx(usize),
49    Merge(usize),
50}
51
52impl Default for SnarkPoolState {
53    fn default() -> Self {
54        Self::new()
55    }
56}
57
58impl SnarkPoolState {
59    pub fn new() -> Self {
60        Self {
61            config: SnarkPoolConfig {},
62            pool: Default::default(),
63            candidates: SnarkPoolCandidatesState::new(),
64            last_check_timeouts: Timestamp::ZERO,
65        }
66    }
67
68    pub fn is_empty(&self) -> bool {
69        self.pool.is_empty()
70    }
71
72    pub fn last_index(&self) -> u64 {
73        self.pool.last_index()
74    }
75
76    pub fn contains(&self, id: &SnarkJobId) -> bool {
77        self.pool.contains(id)
78    }
79
80    pub fn get(&self, id: &SnarkJobId) -> Option<&JobState> {
81        self.pool.get(id)
82    }
83
84    pub fn insert(&mut self, job: JobState) {
85        self.pool.insert(job)
86    }
87
88    pub fn remove(&mut self, id: &SnarkJobId) -> Option<JobState> {
89        self.pool.remove(id)
90    }
91
92    pub fn set_snark_work(&mut self, snark: SnarkWork) -> Option<SnarkWork> {
93        self.pool
94            .update(&snark.work.job_id(), move |job| job.snark.replace(snark))?
95    }
96
97    pub fn set_commitment(&mut self, commitment: JobCommitment) -> Option<JobCommitment> {
98        let job_id = commitment.commitment.job_id.clone();
99        self.pool
100            .update(&job_id, move |job| job.commitment.replace(commitment))?
101    }
102
103    pub fn remove_commitment(&mut self, id: &SnarkJobId) -> Option<JobCommitment> {
104        self.pool
105            .silent_update(id, |job_state| job_state.commitment.take())?
106    }
107
108    pub fn retain<F>(&mut self, mut get_new_job_order: F)
109    where
110        F: FnMut(&SnarkJobId) -> Option<usize>,
111    {
112        self.pool
113            .retain_and_update(|id, job| match get_new_job_order(id) {
114                None => false,
115                Some(order) => {
116                    job.order = order;
117                    true
118                }
119            });
120    }
121
122    pub fn range<R>(&self, range: R) -> impl '_ + DoubleEndedIterator<Item = (u64, &'_ JobState)>
123    where
124        R: RangeBounds<u64>,
125    {
126        self.pool.range(range)
127    }
128
129    pub fn should_create_commitment(&self, job_id: &SnarkJobId) -> bool {
130        self.get(job_id).is_some_and(|s| s.is_available())
131    }
132
133    pub fn is_commitment_timed_out(&self, id: &SnarkJobId, time_now: Timestamp) -> bool {
134        self.get(id)
135            .is_some_and(|job| is_job_commitment_timed_out(job, time_now))
136    }
137
138    pub fn timed_out_commitments_iter(
139        &self,
140        time_now: Timestamp,
141    ) -> impl Iterator<Item = &SnarkJobId> {
142        self.jobs_iter()
143            .filter(move |job| is_job_commitment_timed_out(job, time_now))
144            .map(|job| &job.id)
145    }
146
147    pub fn jobs_iter(&self) -> impl Iterator<Item = &JobState> {
148        self.pool.states()
149    }
150
151    pub fn available_jobs_iter(&self) -> impl Iterator<Item = &JobState> {
152        self.jobs_iter().filter(|job| job.is_available())
153    }
154
155    pub fn available_jobs_with_highest_priority(&self, n: usize) -> Vec<&JobState> {
156        // find `n` jobs with lowest order (highest priority).
157        self.available_jobs_iter()
158            .fold(Vec::with_capacity(n.saturating_add(1)), |mut jobs, job| {
159                jobs.push(job);
160                if jobs.len() > n {
161                    jobs.sort_by_key(|job| job.order);
162                    jobs.pop();
163                }
164                jobs
165            })
166    }
167
168    pub fn completed_snarks_iter(&self) -> impl '_ + Iterator<Item = &'_ Snark> {
169        self.jobs_iter()
170            .filter_map(|job| job.snark.as_ref())
171            .map(|snark| &snark.work)
172    }
173
174    pub(super) fn job_summary(&self, id: &SnarkJobId) -> Option<JobSummary> {
175        self.get(id).map(|job| job.summary())
176    }
177
178    pub fn candidates_prune(&mut self) {
179        self.candidates.retain(|id| {
180            let job = self.pool.get(id);
181            move |candidate| match job {
182                None => false,
183                Some(job) => match job.snark.as_ref() {
184                    None => true,
185                    Some(snark) => &snark.work < candidate,
186                },
187            }
188        });
189    }
190
191    pub fn next_commitments_to_send(
192        &self,
193        index_and_limit: (u64, u8),
194    ) -> (Vec<SnarkJobCommitment>, u64, u64) {
195        self.pool
196            .next_messages_to_send(index_and_limit, |job| job.commitment_msg().cloned())
197    }
198
199    pub fn next_snarks_to_send(&self, index_and_limit: (u64, u8)) -> (Vec<SnarkInfo>, u64, u64) {
200        self.pool
201            .next_messages_to_send(index_and_limit, |job| job.snark_msg())
202    }
203
204    pub fn resources_usage(&self) -> serde_json::Value {
205        let (size, inconsistency) = self.candidates.check();
206
207        serde_json::json!({
208            "pool_size": self.pool.len(),
209            "candidates_size": size,
210            "candidates_inconsistency": inconsistency,
211        })
212    }
213}
214
215fn is_job_commitment_timed_out(job: &JobState, time_now: Timestamp) -> bool {
216    let Some(commitment) = job.commitment.as_ref() else {
217        return false;
218    };
219
220    let timeout = job.estimated_duration();
221    let passed_time = time_now.checked_sub(commitment.commitment.timestamp());
222    let is_timed_out = passed_time.is_some_and(|dur| dur >= timeout);
223    let didnt_deliver = job
224        .snark
225        .as_ref()
226        .is_none_or(|snark| snark.work < commitment.commitment);
227
228    is_timed_out && didnt_deliver
229}
230
231impl fmt::Debug for SnarkPoolState {
232    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
233        f.debug_struct("SnarkPoolState")
234            .field("pool", &self.pool)
235            .finish()
236    }
237}
238
239impl JobState {
240    pub fn is_available(&self) -> bool {
241        self.commitment.is_none() && self.snark.is_none()
242    }
243
244    pub fn commitment_msg(&self) -> Option<&SnarkJobCommitment> {
245        self.commitment.as_ref().map(|v| &v.commitment)
246    }
247
248    pub fn snark_msg(&self) -> Option<SnarkInfo> {
249        self.snark.as_ref().map(|v| v.work.info())
250    }
251
252    pub fn summary(&self) -> JobSummary {
253        use mina_p2p_messages::v2::{
254            MinaTransactionLogicTransactionAppliedCommandAppliedStableV2 as CommandApplied,
255            MinaTransactionLogicTransactionAppliedVaryingStableV2 as Varying,
256        };
257        let account_updates = |job: &_| match job {
258            AvailableJobMessage::Base(base) => match &base.transaction_with_info.varying {
259                Varying::Command(CommandApplied::ZkappCommand(zkapp)) => {
260                    zkapp.command.data.account_updates.len()
261                }
262                _ => 1,
263            },
264            AvailableJobMessage::Merge { .. } => 1,
265        };
266        let account_updates = match &self.job {
267            OneOrTwo::One(job) => account_updates(job),
268            OneOrTwo::Two((job1, job2)) => account_updates(job1)
269                .checked_add(account_updates(job2))
270                .expect("overflow"),
271        };
272
273        if matches!(
274            self.job,
275            OneOrTwo::One(AvailableJobMessage::Base(_))
276                | OneOrTwo::Two((AvailableJobMessage::Base(_), _))
277        ) {
278            JobSummary::Tx(account_updates)
279        } else {
280            JobSummary::Merge(account_updates)
281        }
282    }
283
284    pub fn estimated_duration(&self) -> Duration {
285        self.summary().estimated_duration()
286    }
287}
288
289impl AsRef<SnarkJobId> for JobState {
290    fn as_ref(&self) -> &SnarkJobId {
291        &self.id
292    }
293}
294
295impl JobSummary {
296    pub fn estimated_duration(&self) -> Duration {
297        const BASE: Duration = Duration::from_secs(10);
298        const MAX_LATENCY: Duration = Duration::from_secs(10);
299
300        let (JobSummary::Tx(n) | JobSummary::Merge(n)) = self;
301        BASE.saturating_mul(*n as u32).saturating_add(MAX_LATENCY)
302    }
303}