1use std::{fmt, ops::RangeBounds, time::Duration};
2
3use ledger::scan_state::scan_state::{transaction_snark::OneOrTwo, AvailableJobMessage};
4use mina_core::snark::{Snark, SnarkInfo, SnarkJobCommitment, SnarkJobId};
5use redux::Timestamp;
6use serde::{Deserialize, Serialize};
7
8use crate::{core::distributed_pool::DistributedPool, p2p::PeerId};
9
10use super::{candidate::SnarkPoolCandidatesState, SnarkPoolConfig};
11
12#[derive(Serialize, Deserialize, Clone)]
13pub struct SnarkPoolState {
14 config: SnarkPoolConfig,
15 pool: DistributedPool<JobState, SnarkJobId>,
16 pub candidates: SnarkPoolCandidatesState,
17 pub(super) last_check_timeouts: Timestamp,
18}
19
20#[derive(Serialize, Deserialize, Debug, Clone)]
21pub struct JobState {
22 pub time: Timestamp,
23 pub id: SnarkJobId,
24 pub job: OneOrTwo<AvailableJobMessage>,
25 pub commitment: Option<JobCommitment>,
26 pub snark: Option<SnarkWork>,
27 pub order: usize,
29}
30
31#[derive(Serialize, Deserialize, Debug, Clone)]
32#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
33pub struct JobCommitment {
34 pub commitment: SnarkJobCommitment,
35 pub received_t: Timestamp,
36 pub sender: PeerId,
37}
38
39#[derive(Serialize, Deserialize, Debug, Clone)]
40pub struct SnarkWork {
41 pub work: Snark,
42 pub received_t: Timestamp,
43 pub sender: PeerId,
44}
45
46#[derive(Debug, Clone, Serialize, Deserialize)]
48#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
49pub enum JobSummary {
50 Tx(usize),
51 Merge(usize),
52}
53
54impl Default for SnarkPoolState {
55 fn default() -> Self {
56 Self::new()
57 }
58}
59
60impl SnarkPoolState {
61 pub fn new() -> Self {
62 Self {
63 config: SnarkPoolConfig {},
64 pool: Default::default(),
65 candidates: SnarkPoolCandidatesState::new(),
66 last_check_timeouts: Timestamp::ZERO,
67 }
68 }
69
70 pub fn is_empty(&self) -> bool {
71 self.pool.is_empty()
72 }
73
74 pub fn last_index(&self) -> u64 {
75 self.pool.last_index()
76 }
77
78 pub fn contains(&self, id: &SnarkJobId) -> bool {
79 self.pool.contains(id)
80 }
81
82 pub fn get(&self, id: &SnarkJobId) -> Option<&JobState> {
83 self.pool.get(id)
84 }
85
86 pub fn insert(&mut self, job: JobState) {
87 self.pool.insert(job)
88 }
89
90 pub fn remove(&mut self, id: &SnarkJobId) -> Option<JobState> {
91 self.pool.remove(id)
92 }
93
94 pub fn set_snark_work(&mut self, snark: SnarkWork) -> Option<SnarkWork> {
95 self.pool
96 .update(&snark.work.job_id(), move |job| job.snark.replace(snark))?
97 }
98
99 pub fn set_commitment(&mut self, commitment: JobCommitment) -> Option<JobCommitment> {
100 let job_id = commitment.commitment.job_id.clone();
101 self.pool
102 .update(&job_id, move |job| job.commitment.replace(commitment))?
103 }
104
105 pub fn remove_commitment(&mut self, id: &SnarkJobId) -> Option<JobCommitment> {
106 self.pool
107 .silent_update(id, |job_state| job_state.commitment.take())?
108 }
109
110 pub fn retain<F>(&mut self, mut get_new_job_order: F)
111 where
112 F: FnMut(&SnarkJobId) -> Option<usize>,
113 {
114 self.pool
115 .retain_and_update(|id, job| match get_new_job_order(id) {
116 None => false,
117 Some(order) => {
118 job.order = order;
119 true
120 }
121 });
122 }
123
124 pub fn range<R>(&self, range: R) -> impl '_ + DoubleEndedIterator<Item = (u64, &'_ JobState)>
125 where
126 R: RangeBounds<u64>,
127 {
128 self.pool.range(range)
129 }
130
131 pub fn should_create_commitment(&self, job_id: &SnarkJobId) -> bool {
132 self.get(job_id).is_some_and(|s| s.is_available())
133 }
134
135 pub fn is_commitment_timed_out(&self, id: &SnarkJobId, time_now: Timestamp) -> bool {
136 self.get(id)
137 .is_some_and(|job| is_job_commitment_timed_out(job, time_now))
138 }
139
140 pub fn timed_out_commitments_iter(
141 &self,
142 time_now: Timestamp,
143 ) -> impl Iterator<Item = &SnarkJobId> {
144 self.jobs_iter()
145 .filter(move |job| is_job_commitment_timed_out(job, time_now))
146 .map(|job| &job.id)
147 }
148
149 pub fn jobs_iter(&self) -> impl Iterator<Item = &JobState> {
150 self.pool.states()
151 }
152
153 pub fn available_jobs_iter(&self) -> impl Iterator<Item = &JobState> {
154 self.jobs_iter().filter(|job| job.is_available())
155 }
156
157 pub fn available_jobs_with_highest_priority(&self, n: usize) -> Vec<&JobState> {
158 self.available_jobs_iter()
160 .fold(Vec::with_capacity(n.saturating_add(1)), |mut jobs, job| {
161 jobs.push(job);
162 if jobs.len() > n {
163 jobs.sort_by_key(|job| job.order);
164 jobs.pop();
165 }
166 jobs
167 })
168 }
169
170 pub fn completed_snarks_iter(&self) -> impl '_ + Iterator<Item = &'_ Snark> {
171 self.jobs_iter()
172 .filter_map(|job| job.snark.as_ref())
173 .map(|snark| &snark.work)
174 }
175
176 pub(super) fn job_summary(&self, id: &SnarkJobId) -> Option<JobSummary> {
177 self.get(id).map(|job| job.summary())
178 }
179
180 pub fn candidates_prune(&mut self) {
181 self.candidates.retain(|id| {
182 let job = self.pool.get(id);
183 move |candidate| match job {
184 None => false,
185 Some(job) => match job.snark.as_ref() {
186 None => true,
187 Some(snark) => &snark.work < candidate,
188 },
189 }
190 });
191 }
192
193 pub fn next_commitments_to_send(
194 &self,
195 index_and_limit: (u64, u8),
196 ) -> (Vec<SnarkJobCommitment>, u64, u64) {
197 self.pool
198 .next_messages_to_send(index_and_limit, |job| job.commitment_msg().cloned())
199 }
200
201 pub fn next_snarks_to_send(&self, index_and_limit: (u64, u8)) -> (Vec<SnarkInfo>, u64, u64) {
202 self.pool
203 .next_messages_to_send(index_and_limit, |job| job.snark_msg())
204 }
205
206 pub fn resources_usage(&self) -> serde_json::Value {
207 let (size, inconsistency) = self.candidates.check();
208
209 serde_json::json!({
210 "pool_size": self.pool.len(),
211 "candidates_size": size,
212 "candidates_inconsistency": inconsistency,
213 })
214 }
215}
216
217fn is_job_commitment_timed_out(job: &JobState, time_now: Timestamp) -> bool {
218 let Some(commitment) = job.commitment.as_ref() else {
219 return false;
220 };
221
222 let timeout = job.estimated_duration();
223 let passed_time = time_now.checked_sub(commitment.commitment.timestamp());
224 let is_timed_out = passed_time.is_some_and(|dur| dur >= timeout);
225 let didnt_deliver = job
226 .snark
227 .as_ref()
228 .is_none_or(|snark| snark.work < commitment.commitment);
229
230 is_timed_out && didnt_deliver
231}
232
233impl fmt::Debug for SnarkPoolState {
234 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
235 f.debug_struct("SnarkPoolState")
236 .field("pool", &self.pool)
237 .finish()
238 }
239}
240
241impl JobState {
242 pub fn is_available(&self) -> bool {
243 self.commitment.is_none() && self.snark.is_none()
244 }
245
246 pub fn commitment_msg(&self) -> Option<&SnarkJobCommitment> {
247 self.commitment.as_ref().map(|v| &v.commitment)
248 }
249
250 pub fn snark_msg(&self) -> Option<SnarkInfo> {
251 self.snark.as_ref().map(|v| v.work.info())
252 }
253
254 pub fn summary(&self) -> JobSummary {
255 use mina_p2p_messages::v2::{
256 MinaTransactionLogicTransactionAppliedCommandAppliedStableV2 as CommandApplied,
257 MinaTransactionLogicTransactionAppliedVaryingStableV2 as Varying,
258 };
259 let account_updates = |job: &_| match job {
260 AvailableJobMessage::Base(base) => match &base.transaction_with_info.varying {
261 Varying::Command(CommandApplied::ZkappCommand(zkapp)) => {
262 zkapp.command.data.account_updates.len()
263 }
264 _ => 1,
265 },
266 AvailableJobMessage::Merge { .. } => 1,
267 };
268 let account_updates = match &self.job {
269 OneOrTwo::One(job) => account_updates(job),
270 OneOrTwo::Two((job1, job2)) => account_updates(job1)
271 .checked_add(account_updates(job2))
272 .expect("overflow"),
273 };
274
275 if matches!(
276 self.job,
277 OneOrTwo::One(AvailableJobMessage::Base(_))
278 | OneOrTwo::Two((AvailableJobMessage::Base(_), _))
279 ) {
280 JobSummary::Tx(account_updates)
281 } else {
282 JobSummary::Merge(account_updates)
283 }
284 }
285
286 pub fn estimated_duration(&self) -> Duration {
287 self.summary().estimated_duration()
288 }
289}
290
291impl AsRef<SnarkJobId> for JobState {
292 fn as_ref(&self) -> &SnarkJobId {
293 &self.id
294 }
295}
296
297impl JobSummary {
298 pub fn estimated_duration(&self) -> Duration {
299 const BASE: Duration = Duration::from_secs(10);
300 const MAX_LATENCY: Duration = Duration::from_secs(10);
301
302 let (JobSummary::Tx(n) | JobSummary::Merge(n)) = self;
303 BASE.saturating_mul(*n as u32).saturating_add(MAX_LATENCY)
304 }
305}