node/snark_pool/
snark_pool_reducer.rs1use std::collections::BTreeMap;
2
3use crate::{snark_pool::JobCommitment, ExternalSnarkWorkerAction, SnarkerStrategy};
4use openmina_core::snark::{SnarkJobCommitment, SnarkJobId};
5use p2p::channels::{
6 snark::P2pChannelsSnarkAction, snark_job_commitment::P2pChannelsSnarkJobCommitmentAction,
7};
8
9use super::{
10 JobState, SnarkPoolAction, SnarkPoolActionWithMetaRef, SnarkPoolEffectfulAction,
11 SnarkPoolState, SnarkWork,
12};
13
14impl SnarkPoolState {
15 pub fn reducer(
16 mut state_context: crate::Substate<Self>,
17 action: SnarkPoolActionWithMetaRef<'_>,
18 ) {
19 let Ok(state) = state_context.get_substate_mut() else {
20 return;
22 };
23 let (action, meta) = action.split();
24
25 match action {
26 SnarkPoolAction::Candidate(action) => {
27 super::candidate::SnarkPoolCandidatesState::reducer(
28 crate::Substate::from_compatible_substate(state_context),
29 meta.with_action(action),
30 );
31 }
32 SnarkPoolAction::JobsUpdate {
33 jobs,
34 orphaned_snarks,
35 } => {
36 let mut jobs_map = jobs
37 .iter()
38 .enumerate()
39 .map(|(index, job)| (SnarkJobId::from(job), (index, job.clone())))
40 .collect::<BTreeMap<_, _>>();
41
42 state.retain(|id| jobs_map.remove(id).map(|(order, _)| order));
43 for (id, (order, job)) in jobs_map {
44 state.insert(JobState {
45 time: meta.time(),
46 id,
47 job,
48 commitment: None,
49 snark: None,
50 order,
51 });
52 }
53
54 let orphaned_snarks = orphaned_snarks
55 .iter()
56 .map(|snark| (snark.work.job_id(), snark.clone()));
57
58 for (id, snark) in orphaned_snarks {
59 let take = state
60 .get(&id)
61 .and_then(|job| job.snark.as_ref())
62 .is_none_or(|old_snark| snark.work > old_snark.work);
63 if take {
64 state.set_snark_work(snark.clone());
65 }
66 }
67
68 state.candidates_prune();
69
70 let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
72 if let Some(job_id) = global_state.external_snark_worker.working_job_id() {
73 if !global_state.snark_pool.contains(job_id) {
74 dispatcher.push(ExternalSnarkWorkerAction::CancelWork);
76 }
77 } else {
78 dispatcher.push(SnarkPoolAction::AutoCreateCommitment);
79 }
80 }
81 SnarkPoolAction::AutoCreateCommitment => {
82 let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
83 let Some(snarker_config) = &global_state.config.snarker else {
84 return;
85 };
86 let available_workers = global_state.external_snark_worker.available();
87
88 if available_workers > 0 {
89 let jobs = global_state
90 .snark_pool
91 .available_jobs_with_highest_priority(available_workers);
92 match snarker_config.strategy {
93 SnarkerStrategy::Sequential => {
94 let job_ids = jobs
95 .into_iter()
96 .map(|job| job.id.clone())
97 .take(available_workers) .collect();
99 dispatcher.push(SnarkPoolAction::CommitmentCreateMany { job_ids });
100 }
101 SnarkerStrategy::Random => {
102 let jobs = global_state.snark_pool.available_jobs_iter();
103 let choices = jobs.map(|job| job.id.clone()).collect();
104
105 dispatcher.push(SnarkPoolEffectfulAction::SnarkPoolJobsRandomChoose {
106 choices,
107 count: available_workers,
108 on_result: redux::callback!(
109 on_snark_pool_jobs_random_choose_result(job_ids: Vec<SnarkJobId>) -> crate::Action {
110 SnarkPoolAction::CommitmentCreateMany { job_ids }
111 }
112 ),
113 });
114 }
115 }
116 };
117 }
118 SnarkPoolAction::CommitmentCreateMany { job_ids } => {
119 let dispatcher = state_context.into_dispatcher();
120 for job_id in job_ids.iter().cloned() {
121 dispatcher.push(SnarkPoolAction::CommitmentCreate { job_id });
122 }
123 }
124 SnarkPoolAction::CommitmentCreate { job_id } => {
125 let job_id = job_id.clone();
126 let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
127 let Some(summary) = global_state.snark_pool.job_summary(&job_id) else {
128 return;
129 };
130
131 if global_state.external_snark_worker.is_idle() {
132 dispatcher.push(ExternalSnarkWorkerAction::SubmitWork {
133 job_id: job_id.clone(),
134 summary,
135 });
136
137 let timestamp_ms = meta.time_as_nanos() / 1_000_000;
138 let Some(config) = global_state.config.snarker.as_ref() else {
139 return;
140 };
141 dispatcher.push(SnarkPoolAction::CommitmentAdd {
142 commitment: SnarkJobCommitment::new(
143 timestamp_ms,
144 job_id,
145 config.fee.clone(),
146 config.public_key.clone().into(),
147 ),
148 sender: global_state.p2p.my_id(),
149 });
150 }
151 }
152 SnarkPoolAction::CommitmentAdd { commitment, sender } => {
153 state.set_commitment(JobCommitment {
154 commitment: commitment.clone(),
155 received_t: meta.time(),
156 sender: *sender,
157 });
158
159 let commitment = commitment.clone();
161 let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
162 if let Some(job_id) = global_state.external_snark_worker.working_job_id() {
163 let Some(config) = global_state.config.snarker.as_ref() else {
164 return;
165 };
166 if &commitment.job_id == job_id
167 && &commitment.snarker != config.public_key.as_ref()
168 {
169 dispatcher.push(ExternalSnarkWorkerAction::CancelWork);
170 }
171 }
172 }
173 SnarkPoolAction::WorkAdd {
174 snark,
175 sender,
176 is_sender_local,
177 } => {
178 state.set_snark_work(SnarkWork {
179 work: snark.clone(),
180 received_t: meta.time(),
181 sender: *sender,
182 });
183 state.candidates.remove_inferior_snarks(snark);
184
185 let snark = snark.clone();
187 let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
188 if let Some(job_id) = global_state
189 .external_snark_worker
190 .working_job_id()
191 .filter(|job_id| *job_id == &snark.job_id())
192 {
193 if let Some(commitment) = global_state
194 .snark_pool
195 .get(job_id)
196 .and_then(|job| job.commitment.as_ref())
197 {
198 if snark > commitment.commitment {
199 dispatcher.push(ExternalSnarkWorkerAction::CancelWork);
200 }
201 }
202 }
203
204 dispatcher.push(P2pChannelsSnarkAction::Libp2pBroadcast {
205 snark: snark.clone(),
206 nonce: 0,
207 is_local: *is_sender_local,
208 });
209 }
210 SnarkPoolAction::P2pSendAll { .. } => {
211 let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
212 for peer_id in global_state.p2p.ready_peers() {
213 dispatcher.push(SnarkPoolAction::P2pSend { peer_id });
214 }
215 }
216 SnarkPoolAction::P2pSend { peer_id } => {
217 let peer_id = *peer_id;
218 let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
219 let Some(peer) = global_state.p2p.get_ready_peer(&peer_id) else {
220 return;
221 };
222
223 let index_and_limit = peer
225 .channels
226 .snark_job_commitment
227 .next_send_index_and_limit();
228 let (commitments, first_index, last_index) = global_state
229 .snark_pool
230 .next_commitments_to_send(index_and_limit);
231
232 let send_commitments = P2pChannelsSnarkJobCommitmentAction::ResponseSend {
233 peer_id,
234 commitments,
235 first_index,
236 last_index,
237 };
238
239 let index_and_limit = peer.channels.snark.next_send_index_and_limit();
241 let (snarks, first_index, last_index) =
242 global_state.snark_pool.next_snarks_to_send(index_and_limit);
243
244 dispatcher.push(send_commitments);
245 dispatcher.push(P2pChannelsSnarkAction::ResponseSend {
246 peer_id,
247 snarks,
248 first_index,
249 last_index,
250 });
251 }
252 SnarkPoolAction::CheckTimeouts => {
253 state.last_check_timeouts = meta.time();
254
255 let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
257 let timed_out_ids = global_state
258 .snark_pool
259 .timed_out_commitments_iter(meta.time())
260 .cloned()
261 .collect::<Vec<_>>();
262 for job_id in timed_out_ids {
263 dispatcher.push(SnarkPoolAction::JobCommitmentTimeout { job_id });
264 }
265 }
266 SnarkPoolAction::JobCommitmentTimeout { job_id } => {
267 state.remove_commitment(job_id);
268
269 let dispatcher = state_context.into_dispatcher();
271 dispatcher.push(SnarkPoolAction::AutoCreateCommitment);
272 }
273 }
274 }
275}