1use std::collections::{BTreeMap, BTreeSet};
2
3use openmina_core::snark::{Snark, SnarkInfo, SnarkJobId};
4use redux::Timestamp;
5use serde::{Deserialize, Serialize};
6
7use crate::{
8 p2p::{channels::rpc::P2pRpcId, PeerId},
9 snark::work_verify::SnarkWorkVerifyId,
10};
11
12static EMPTY_PEER_WORK_CANDIDATES: BTreeMap<SnarkJobId, SnarkPoolCandidateState> = BTreeMap::new();
13
14#[derive(Serialize, Deserialize, Debug, Clone, Default)]
15pub struct SnarkPoolCandidatesState {
16 by_peer: BTreeMap<PeerId, BTreeMap<SnarkJobId, SnarkPoolCandidateState>>,
17 by_job_id: BTreeMap<SnarkJobId, BTreeSet<PeerId>>,
18}
19
20#[derive(Serialize, Deserialize, Debug, Clone)]
21pub enum SnarkPoolCandidateState {
22 InfoReceived {
23 time: Timestamp,
24 info: SnarkInfo,
25 },
26 WorkFetchPending {
27 time: Timestamp,
28 info: SnarkInfo,
29 rpc_id: P2pRpcId,
30 },
31 WorkReceived {
32 time: Timestamp,
33 work: Snark,
34 },
35 WorkVerifyPending {
36 time: Timestamp,
37 work: Snark,
38 verify_id: SnarkWorkVerifyId,
39 },
40 WorkVerifyError {
41 time: Timestamp,
42 work: Snark,
43 },
44 WorkVerifySuccess {
45 time: Timestamp,
46 work: Snark,
47 },
48}
49
50impl SnarkPoolCandidatesState {
51 pub fn new() -> Self {
52 Self::default()
53 }
54
55 pub fn check(&self) -> (usize, Vec<(PeerId, SnarkJobId)>) {
56 let len = self.by_peer.values().map(BTreeMap::len).sum::<usize>();
57 let lhs = self
58 .by_job_id
59 .iter()
60 .flat_map(|(job_id, v)| v.iter().map(|peer_id| (*peer_id, job_id.clone())))
61 .collect::<BTreeSet<_>>();
62 let rhs = self
63 .by_peer
64 .iter()
65 .flat_map(|(peer_id, v)| v.keys().map(|job_id| (*peer_id, job_id.clone())))
66 .collect::<BTreeSet<_>>();
67 let inconsistency = lhs.symmetric_difference(&rhs).cloned().collect();
68 (len, inconsistency)
69 }
70
71 pub fn peer_work_count(&self, peer_id: &PeerId) -> usize {
72 self.by_peer.get(peer_id).map(|v| v.len()).unwrap_or(0)
73 }
74
75 pub fn get(&self, peer_id: PeerId, job_id: &SnarkJobId) -> Option<&SnarkPoolCandidateState> {
76 self.by_peer.get(&peer_id)?.get(job_id)
77 }
78
79 fn jobs_from_peer_or_empty(
80 &self,
81 peer_id: PeerId,
82 ) -> &BTreeMap<SnarkJobId, SnarkPoolCandidateState> {
83 self.by_peer
84 .get(&peer_id)
85 .unwrap_or(&EMPTY_PEER_WORK_CANDIDATES)
86 }
87
88 pub fn jobs_from_peer_iter(
89 &self,
90 peer_id: PeerId,
91 ) -> impl Iterator<Item = (&SnarkJobId, &SnarkPoolCandidateState)> {
92 self.jobs_from_peer_or_empty(peer_id).iter()
93 }
94
95 pub fn jobs_from_peer_with_job_ids<'a, I>(
96 &'a self,
97 peer_id: PeerId,
98 job_ids: I,
99 ) -> impl Iterator<Item = (&'a SnarkJobId, Option<&'a SnarkPoolCandidateState>)>
100 where
101 I: IntoIterator<Item = &'a SnarkJobId>,
102 {
103 let jobs = self.jobs_from_peer_or_empty(peer_id);
104 job_ids.into_iter().map(|id| (id, jobs.get(id)))
105 }
106
107 pub fn info_received(&mut self, time: Timestamp, peer_id: PeerId, info: SnarkInfo) {
108 self.by_job_id
109 .entry(info.job_id.clone())
110 .or_default()
111 .insert(peer_id);
112
113 let job_id = info.job_id.clone();
114 let state = SnarkPoolCandidateState::InfoReceived { time, info };
115 self.by_peer
116 .entry(peer_id)
117 .or_default()
118 .insert(job_id, state);
119 }
120
121 pub fn peers_next_work_to_fetch<I, F>(
122 &self,
123 peers: I,
124 get_order: F,
125 ) -> Vec<(PeerId, SnarkJobId)>
126 where
127 I: IntoIterator<Item = PeerId>,
128 F: Copy + Fn(&SnarkJobId) -> usize,
129 {
130 let mut needs_fetching = peers
131 .into_iter()
132 .filter_map(|peer_id| Some((peer_id, self.by_peer.get(&peer_id)?)))
133 .flat_map(|(peer_id, jobs)| {
134 jobs.iter()
135 .filter(|(_, state)| {
136 matches!(state, SnarkPoolCandidateState::InfoReceived { .. })
137 })
138 .map(move |(job_id, state)| (get_order(job_id), state.fee(), peer_id, job_id))
139 })
140 .collect::<Vec<_>>();
141 needs_fetching
142 .sort_by(|(ord1, fee1, ..), (ord2, fee2, ..)| ord1.cmp(ord2).then(fee1.cmp(fee2)));
143
144 needs_fetching
145 .into_iter()
146 .scan(None, |last_ord, (ord, _, peer_id, job_id)| {
147 if *last_ord == Some(ord) {
148 return Some(None);
149 }
150 *last_ord = Some(ord);
151 Some(Some((peer_id, job_id.clone())))
152 })
153 .flatten()
154 .collect()
155 }
156
157 pub fn work_fetch_pending(
158 &mut self,
159 time: Timestamp,
160 peer_id: &PeerId,
161 job_id: &SnarkJobId,
162 rpc_id: P2pRpcId,
163 ) {
164 if let Some(state) = self
165 .by_peer
166 .get_mut(peer_id)
167 .and_then(|jobs| jobs.get_mut(job_id))
168 {
169 if let SnarkPoolCandidateState::InfoReceived { info, .. } = state {
170 *state = SnarkPoolCandidateState::WorkFetchPending {
171 time,
172 info: info.clone(),
173 rpc_id,
174 };
175 }
176 }
177 }
178
179 pub fn work_received(&mut self, time: Timestamp, peer_id: PeerId, work: Snark) {
180 let job_id = work.job_id();
181 self.by_job_id
182 .entry(job_id.clone())
183 .or_default()
184 .insert(peer_id);
185
186 let state = SnarkPoolCandidateState::WorkReceived { time, work };
187 self.by_peer
188 .entry(peer_id)
189 .or_default()
190 .insert(job_id, state);
191 }
192
193 pub fn get_batch_to_verify<'a, I>(&'a self, job_ids_ordered: I) -> Option<(PeerId, Vec<Snark>)>
194 where
195 I: IntoIterator<Item = &'a SnarkJobId>,
196 {
197 for job_id in job_ids_ordered {
198 if let Some(res) = None.or_else(|| {
199 for peer_id in self.by_job_id.get(job_id)? {
200 let peer_jobs = self.by_peer.get(peer_id)?;
201 if peer_jobs.get(job_id)?.work().is_some() {
202 let jobs = peer_jobs
203 .iter()
204 .filter_map(|(_, v)| match v {
205 SnarkPoolCandidateState::WorkReceived { work, .. } => Some(work),
206 _ => None,
207 })
208 .cloned()
209 .collect();
210 return Some((*peer_id, jobs));
211 }
212 }
213 None
214 }) {
215 return Some(res);
216 }
217 }
218 None
219 }
220
221 pub fn verify_pending(
222 &mut self,
223 time: Timestamp,
224 peer_id: &PeerId,
225 verify_id: SnarkWorkVerifyId,
226 job_ids: &[SnarkJobId],
227 ) {
228 let Some(peer_jobs) = self.by_peer.get_mut(peer_id) else {
229 return;
230 };
231
232 for job_id in job_ids {
233 if let Some(job_state) = peer_jobs.get_mut(job_id) {
234 if let SnarkPoolCandidateState::WorkReceived { work, .. } = job_state {
235 *job_state = SnarkPoolCandidateState::WorkVerifyPending {
236 time,
237 work: work.clone(),
238 verify_id,
239 };
240 }
241 }
242 }
243 }
244
245 pub fn verify_result(
246 &mut self,
247 time: Timestamp,
248 peer_id: &PeerId,
249 verify_id: SnarkWorkVerifyId,
250 result: Result<(), ()>,
251 ) {
252 if let Some(peer_jobs) = self.by_peer.get_mut(peer_id) {
253 for (_, job_state) in peer_jobs
254 .iter_mut()
255 .filter(|(_, job_state)| job_state.pending_verify_id() == Some(verify_id))
256 {
257 let SnarkPoolCandidateState::WorkVerifyPending { work, .. } = job_state else {
258 continue;
259 };
260 match result {
261 Ok(_) => {
262 *job_state = SnarkPoolCandidateState::WorkVerifySuccess {
263 time,
264 work: work.clone(),
265 };
266 }
267 Err(_) => {
268 *job_state = SnarkPoolCandidateState::WorkVerifyError {
269 time,
270 work: work.clone(),
271 };
272 }
273 }
274 }
275 }
276 }
277
278 pub fn peer_remove(&mut self, peer_id: PeerId) {
279 if let Some(works) = self.by_peer.remove(&peer_id) {
280 for job_id in works.into_keys() {
281 if let Some(peers) = self.by_job_id.get_mut(&job_id) {
282 peers.remove(&peer_id);
283 if peers.is_empty() {
284 self.by_job_id.remove(&job_id);
285 }
286 }
287 }
288 }
289 }
290
291 pub fn peer_work_remove(&mut self, peer_id: PeerId, job_id: &SnarkJobId) {
292 if let Some(works) = self.by_peer.get_mut(&peer_id) {
293 works.remove(job_id);
294 if let Some(peers) = self.by_job_id.get_mut(job_id) {
295 peers.remove(&peer_id);
296 if peers.is_empty() {
297 self.by_job_id.remove(job_id);
298 }
299 }
300 }
301 }
302
303 pub fn remove_inferior_snarks(&mut self, snark: &Snark) {
304 let job_id = snark.job_id();
305 let by_peer = &mut self.by_peer;
306 if let Some(peers) = self.by_job_id.get_mut(&job_id) {
307 peers.retain(|peer_id| {
308 let Some(peer_works) = by_peer.get_mut(peer_id) else {
309 return false;
310 };
311 let Some(work) = peer_works.get(&job_id) else {
312 return false;
313 };
314 if snark >= work {
315 peer_works.remove(&job_id);
316 return false;
317 }
318 true
319 });
320 if peers.is_empty() {
321 self.by_job_id.remove(&job_id);
322 }
323 }
324 }
325
326 pub fn retain<F1, F2>(&mut self, mut predicate: F1)
327 where
328 F1: FnMut(&SnarkJobId) -> F2,
329 F2: FnMut(&SnarkPoolCandidateState) -> bool,
330 {
331 let by_peer = &mut self.by_peer;
332 self.by_job_id.retain(|job_id, peers| {
333 let mut predicate = predicate(job_id);
334 peers.retain(|peer_id| {
335 if let Some(peer_works) = by_peer.get_mut(peer_id) {
336 match peer_works.get(job_id) {
337 Some(s) if predicate(s) => true,
338 Some(_) => {
339 peer_works.remove(job_id);
340 false
341 }
342 None => false,
343 }
344 } else {
345 false
346 }
347 });
348 !peers.is_empty()
349 })
350 }
351}
352
353impl SnarkPoolCandidateState {
354 pub fn fee(&self) -> u64 {
355 match self {
356 Self::InfoReceived { info, .. } | Self::WorkFetchPending { info, .. } => {
357 info.fee.0.as_u64()
358 }
359 Self::WorkReceived { work, .. }
360 | Self::WorkVerifyPending { work, .. }
361 | Self::WorkVerifyError { work, .. }
362 | Self::WorkVerifySuccess { work, .. } => work.fee.0.as_u64(),
363 }
364 }
365
366 pub fn work(&self) -> Option<&Snark> {
367 match self {
368 Self::InfoReceived { .. } => None,
369 Self::WorkFetchPending { .. } => None,
370 Self::WorkReceived { work, .. } => Some(work),
371 Self::WorkVerifyPending { work, .. } => Some(work),
372 Self::WorkVerifyError { work, .. } => Some(work),
373 Self::WorkVerifySuccess { work, .. } => Some(work),
374 }
375 }
376
377 pub fn pending_verify_id(&self) -> Option<SnarkWorkVerifyId> {
378 match self {
379 Self::WorkVerifyPending { verify_id, .. } => Some(*verify_id),
380 _ => None,
381 }
382 }
383}
384
385impl<'a> From<&'a SnarkPoolCandidateState> for openmina_core::snark::SnarkCmp<'a> {
386 fn from(value: &'a SnarkPoolCandidateState) -> Self {
387 match value {
388 SnarkPoolCandidateState::InfoReceived { info, .. } => info.into(),
389 SnarkPoolCandidateState::WorkFetchPending { info, .. } => info.into(),
390 SnarkPoolCandidateState::WorkReceived { work, .. } => work.into(),
391 SnarkPoolCandidateState::WorkVerifyPending { work, .. } => work.into(),
392 SnarkPoolCandidateState::WorkVerifyError { work, .. } => work.into(),
393 SnarkPoolCandidateState::WorkVerifySuccess { work, .. } => work.into(),
394 }
395 }
396}