mina_node/snark_pool/
snark_pool_reducer.rs

1use std::collections::BTreeMap;
2
3use crate::{
4    p2p::channels::{
5        snark::P2pChannelsSnarkAction, snark_job_commitment::P2pChannelsSnarkJobCommitmentAction,
6    },
7    snark_pool::JobCommitment,
8    ExternalSnarkWorkerAction, SnarkerStrategy,
9};
10use mina_core::snark::{SnarkJobCommitment, SnarkJobId};
11
12use super::{
13    JobState, SnarkPoolAction, SnarkPoolActionWithMetaRef, SnarkPoolEffectfulAction,
14    SnarkPoolState, SnarkWork,
15};
16
17impl SnarkPoolState {
18    pub fn reducer(
19        mut state_context: crate::Substate<Self>,
20        action: SnarkPoolActionWithMetaRef<'_>,
21    ) {
22        let Ok(state) = state_context.get_substate_mut() else {
23            // TODO: log or propagate
24            return;
25        };
26        let (action, meta) = action.split();
27
28        match action {
29            SnarkPoolAction::Candidate(action) => {
30                super::candidate::SnarkPoolCandidatesState::reducer(
31                    crate::Substate::from_compatible_substate(state_context),
32                    meta.with_action(action),
33                );
34            }
35            SnarkPoolAction::JobsUpdate {
36                jobs,
37                orphaned_snarks,
38            } => {
39                let mut jobs_map = jobs
40                    .iter()
41                    .enumerate()
42                    .map(|(index, job)| (SnarkJobId::from(job), (index, job.clone())))
43                    .collect::<BTreeMap<_, _>>();
44
45                state.retain(|id| jobs_map.remove(id).map(|(order, _)| order));
46                for (id, (order, job)) in jobs_map {
47                    state.insert(JobState {
48                        time: meta.time(),
49                        id,
50                        job,
51                        commitment: None,
52                        snark: None,
53                        order,
54                    });
55                }
56
57                let orphaned_snarks = orphaned_snarks
58                    .iter()
59                    .map(|snark| (snark.work.job_id(), snark.clone()));
60
61                for (id, snark) in orphaned_snarks {
62                    let take = state
63                        .get(&id)
64                        .and_then(|job| job.snark.as_ref())
65                        .is_none_or(|old_snark| snark.work > old_snark.work);
66                    if take {
67                        state.set_snark_work(snark.clone());
68                    }
69                }
70
71                state.candidates_prune();
72
73                // Dispatch
74                let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
75                if let Some(job_id) = global_state.external_snark_worker.working_job_id() {
76                    if !global_state.snark_pool.contains(job_id) {
77                        // job is no longer needed.
78                        dispatcher.push(ExternalSnarkWorkerAction::CancelWork);
79                    }
80                } else {
81                    dispatcher.push(SnarkPoolAction::AutoCreateCommitment);
82                }
83            }
84            SnarkPoolAction::AutoCreateCommitment => {
85                let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
86                let Some(snarker_config) = &global_state.config.snarker else {
87                    return;
88                };
89                let available_workers = global_state.external_snark_worker.available();
90
91                if available_workers > 0 {
92                    let jobs = global_state
93                        .snark_pool
94                        .available_jobs_with_highest_priority(available_workers);
95                    match snarker_config.strategy {
96                        SnarkerStrategy::Sequential => {
97                            let job_ids = jobs
98                                .into_iter()
99                                .map(|job| job.id.clone())
100                                .take(available_workers) // just in case
101                                .collect();
102                            dispatcher.push(SnarkPoolAction::CommitmentCreateMany { job_ids });
103                        }
104                        SnarkerStrategy::Random => {
105                            let jobs = global_state.snark_pool.available_jobs_iter();
106                            let choices = jobs.map(|job| job.id.clone()).collect();
107
108                            dispatcher.push(SnarkPoolEffectfulAction::SnarkPoolJobsRandomChoose {
109                                choices,
110                                count: available_workers,
111                                on_result: redux::callback!(
112                                    on_snark_pool_jobs_random_choose_result(job_ids: Vec<SnarkJobId>) -> crate::Action {
113                                        SnarkPoolAction::CommitmentCreateMany { job_ids }
114                                    }
115                                ),
116                            });
117                        }
118                    }
119                };
120            }
121            SnarkPoolAction::CommitmentCreateMany { job_ids } => {
122                let dispatcher = state_context.into_dispatcher();
123                for job_id in job_ids.iter().cloned() {
124                    dispatcher.push(SnarkPoolAction::CommitmentCreate { job_id });
125                }
126            }
127            SnarkPoolAction::CommitmentCreate { job_id } => {
128                let job_id = job_id.clone();
129                let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
130                let Some(summary) = global_state.snark_pool.job_summary(&job_id) else {
131                    return;
132                };
133
134                if global_state.external_snark_worker.is_idle() {
135                    dispatcher.push(ExternalSnarkWorkerAction::SubmitWork {
136                        job_id: job_id.clone(),
137                        summary,
138                    });
139
140                    let timestamp_ms = meta.time_as_nanos() / 1_000_000;
141                    let Some(config) = global_state.config.snarker.as_ref() else {
142                        return;
143                    };
144                    dispatcher.push(SnarkPoolAction::CommitmentAdd {
145                        commitment: SnarkJobCommitment::new(
146                            timestamp_ms,
147                            job_id,
148                            config.fee.clone(),
149                            config.public_key.clone().into(),
150                        ),
151                        sender: global_state.p2p.my_id(),
152                    });
153                }
154            }
155            SnarkPoolAction::CommitmentAdd { commitment, sender } => {
156                state.set_commitment(JobCommitment {
157                    commitment: commitment.clone(),
158                    received_t: meta.time(),
159                    sender: *sender,
160                });
161
162                // Dispatch
163                let commitment = commitment.clone();
164                let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
165                if let Some(job_id) = global_state.external_snark_worker.working_job_id() {
166                    let Some(config) = global_state.config.snarker.as_ref() else {
167                        return;
168                    };
169                    if &commitment.job_id == job_id
170                        && &commitment.snarker != config.public_key.as_ref()
171                    {
172                        dispatcher.push(ExternalSnarkWorkerAction::CancelWork);
173                    }
174                }
175            }
176            SnarkPoolAction::WorkAdd {
177                snark,
178                sender,
179                is_sender_local,
180            } => {
181                state.set_snark_work(SnarkWork {
182                    work: snark.clone(),
183                    received_t: meta.time(),
184                    sender: *sender,
185                });
186                state.candidates.remove_inferior_snarks(snark);
187
188                // Dispatch
189                let snark = snark.clone();
190                let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
191                if let Some(job_id) = global_state
192                    .external_snark_worker
193                    .working_job_id()
194                    .filter(|job_id| *job_id == &snark.job_id())
195                {
196                    if let Some(commitment) = global_state
197                        .snark_pool
198                        .get(job_id)
199                        .and_then(|job| job.commitment.as_ref())
200                    {
201                        if snark > commitment.commitment {
202                            dispatcher.push(ExternalSnarkWorkerAction::CancelWork);
203                        }
204                    }
205                }
206
207                dispatcher.push(P2pChannelsSnarkAction::Libp2pBroadcast {
208                    snark: snark.clone(),
209                    nonce: 0,
210                    is_local: *is_sender_local,
211                });
212            }
213            SnarkPoolAction::P2pSendAll => {
214                let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
215                for peer_id in global_state.p2p.ready_peers() {
216                    dispatcher.push(SnarkPoolAction::P2pSend { peer_id });
217                }
218            }
219            SnarkPoolAction::P2pSend { peer_id } => {
220                let peer_id = *peer_id;
221                let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
222                let Some(peer) = global_state.p2p.get_ready_peer(&peer_id) else {
223                    return;
224                };
225
226                // Send commitments.
227                let index_and_limit = peer
228                    .channels
229                    .snark_job_commitment
230                    .next_send_index_and_limit();
231                let (commitments, first_index, last_index) = global_state
232                    .snark_pool
233                    .next_commitments_to_send(index_and_limit);
234
235                let send_commitments = P2pChannelsSnarkJobCommitmentAction::ResponseSend {
236                    peer_id,
237                    commitments,
238                    first_index,
239                    last_index,
240                };
241
242                // Send snarks.
243                let index_and_limit = peer.channels.snark.next_send_index_and_limit();
244                let (snarks, first_index, last_index) =
245                    global_state.snark_pool.next_snarks_to_send(index_and_limit);
246
247                dispatcher.push(send_commitments);
248                dispatcher.push(P2pChannelsSnarkAction::ResponseSend {
249                    peer_id,
250                    snarks,
251                    first_index,
252                    last_index,
253                });
254            }
255            SnarkPoolAction::CheckTimeouts => {
256                state.last_check_timeouts = meta.time();
257
258                // Dispatch
259                let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
260                let timed_out_ids = global_state
261                    .snark_pool
262                    .timed_out_commitments_iter(meta.time())
263                    .cloned()
264                    .collect::<Vec<_>>();
265                for job_id in timed_out_ids {
266                    dispatcher.push(SnarkPoolAction::JobCommitmentTimeout { job_id });
267                }
268            }
269            SnarkPoolAction::JobCommitmentTimeout { job_id } => {
270                state.remove_commitment(job_id);
271
272                // Dispatch
273                let dispatcher = state_context.into_dispatcher();
274                dispatcher.push(SnarkPoolAction::AutoCreateCommitment);
275            }
276        }
277    }
278}