node/snark_pool/candidate/
snark_pool_candidate_state.rs

1use std::collections::{BTreeMap, BTreeSet};
2
3use openmina_core::snark::{Snark, SnarkInfo, SnarkJobId};
4use redux::Timestamp;
5use serde::{Deserialize, Serialize};
6
7use crate::{
8    p2p::{channels::rpc::P2pRpcId, PeerId},
9    snark::work_verify::SnarkWorkVerifyId,
10};
11
12static EMPTY_PEER_WORK_CANDIDATES: BTreeMap<SnarkJobId, SnarkPoolCandidateState> = BTreeMap::new();
13
14#[derive(Serialize, Deserialize, Debug, Clone, Default)]
15pub struct SnarkPoolCandidatesState {
16    by_peer: BTreeMap<PeerId, BTreeMap<SnarkJobId, SnarkPoolCandidateState>>,
17    by_job_id: BTreeMap<SnarkJobId, BTreeSet<PeerId>>,
18}
19
20#[derive(Serialize, Deserialize, Debug, Clone)]
21pub enum SnarkPoolCandidateState {
22    InfoReceived {
23        time: Timestamp,
24        info: SnarkInfo,
25    },
26    WorkFetchPending {
27        time: Timestamp,
28        info: SnarkInfo,
29        rpc_id: P2pRpcId,
30    },
31    WorkReceived {
32        time: Timestamp,
33        work: Snark,
34    },
35    WorkVerifyPending {
36        time: Timestamp,
37        work: Snark,
38        verify_id: SnarkWorkVerifyId,
39    },
40    WorkVerifyError {
41        time: Timestamp,
42        work: Snark,
43    },
44    WorkVerifySuccess {
45        time: Timestamp,
46        work: Snark,
47    },
48}
49
50impl SnarkPoolCandidatesState {
51    pub fn new() -> Self {
52        Self::default()
53    }
54
55    pub fn check(&self) -> (usize, Vec<(PeerId, SnarkJobId)>) {
56        let len = self.by_peer.values().map(BTreeMap::len).sum::<usize>();
57        let lhs = self
58            .by_job_id
59            .iter()
60            .flat_map(|(job_id, v)| v.iter().map(|peer_id| (*peer_id, job_id.clone())))
61            .collect::<BTreeSet<_>>();
62        let rhs = self
63            .by_peer
64            .iter()
65            .flat_map(|(peer_id, v)| v.keys().map(|job_id| (*peer_id, job_id.clone())))
66            .collect::<BTreeSet<_>>();
67        let inconsistency = lhs.symmetric_difference(&rhs).cloned().collect();
68        (len, inconsistency)
69    }
70
71    pub fn peer_work_count(&self, peer_id: &PeerId) -> usize {
72        self.by_peer.get(peer_id).map(|v| v.len()).unwrap_or(0)
73    }
74
75    pub fn get(&self, peer_id: PeerId, job_id: &SnarkJobId) -> Option<&SnarkPoolCandidateState> {
76        self.by_peer.get(&peer_id)?.get(job_id)
77    }
78
79    fn jobs_from_peer_or_empty(
80        &self,
81        peer_id: PeerId,
82    ) -> &BTreeMap<SnarkJobId, SnarkPoolCandidateState> {
83        self.by_peer
84            .get(&peer_id)
85            .unwrap_or(&EMPTY_PEER_WORK_CANDIDATES)
86    }
87
88    pub fn jobs_from_peer_iter(
89        &self,
90        peer_id: PeerId,
91    ) -> impl Iterator<Item = (&SnarkJobId, &SnarkPoolCandidateState)> {
92        self.jobs_from_peer_or_empty(peer_id).iter()
93    }
94
95    pub fn jobs_from_peer_with_job_ids<'a, I>(
96        &'a self,
97        peer_id: PeerId,
98        job_ids: I,
99    ) -> impl Iterator<Item = (&'a SnarkJobId, Option<&'a SnarkPoolCandidateState>)>
100    where
101        I: IntoIterator<Item = &'a SnarkJobId>,
102    {
103        let jobs = self.jobs_from_peer_or_empty(peer_id);
104        job_ids.into_iter().map(|id| (id, jobs.get(id)))
105    }
106
107    pub fn info_received(&mut self, time: Timestamp, peer_id: PeerId, info: SnarkInfo) {
108        self.by_job_id
109            .entry(info.job_id.clone())
110            .or_default()
111            .insert(peer_id);
112
113        let job_id = info.job_id.clone();
114        let state = SnarkPoolCandidateState::InfoReceived { time, info };
115        self.by_peer
116            .entry(peer_id)
117            .or_default()
118            .insert(job_id, state);
119    }
120
121    pub fn peers_next_work_to_fetch<I, F>(
122        &self,
123        peers: I,
124        get_order: F,
125    ) -> Vec<(PeerId, SnarkJobId)>
126    where
127        I: IntoIterator<Item = PeerId>,
128        F: Copy + Fn(&SnarkJobId) -> usize,
129    {
130        let mut needs_fetching = peers
131            .into_iter()
132            .filter_map(|peer_id| Some((peer_id, self.by_peer.get(&peer_id)?)))
133            .flat_map(|(peer_id, jobs)| {
134                jobs.iter()
135                    .filter(|(_, state)| {
136                        matches!(state, SnarkPoolCandidateState::InfoReceived { .. })
137                    })
138                    .map(move |(job_id, state)| (get_order(job_id), state.fee(), peer_id, job_id))
139            })
140            .collect::<Vec<_>>();
141        needs_fetching
142            .sort_by(|(ord1, fee1, ..), (ord2, fee2, ..)| ord1.cmp(ord2).then(fee1.cmp(fee2)));
143
144        needs_fetching
145            .into_iter()
146            .scan(None, |last_ord, (ord, _, peer_id, job_id)| {
147                if *last_ord == Some(ord) {
148                    return Some(None);
149                }
150                *last_ord = Some(ord);
151                Some(Some((peer_id, job_id.clone())))
152            })
153            .flatten()
154            .collect()
155    }
156
157    pub fn work_fetch_pending(
158        &mut self,
159        time: Timestamp,
160        peer_id: &PeerId,
161        job_id: &SnarkJobId,
162        rpc_id: P2pRpcId,
163    ) {
164        if let Some(state) = self
165            .by_peer
166            .get_mut(peer_id)
167            .and_then(|jobs| jobs.get_mut(job_id))
168        {
169            if let SnarkPoolCandidateState::InfoReceived { info, .. } = state {
170                *state = SnarkPoolCandidateState::WorkFetchPending {
171                    time,
172                    info: info.clone(),
173                    rpc_id,
174                };
175            }
176        }
177    }
178
179    pub fn work_received(&mut self, time: Timestamp, peer_id: PeerId, work: Snark) {
180        let job_id = work.job_id();
181        self.by_job_id
182            .entry(job_id.clone())
183            .or_default()
184            .insert(peer_id);
185
186        let state = SnarkPoolCandidateState::WorkReceived { time, work };
187        self.by_peer
188            .entry(peer_id)
189            .or_default()
190            .insert(job_id, state);
191    }
192
193    pub fn get_batch_to_verify<'a, I>(&'a self, job_ids_ordered: I) -> Option<(PeerId, Vec<Snark>)>
194    where
195        I: IntoIterator<Item = &'a SnarkJobId>,
196    {
197        for job_id in job_ids_ordered {
198            if let Some(res) = None.or_else(|| {
199                for peer_id in self.by_job_id.get(job_id)? {
200                    let peer_jobs = self.by_peer.get(peer_id)?;
201                    if peer_jobs.get(job_id)?.work().is_some() {
202                        let jobs = peer_jobs
203                            .iter()
204                            .filter_map(|(_, v)| match v {
205                                SnarkPoolCandidateState::WorkReceived { work, .. } => Some(work),
206                                _ => None,
207                            })
208                            .cloned()
209                            .collect();
210                        return Some((*peer_id, jobs));
211                    }
212                }
213                None
214            }) {
215                return Some(res);
216            }
217        }
218        None
219    }
220
221    pub fn verify_pending(
222        &mut self,
223        time: Timestamp,
224        peer_id: &PeerId,
225        verify_id: SnarkWorkVerifyId,
226        job_ids: &[SnarkJobId],
227    ) {
228        let Some(peer_jobs) = self.by_peer.get_mut(peer_id) else {
229            return;
230        };
231
232        for job_id in job_ids {
233            if let Some(job_state) = peer_jobs.get_mut(job_id) {
234                if let SnarkPoolCandidateState::WorkReceived { work, .. } = job_state {
235                    *job_state = SnarkPoolCandidateState::WorkVerifyPending {
236                        time,
237                        work: work.clone(),
238                        verify_id,
239                    };
240                }
241            }
242        }
243    }
244
245    pub fn verify_result(
246        &mut self,
247        time: Timestamp,
248        peer_id: &PeerId,
249        verify_id: SnarkWorkVerifyId,
250        result: Result<(), ()>,
251    ) {
252        if let Some(peer_jobs) = self.by_peer.get_mut(peer_id) {
253            for (_, job_state) in peer_jobs
254                .iter_mut()
255                .filter(|(_, job_state)| job_state.pending_verify_id() == Some(verify_id))
256            {
257                let SnarkPoolCandidateState::WorkVerifyPending { work, .. } = job_state else {
258                    continue;
259                };
260                match result {
261                    Ok(_) => {
262                        *job_state = SnarkPoolCandidateState::WorkVerifySuccess {
263                            time,
264                            work: work.clone(),
265                        };
266                    }
267                    Err(_) => {
268                        *job_state = SnarkPoolCandidateState::WorkVerifyError {
269                            time,
270                            work: work.clone(),
271                        };
272                    }
273                }
274            }
275        }
276    }
277
278    pub fn peer_remove(&mut self, peer_id: PeerId) {
279        if let Some(works) = self.by_peer.remove(&peer_id) {
280            for job_id in works.into_keys() {
281                if let Some(peers) = self.by_job_id.get_mut(&job_id) {
282                    peers.remove(&peer_id);
283                    if peers.is_empty() {
284                        self.by_job_id.remove(&job_id);
285                    }
286                }
287            }
288        }
289    }
290
291    pub fn peer_work_remove(&mut self, peer_id: PeerId, job_id: &SnarkJobId) {
292        if let Some(works) = self.by_peer.get_mut(&peer_id) {
293            works.remove(job_id);
294            if let Some(peers) = self.by_job_id.get_mut(job_id) {
295                peers.remove(&peer_id);
296                if peers.is_empty() {
297                    self.by_job_id.remove(job_id);
298                }
299            }
300        }
301    }
302
303    pub fn remove_inferior_snarks(&mut self, snark: &Snark) {
304        let job_id = snark.job_id();
305        let by_peer = &mut self.by_peer;
306        if let Some(peers) = self.by_job_id.get_mut(&job_id) {
307            peers.retain(|peer_id| {
308                let Some(peer_works) = by_peer.get_mut(peer_id) else {
309                    return false;
310                };
311                let Some(work) = peer_works.get(&job_id) else {
312                    return false;
313                };
314                if snark >= work {
315                    peer_works.remove(&job_id);
316                    return false;
317                }
318                true
319            });
320            if peers.is_empty() {
321                self.by_job_id.remove(&job_id);
322            }
323        }
324    }
325
326    pub fn retain<F1, F2>(&mut self, mut predicate: F1)
327    where
328        F1: FnMut(&SnarkJobId) -> F2,
329        F2: FnMut(&SnarkPoolCandidateState) -> bool,
330    {
331        let by_peer = &mut self.by_peer;
332        self.by_job_id.retain(|job_id, peers| {
333            let mut predicate = predicate(job_id);
334            peers.retain(|peer_id| {
335                if let Some(peer_works) = by_peer.get_mut(peer_id) {
336                    match peer_works.get(job_id) {
337                        Some(s) if predicate(s) => true,
338                        Some(_) => {
339                            peer_works.remove(job_id);
340                            false
341                        }
342                        None => false,
343                    }
344                } else {
345                    false
346                }
347            });
348            !peers.is_empty()
349        })
350    }
351}
352
353impl SnarkPoolCandidateState {
354    pub fn fee(&self) -> u64 {
355        match self {
356            Self::InfoReceived { info, .. } | Self::WorkFetchPending { info, .. } => {
357                info.fee.0.as_u64()
358            }
359            Self::WorkReceived { work, .. }
360            | Self::WorkVerifyPending { work, .. }
361            | Self::WorkVerifyError { work, .. }
362            | Self::WorkVerifySuccess { work, .. } => work.fee.0.as_u64(),
363        }
364    }
365
366    pub fn work(&self) -> Option<&Snark> {
367        match self {
368            Self::InfoReceived { .. } => None,
369            Self::WorkFetchPending { .. } => None,
370            Self::WorkReceived { work, .. } => Some(work),
371            Self::WorkVerifyPending { work, .. } => Some(work),
372            Self::WorkVerifyError { work, .. } => Some(work),
373            Self::WorkVerifySuccess { work, .. } => Some(work),
374        }
375    }
376
377    pub fn pending_verify_id(&self) -> Option<SnarkWorkVerifyId> {
378        match self {
379            Self::WorkVerifyPending { verify_id, .. } => Some(*verify_id),
380            _ => None,
381        }
382    }
383}
384
385impl<'a> From<&'a SnarkPoolCandidateState> for openmina_core::snark::SnarkCmp<'a> {
386    fn from(value: &'a SnarkPoolCandidateState) -> Self {
387        match value {
388            SnarkPoolCandidateState::InfoReceived { info, .. } => info.into(),
389            SnarkPoolCandidateState::WorkFetchPending { info, .. } => info.into(),
390            SnarkPoolCandidateState::WorkReceived { work, .. } => work.into(),
391            SnarkPoolCandidateState::WorkVerifyPending { work, .. } => work.into(),
392            SnarkPoolCandidateState::WorkVerifyError { work, .. } => work.into(),
393            SnarkPoolCandidateState::WorkVerifySuccess { work, .. } => work.into(),
394        }
395    }
396}