mina_node/snark_pool/
snark_pool_reducer.rs1use std::collections::BTreeMap;
2
3use crate::{
4 p2p::channels::{
5 snark::P2pChannelsSnarkAction, snark_job_commitment::P2pChannelsSnarkJobCommitmentAction,
6 },
7 snark_pool::JobCommitment,
8 ExternalSnarkWorkerAction, SnarkerStrategy,
9};
10use mina_core::snark::{SnarkJobCommitment, SnarkJobId};
11
12use super::{
13 JobState, SnarkPoolAction, SnarkPoolActionWithMetaRef, SnarkPoolEffectfulAction,
14 SnarkPoolState, SnarkWork,
15};
16
17impl SnarkPoolState {
18 pub fn reducer(
19 mut state_context: crate::Substate<Self>,
20 action: SnarkPoolActionWithMetaRef<'_>,
21 ) {
22 let Ok(state) = state_context.get_substate_mut() else {
23 return;
25 };
26 let (action, meta) = action.split();
27
28 match action {
29 SnarkPoolAction::Candidate(action) => {
30 super::candidate::SnarkPoolCandidatesState::reducer(
31 crate::Substate::from_compatible_substate(state_context),
32 meta.with_action(action),
33 );
34 }
35 SnarkPoolAction::JobsUpdate {
36 jobs,
37 orphaned_snarks,
38 } => {
39 let mut jobs_map = jobs
40 .iter()
41 .enumerate()
42 .map(|(index, job)| (SnarkJobId::from(job), (index, job.clone())))
43 .collect::<BTreeMap<_, _>>();
44
45 state.retain(|id| jobs_map.remove(id).map(|(order, _)| order));
46 for (id, (order, job)) in jobs_map {
47 state.insert(JobState {
48 time: meta.time(),
49 id,
50 job,
51 commitment: None,
52 snark: None,
53 order,
54 });
55 }
56
57 let orphaned_snarks = orphaned_snarks
58 .iter()
59 .map(|snark| (snark.work.job_id(), snark.clone()));
60
61 for (id, snark) in orphaned_snarks {
62 let take = state
63 .get(&id)
64 .and_then(|job| job.snark.as_ref())
65 .is_none_or(|old_snark| snark.work > old_snark.work);
66 if take {
67 state.set_snark_work(snark.clone());
68 }
69 }
70
71 state.candidates_prune();
72
73 let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
75 if let Some(job_id) = global_state.external_snark_worker.working_job_id() {
76 if !global_state.snark_pool.contains(job_id) {
77 dispatcher.push(ExternalSnarkWorkerAction::CancelWork);
79 }
80 } else {
81 dispatcher.push(SnarkPoolAction::AutoCreateCommitment);
82 }
83 }
84 SnarkPoolAction::AutoCreateCommitment => {
85 let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
86 let Some(snarker_config) = &global_state.config.snarker else {
87 return;
88 };
89 let available_workers = global_state.external_snark_worker.available();
90
91 if available_workers > 0 {
92 let jobs = global_state
93 .snark_pool
94 .available_jobs_with_highest_priority(available_workers);
95 match snarker_config.strategy {
96 SnarkerStrategy::Sequential => {
97 let job_ids = jobs
98 .into_iter()
99 .map(|job| job.id.clone())
100 .take(available_workers) .collect();
102 dispatcher.push(SnarkPoolAction::CommitmentCreateMany { job_ids });
103 }
104 SnarkerStrategy::Random => {
105 let jobs = global_state.snark_pool.available_jobs_iter();
106 let choices = jobs.map(|job| job.id.clone()).collect();
107
108 dispatcher.push(SnarkPoolEffectfulAction::SnarkPoolJobsRandomChoose {
109 choices,
110 count: available_workers,
111 on_result: redux::callback!(
112 on_snark_pool_jobs_random_choose_result(job_ids: Vec<SnarkJobId>) -> crate::Action {
113 SnarkPoolAction::CommitmentCreateMany { job_ids }
114 }
115 ),
116 });
117 }
118 }
119 };
120 }
121 SnarkPoolAction::CommitmentCreateMany { job_ids } => {
122 let dispatcher = state_context.into_dispatcher();
123 for job_id in job_ids.iter().cloned() {
124 dispatcher.push(SnarkPoolAction::CommitmentCreate { job_id });
125 }
126 }
127 SnarkPoolAction::CommitmentCreate { job_id } => {
128 let job_id = job_id.clone();
129 let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
130 let Some(summary) = global_state.snark_pool.job_summary(&job_id) else {
131 return;
132 };
133
134 if global_state.external_snark_worker.is_idle() {
135 dispatcher.push(ExternalSnarkWorkerAction::SubmitWork {
136 job_id: job_id.clone(),
137 summary,
138 });
139
140 let timestamp_ms = meta.time_as_nanos() / 1_000_000;
141 let Some(config) = global_state.config.snarker.as_ref() else {
142 return;
143 };
144 dispatcher.push(SnarkPoolAction::CommitmentAdd {
145 commitment: SnarkJobCommitment::new(
146 timestamp_ms,
147 job_id,
148 config.fee.clone(),
149 config.public_key.clone().into(),
150 ),
151 sender: global_state.p2p.my_id(),
152 });
153 }
154 }
155 SnarkPoolAction::CommitmentAdd { commitment, sender } => {
156 state.set_commitment(JobCommitment {
157 commitment: commitment.clone(),
158 received_t: meta.time(),
159 sender: *sender,
160 });
161
162 let commitment = commitment.clone();
164 let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
165 if let Some(job_id) = global_state.external_snark_worker.working_job_id() {
166 let Some(config) = global_state.config.snarker.as_ref() else {
167 return;
168 };
169 if &commitment.job_id == job_id
170 && &commitment.snarker != config.public_key.as_ref()
171 {
172 dispatcher.push(ExternalSnarkWorkerAction::CancelWork);
173 }
174 }
175 }
176 SnarkPoolAction::WorkAdd {
177 snark,
178 sender,
179 is_sender_local,
180 } => {
181 state.set_snark_work(SnarkWork {
182 work: snark.clone(),
183 received_t: meta.time(),
184 sender: *sender,
185 });
186 state.candidates.remove_inferior_snarks(snark);
187
188 let snark = snark.clone();
190 let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
191 if let Some(job_id) = global_state
192 .external_snark_worker
193 .working_job_id()
194 .filter(|job_id| *job_id == &snark.job_id())
195 {
196 if let Some(commitment) = global_state
197 .snark_pool
198 .get(job_id)
199 .and_then(|job| job.commitment.as_ref())
200 {
201 if snark > commitment.commitment {
202 dispatcher.push(ExternalSnarkWorkerAction::CancelWork);
203 }
204 }
205 }
206
207 dispatcher.push(P2pChannelsSnarkAction::Libp2pBroadcast {
208 snark: snark.clone(),
209 nonce: 0,
210 is_local: *is_sender_local,
211 });
212 }
213 SnarkPoolAction::P2pSendAll => {
214 let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
215 for peer_id in global_state.p2p.ready_peers() {
216 dispatcher.push(SnarkPoolAction::P2pSend { peer_id });
217 }
218 }
219 SnarkPoolAction::P2pSend { peer_id } => {
220 let peer_id = *peer_id;
221 let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
222 let Some(peer) = global_state.p2p.get_ready_peer(&peer_id) else {
223 return;
224 };
225
226 let index_and_limit = peer
228 .channels
229 .snark_job_commitment
230 .next_send_index_and_limit();
231 let (commitments, first_index, last_index) = global_state
232 .snark_pool
233 .next_commitments_to_send(index_and_limit);
234
235 let send_commitments = P2pChannelsSnarkJobCommitmentAction::ResponseSend {
236 peer_id,
237 commitments,
238 first_index,
239 last_index,
240 };
241
242 let index_and_limit = peer.channels.snark.next_send_index_and_limit();
244 let (snarks, first_index, last_index) =
245 global_state.snark_pool.next_snarks_to_send(index_and_limit);
246
247 dispatcher.push(send_commitments);
248 dispatcher.push(P2pChannelsSnarkAction::ResponseSend {
249 peer_id,
250 snarks,
251 first_index,
252 last_index,
253 });
254 }
255 SnarkPoolAction::CheckTimeouts => {
256 state.last_check_timeouts = meta.time();
257
258 let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
260 let timed_out_ids = global_state
261 .snark_pool
262 .timed_out_commitments_iter(meta.time())
263 .cloned()
264 .collect::<Vec<_>>();
265 for job_id in timed_out_ids {
266 dispatcher.push(SnarkPoolAction::JobCommitmentTimeout { job_id });
267 }
268 }
269 SnarkPoolAction::JobCommitmentTimeout { job_id } => {
270 state.remove_commitment(job_id);
271
272 let dispatcher = state_context.into_dispatcher();
274 dispatcher.push(SnarkPoolAction::AutoCreateCommitment);
275 }
276 }
277 }
278}