// node/snark_pool/candidate/snark_pool_candidate_actions.rs

use std::cmp::Ordering;

use openmina_core::{
    snark::{Snark, SnarkInfo, SnarkJobId},
    ActionEvent,
};
use serde::{Deserialize, Serialize};

use crate::{
    p2p::{channels::rpc::P2pRpcId, PeerId},
    snark::work_verify::SnarkWorkVerifyId,
};

use super::SnarkPoolCandidateState;
15
16pub type SnarkPoolCandidateActionWithMeta = redux::ActionWithMeta<SnarkPoolCandidateAction>;
17pub type SnarkPoolCandidateActionWithMetaRef<'a> =
18    redux::ActionWithMeta<&'a SnarkPoolCandidateAction>;
19
20#[derive(Serialize, Deserialize, Debug, Clone, ActionEvent)]
21pub enum SnarkPoolCandidateAction {
22    InfoReceived {
23        peer_id: PeerId,
24        info: SnarkInfo,
25    },
26    #[action_event(level = trace)]
27    WorkFetchAll,
28    WorkFetchInit {
29        peer_id: PeerId,
30        job_id: SnarkJobId,
31    },
32    WorkFetchPending {
33        peer_id: PeerId,
34        job_id: SnarkJobId,
35        rpc_id: P2pRpcId,
36    },
37    WorkFetchError {
38        peer_id: PeerId,
39        job_id: SnarkJobId,
40    },
41    WorkFetchSuccess {
42        peer_id: PeerId,
43        work: Snark,
44    },
45    #[action_event(level = trace)]
46    WorkVerifyNext,
47    WorkVerifyPending {
48        peer_id: PeerId,
49        job_ids: Vec<SnarkJobId>,
50        verify_id: SnarkWorkVerifyId,
51    },
52    WorkVerifyError {
53        peer_id: PeerId,
54        verify_id: SnarkWorkVerifyId,
55        batch: Vec<SnarkJobId>,
56    },
57    WorkVerifySuccess {
58        peer_id: PeerId,
59        verify_id: SnarkWorkVerifyId,
60        batch: Vec<Snark>,
61    },
62    PeerPrune {
63        peer_id: PeerId,
64    },
65}
66
67impl redux::EnablingCondition<crate::State> for SnarkPoolCandidateAction {
68    fn is_enabled(&self, state: &crate::State, _time: redux::Timestamp) -> bool {
69        match self {
70            SnarkPoolCandidateAction::InfoReceived { peer_id, info } => {
71                state.snark_pool.contains(&info.job_id)
72                    && state
73                        .snark_pool
74                        .candidates
75                        .get(*peer_id, &info.job_id)
76                        .is_none_or(|v| info > v)
77            }
78            SnarkPoolCandidateAction::WorkFetchAll => state.p2p.ready().is_some(),
79            SnarkPoolCandidateAction::WorkFetchInit { peer_id, job_id } => {
80                let is_peer_available = state
81                    .p2p
82                    .get_ready_peer(peer_id)
83                    .is_some_and(|peer| peer.channels.rpc.can_send_request());
84                is_peer_available
85                    && state
86                        .snark_pool
87                        .candidates
88                        .get(*peer_id, job_id)
89                        .is_some_and(|s| matches!(s, SnarkPoolCandidateState::InfoReceived { .. }))
90            }
91            SnarkPoolCandidateAction::WorkFetchPending {
92                peer_id, job_id, ..
93            } => state
94                .snark_pool
95                .candidates
96                .get(*peer_id, job_id)
97                .is_some_and(|s| matches!(s, SnarkPoolCandidateState::InfoReceived { .. })),
98            SnarkPoolCandidateAction::WorkFetchError { peer_id, job_id } => state
99                .snark_pool
100                .candidates
101                .get(*peer_id, job_id)
102                .is_some_and(|s| matches!(s, SnarkPoolCandidateState::WorkFetchPending { .. })),
103            SnarkPoolCandidateAction::WorkFetchSuccess { peer_id, work } => {
104                let job_id = work.job_id();
105                state.snark_pool.contains(&job_id)
106                    && state
107                        .snark_pool
108                        .candidates
109                        .get(*peer_id, &job_id)
110                        .is_none_or(|v| match work.partial_cmp(v).unwrap() {
111                            Ordering::Less => false,
112                            Ordering::Greater => true,
113                            Ordering::Equal => {
114                                matches!(v, SnarkPoolCandidateState::WorkFetchPending { .. })
115                            }
116                        })
117            }
118            SnarkPoolCandidateAction::WorkVerifyNext => {
119                state.snark.work_verify.jobs.is_empty()
120                    && state.transition_frontier.sync.is_synced()
121            }
122            SnarkPoolCandidateAction::WorkVerifyPending {
123                peer_id, job_ids, ..
124            } => {
125                !job_ids.is_empty()
126                    && state
127                        .snark_pool
128                        .candidates
129                        .jobs_from_peer_with_job_ids(*peer_id, job_ids)
130                        .all(|(_, state)| {
131                            matches!(state, Some(SnarkPoolCandidateState::WorkReceived { .. }))
132                        })
133            }
134            SnarkPoolCandidateAction::WorkVerifyError { .. } => {
135                // TODO(binier)
136                true
137            }
138            SnarkPoolCandidateAction::WorkVerifySuccess { .. } => {
139                // TODO(binier)
140                true
141            }
142            SnarkPoolCandidateAction::PeerPrune { peer_id } => {
143                state.snark_pool.candidates.peer_work_count(peer_id) > 0
144            }
145        }
146    }
147}
148
149use crate::snark_pool::SnarkPoolAction;
150
151impl From<SnarkPoolCandidateAction> for crate::Action {
152    fn from(value: SnarkPoolCandidateAction) -> Self {
153        Self::SnarkPool(SnarkPoolAction::Candidate(value))
154    }
155}