1use std::{fmt, ops::RangeBounds, time::Duration};
2
3use ledger::scan_state::scan_state::{transaction_snark::OneOrTwo, AvailableJobMessage};
4use openmina_core::snark::{Snark, SnarkInfo, SnarkJobCommitment, SnarkJobId};
5use redux::Timestamp;
6use serde::{Deserialize, Serialize};
7
8use crate::{core::distributed_pool::DistributedPool, p2p::PeerId};
9
10use super::{candidate::SnarkPoolCandidatesState, SnarkPoolConfig};
11
12#[derive(Serialize, Deserialize, Clone)]
13pub struct SnarkPoolState {
14 config: SnarkPoolConfig,
15 pool: DistributedPool<JobState, SnarkJobId>,
16 pub candidates: SnarkPoolCandidatesState,
17 pub(super) last_check_timeouts: Timestamp,
18}
19
20#[derive(Serialize, Deserialize, Debug, Clone)]
21pub struct JobState {
22 pub time: Timestamp,
23 pub id: SnarkJobId,
24 pub job: OneOrTwo<AvailableJobMessage>,
25 pub commitment: Option<JobCommitment>,
26 pub snark: Option<SnarkWork>,
27 pub order: usize,
29}
30
31#[derive(Serialize, Deserialize, Debug, Clone)]
32pub struct JobCommitment {
33 pub commitment: SnarkJobCommitment,
34 pub received_t: Timestamp,
35 pub sender: PeerId,
36}
37
38#[derive(Serialize, Deserialize, Debug, Clone)]
39pub struct SnarkWork {
40 pub work: Snark,
41 pub received_t: Timestamp,
42 pub sender: PeerId,
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
47pub enum JobSummary {
48 Tx(usize),
49 Merge(usize),
50}
51
52impl Default for SnarkPoolState {
53 fn default() -> Self {
54 Self::new()
55 }
56}
57
58impl SnarkPoolState {
59 pub fn new() -> Self {
60 Self {
61 config: SnarkPoolConfig {},
62 pool: Default::default(),
63 candidates: SnarkPoolCandidatesState::new(),
64 last_check_timeouts: Timestamp::ZERO,
65 }
66 }
67
68 pub fn is_empty(&self) -> bool {
69 self.pool.is_empty()
70 }
71
72 pub fn last_index(&self) -> u64 {
73 self.pool.last_index()
74 }
75
76 pub fn contains(&self, id: &SnarkJobId) -> bool {
77 self.pool.contains(id)
78 }
79
80 pub fn get(&self, id: &SnarkJobId) -> Option<&JobState> {
81 self.pool.get(id)
82 }
83
84 pub fn insert(&mut self, job: JobState) {
85 self.pool.insert(job)
86 }
87
88 pub fn remove(&mut self, id: &SnarkJobId) -> Option<JobState> {
89 self.pool.remove(id)
90 }
91
92 pub fn set_snark_work(&mut self, snark: SnarkWork) -> Option<SnarkWork> {
93 self.pool
94 .update(&snark.work.job_id(), move |job| job.snark.replace(snark))?
95 }
96
97 pub fn set_commitment(&mut self, commitment: JobCommitment) -> Option<JobCommitment> {
98 let job_id = commitment.commitment.job_id.clone();
99 self.pool
100 .update(&job_id, move |job| job.commitment.replace(commitment))?
101 }
102
103 pub fn remove_commitment(&mut self, id: &SnarkJobId) -> Option<JobCommitment> {
104 self.pool
105 .silent_update(id, |job_state| job_state.commitment.take())?
106 }
107
108 pub fn retain<F>(&mut self, mut get_new_job_order: F)
109 where
110 F: FnMut(&SnarkJobId) -> Option<usize>,
111 {
112 self.pool
113 .retain_and_update(|id, job| match get_new_job_order(id) {
114 None => false,
115 Some(order) => {
116 job.order = order;
117 true
118 }
119 });
120 }
121
122 pub fn range<R>(&self, range: R) -> impl '_ + DoubleEndedIterator<Item = (u64, &'_ JobState)>
123 where
124 R: RangeBounds<u64>,
125 {
126 self.pool.range(range)
127 }
128
129 pub fn should_create_commitment(&self, job_id: &SnarkJobId) -> bool {
130 self.get(job_id).is_some_and(|s| s.is_available())
131 }
132
133 pub fn is_commitment_timed_out(&self, id: &SnarkJobId, time_now: Timestamp) -> bool {
134 self.get(id)
135 .is_some_and(|job| is_job_commitment_timed_out(job, time_now))
136 }
137
138 pub fn timed_out_commitments_iter(
139 &self,
140 time_now: Timestamp,
141 ) -> impl Iterator<Item = &SnarkJobId> {
142 self.jobs_iter()
143 .filter(move |job| is_job_commitment_timed_out(job, time_now))
144 .map(|job| &job.id)
145 }
146
147 pub fn jobs_iter(&self) -> impl Iterator<Item = &JobState> {
148 self.pool.states()
149 }
150
151 pub fn available_jobs_iter(&self) -> impl Iterator<Item = &JobState> {
152 self.jobs_iter().filter(|job| job.is_available())
153 }
154
155 pub fn available_jobs_with_highest_priority(&self, n: usize) -> Vec<&JobState> {
156 self.available_jobs_iter()
158 .fold(Vec::with_capacity(n.saturating_add(1)), |mut jobs, job| {
159 jobs.push(job);
160 if jobs.len() > n {
161 jobs.sort_by_key(|job| job.order);
162 jobs.pop();
163 }
164 jobs
165 })
166 }
167
168 pub fn completed_snarks_iter(&self) -> impl '_ + Iterator<Item = &'_ Snark> {
169 self.jobs_iter()
170 .filter_map(|job| job.snark.as_ref())
171 .map(|snark| &snark.work)
172 }
173
174 pub(super) fn job_summary(&self, id: &SnarkJobId) -> Option<JobSummary> {
175 self.get(id).map(|job| job.summary())
176 }
177
178 pub fn candidates_prune(&mut self) {
179 self.candidates.retain(|id| {
180 let job = self.pool.get(id);
181 move |candidate| match job {
182 None => false,
183 Some(job) => match job.snark.as_ref() {
184 None => true,
185 Some(snark) => &snark.work < candidate,
186 },
187 }
188 });
189 }
190
191 pub fn next_commitments_to_send(
192 &self,
193 index_and_limit: (u64, u8),
194 ) -> (Vec<SnarkJobCommitment>, u64, u64) {
195 self.pool
196 .next_messages_to_send(index_and_limit, |job| job.commitment_msg().cloned())
197 }
198
199 pub fn next_snarks_to_send(&self, index_and_limit: (u64, u8)) -> (Vec<SnarkInfo>, u64, u64) {
200 self.pool
201 .next_messages_to_send(index_and_limit, |job| job.snark_msg())
202 }
203
204 pub fn resources_usage(&self) -> serde_json::Value {
205 let (size, inconsistency) = self.candidates.check();
206
207 serde_json::json!({
208 "pool_size": self.pool.len(),
209 "candidates_size": size,
210 "candidates_inconsistency": inconsistency,
211 })
212 }
213}
214
215fn is_job_commitment_timed_out(job: &JobState, time_now: Timestamp) -> bool {
216 let Some(commitment) = job.commitment.as_ref() else {
217 return false;
218 };
219
220 let timeout = job.estimated_duration();
221 let passed_time = time_now.checked_sub(commitment.commitment.timestamp());
222 let is_timed_out = passed_time.is_some_and(|dur| dur >= timeout);
223 let didnt_deliver = job
224 .snark
225 .as_ref()
226 .is_none_or(|snark| snark.work < commitment.commitment);
227
228 is_timed_out && didnt_deliver
229}
230
231impl fmt::Debug for SnarkPoolState {
232 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
233 f.debug_struct("SnarkPoolState")
234 .field("pool", &self.pool)
235 .finish()
236 }
237}
238
239impl JobState {
240 pub fn is_available(&self) -> bool {
241 self.commitment.is_none() && self.snark.is_none()
242 }
243
244 pub fn commitment_msg(&self) -> Option<&SnarkJobCommitment> {
245 self.commitment.as_ref().map(|v| &v.commitment)
246 }
247
248 pub fn snark_msg(&self) -> Option<SnarkInfo> {
249 self.snark.as_ref().map(|v| v.work.info())
250 }
251
252 pub fn summary(&self) -> JobSummary {
253 use mina_p2p_messages::v2::{
254 MinaTransactionLogicTransactionAppliedCommandAppliedStableV2 as CommandApplied,
255 MinaTransactionLogicTransactionAppliedVaryingStableV2 as Varying,
256 };
257 let account_updates = |job: &_| match job {
258 AvailableJobMessage::Base(base) => match &base.transaction_with_info.varying {
259 Varying::Command(CommandApplied::ZkappCommand(zkapp)) => {
260 zkapp.command.data.account_updates.len()
261 }
262 _ => 1,
263 },
264 AvailableJobMessage::Merge { .. } => 1,
265 };
266 let account_updates = match &self.job {
267 OneOrTwo::One(job) => account_updates(job),
268 OneOrTwo::Two((job1, job2)) => account_updates(job1)
269 .checked_add(account_updates(job2))
270 .expect("overflow"),
271 };
272
273 if matches!(
274 self.job,
275 OneOrTwo::One(AvailableJobMessage::Base(_))
276 | OneOrTwo::Two((AvailableJobMessage::Base(_), _))
277 ) {
278 JobSummary::Tx(account_updates)
279 } else {
280 JobSummary::Merge(account_updates)
281 }
282 }
283
284 pub fn estimated_duration(&self) -> Duration {
285 self.summary().estimated_duration()
286 }
287}
288
289impl AsRef<SnarkJobId> for JobState {
290 fn as_ref(&self) -> &SnarkJobId {
291 &self.id
292 }
293}
294
295impl JobSummary {
296 pub fn estimated_duration(&self) -> Duration {
297 const BASE: Duration = Duration::from_secs(10);
298 const MAX_LATENCY: Duration = Duration::from_secs(10);
299
300 let (JobSummary::Tx(n) | JobSummary::Merge(n)) = self;
301 BASE.saturating_mul(*n as u32).saturating_add(MAX_LATENCY)
302 }
303}