node/snark_pool/
snark_pool_reducer.rs

1use std::collections::BTreeMap;
2
3use crate::{snark_pool::JobCommitment, ExternalSnarkWorkerAction, SnarkerStrategy};
4use openmina_core::snark::{SnarkJobCommitment, SnarkJobId};
5use p2p::channels::{
6    snark::P2pChannelsSnarkAction, snark_job_commitment::P2pChannelsSnarkJobCommitmentAction,
7};
8
9use super::{
10    JobState, SnarkPoolAction, SnarkPoolActionWithMetaRef, SnarkPoolEffectfulAction,
11    SnarkPoolState, SnarkWork,
12};
13
14impl SnarkPoolState {
15    pub fn reducer(
16        mut state_context: crate::Substate<Self>,
17        action: SnarkPoolActionWithMetaRef<'_>,
18    ) {
19        let Ok(state) = state_context.get_substate_mut() else {
20            // TODO: log or propagate
21            return;
22        };
23        let (action, meta) = action.split();
24
25        match action {
26            SnarkPoolAction::Candidate(action) => {
27                super::candidate::SnarkPoolCandidatesState::reducer(
28                    crate::Substate::from_compatible_substate(state_context),
29                    meta.with_action(action),
30                );
31            }
32            SnarkPoolAction::JobsUpdate {
33                jobs,
34                orphaned_snarks,
35            } => {
36                let mut jobs_map = jobs
37                    .iter()
38                    .enumerate()
39                    .map(|(index, job)| (SnarkJobId::from(job), (index, job.clone())))
40                    .collect::<BTreeMap<_, _>>();
41
42                state.retain(|id| jobs_map.remove(id).map(|(order, _)| order));
43                for (id, (order, job)) in jobs_map {
44                    state.insert(JobState {
45                        time: meta.time(),
46                        id,
47                        job,
48                        commitment: None,
49                        snark: None,
50                        order,
51                    });
52                }
53
54                let orphaned_snarks = orphaned_snarks
55                    .iter()
56                    .map(|snark| (snark.work.job_id(), snark.clone()));
57
58                for (id, snark) in orphaned_snarks {
59                    let take = state
60                        .get(&id)
61                        .and_then(|job| job.snark.as_ref())
62                        .is_none_or(|old_snark| snark.work > old_snark.work);
63                    if take {
64                        state.set_snark_work(snark.clone());
65                    }
66                }
67
68                state.candidates_prune();
69
70                // Dispatch
71                let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
72                if let Some(job_id) = global_state.external_snark_worker.working_job_id() {
73                    if !global_state.snark_pool.contains(job_id) {
74                        // job is no longer needed.
75                        dispatcher.push(ExternalSnarkWorkerAction::CancelWork);
76                    }
77                } else {
78                    dispatcher.push(SnarkPoolAction::AutoCreateCommitment);
79                }
80            }
81            SnarkPoolAction::AutoCreateCommitment => {
82                let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
83                let Some(snarker_config) = &global_state.config.snarker else {
84                    return;
85                };
86                let available_workers = global_state.external_snark_worker.available();
87
88                if available_workers > 0 {
89                    let jobs = global_state
90                        .snark_pool
91                        .available_jobs_with_highest_priority(available_workers);
92                    match snarker_config.strategy {
93                        SnarkerStrategy::Sequential => {
94                            let job_ids = jobs
95                                .into_iter()
96                                .map(|job| job.id.clone())
97                                .take(available_workers) // just in case
98                                .collect();
99                            dispatcher.push(SnarkPoolAction::CommitmentCreateMany { job_ids });
100                        }
101                        SnarkerStrategy::Random => {
102                            let jobs = global_state.snark_pool.available_jobs_iter();
103                            let choices = jobs.map(|job| job.id.clone()).collect();
104
105                            dispatcher.push(SnarkPoolEffectfulAction::SnarkPoolJobsRandomChoose {
106                                choices,
107                                count: available_workers,
108                                on_result: redux::callback!(
109                                    on_snark_pool_jobs_random_choose_result(job_ids: Vec<SnarkJobId>) -> crate::Action {
110                                        SnarkPoolAction::CommitmentCreateMany { job_ids }
111                                    }
112                                ),
113                            });
114                        }
115                    }
116                };
117            }
118            SnarkPoolAction::CommitmentCreateMany { job_ids } => {
119                let dispatcher = state_context.into_dispatcher();
120                for job_id in job_ids.iter().cloned() {
121                    dispatcher.push(SnarkPoolAction::CommitmentCreate { job_id });
122                }
123            }
124            SnarkPoolAction::CommitmentCreate { job_id } => {
125                let job_id = job_id.clone();
126                let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
127                let Some(summary) = global_state.snark_pool.job_summary(&job_id) else {
128                    return;
129                };
130
131                if global_state.external_snark_worker.is_idle() {
132                    dispatcher.push(ExternalSnarkWorkerAction::SubmitWork {
133                        job_id: job_id.clone(),
134                        summary,
135                    });
136
137                    let timestamp_ms = meta.time_as_nanos() / 1_000_000;
138                    let Some(config) = global_state.config.snarker.as_ref() else {
139                        return;
140                    };
141                    dispatcher.push(SnarkPoolAction::CommitmentAdd {
142                        commitment: SnarkJobCommitment::new(
143                            timestamp_ms,
144                            job_id,
145                            config.fee.clone(),
146                            config.public_key.clone().into(),
147                        ),
148                        sender: global_state.p2p.my_id(),
149                    });
150                }
151            }
152            SnarkPoolAction::CommitmentAdd { commitment, sender } => {
153                state.set_commitment(JobCommitment {
154                    commitment: commitment.clone(),
155                    received_t: meta.time(),
156                    sender: *sender,
157                });
158
159                // Dispatch
160                let commitment = commitment.clone();
161                let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
162                if let Some(job_id) = global_state.external_snark_worker.working_job_id() {
163                    let Some(config) = global_state.config.snarker.as_ref() else {
164                        return;
165                    };
166                    if &commitment.job_id == job_id
167                        && &commitment.snarker != config.public_key.as_ref()
168                    {
169                        dispatcher.push(ExternalSnarkWorkerAction::CancelWork);
170                    }
171                }
172            }
173            SnarkPoolAction::WorkAdd {
174                snark,
175                sender,
176                is_sender_local,
177            } => {
178                state.set_snark_work(SnarkWork {
179                    work: snark.clone(),
180                    received_t: meta.time(),
181                    sender: *sender,
182                });
183                state.candidates.remove_inferior_snarks(snark);
184
185                // Dispatch
186                let snark = snark.clone();
187                let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
188                if let Some(job_id) = global_state
189                    .external_snark_worker
190                    .working_job_id()
191                    .filter(|job_id| *job_id == &snark.job_id())
192                {
193                    if let Some(commitment) = global_state
194                        .snark_pool
195                        .get(job_id)
196                        .and_then(|job| job.commitment.as_ref())
197                    {
198                        if snark > commitment.commitment {
199                            dispatcher.push(ExternalSnarkWorkerAction::CancelWork);
200                        }
201                    }
202                }
203
204                dispatcher.push(P2pChannelsSnarkAction::Libp2pBroadcast {
205                    snark: snark.clone(),
206                    nonce: 0,
207                    is_local: *is_sender_local,
208                });
209            }
210            SnarkPoolAction::P2pSendAll { .. } => {
211                let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
212                for peer_id in global_state.p2p.ready_peers() {
213                    dispatcher.push(SnarkPoolAction::P2pSend { peer_id });
214                }
215            }
216            SnarkPoolAction::P2pSend { peer_id } => {
217                let peer_id = *peer_id;
218                let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
219                let Some(peer) = global_state.p2p.get_ready_peer(&peer_id) else {
220                    return;
221                };
222
223                // Send commitments.
224                let index_and_limit = peer
225                    .channels
226                    .snark_job_commitment
227                    .next_send_index_and_limit();
228                let (commitments, first_index, last_index) = global_state
229                    .snark_pool
230                    .next_commitments_to_send(index_and_limit);
231
232                let send_commitments = P2pChannelsSnarkJobCommitmentAction::ResponseSend {
233                    peer_id,
234                    commitments,
235                    first_index,
236                    last_index,
237                };
238
239                // Send snarks.
240                let index_and_limit = peer.channels.snark.next_send_index_and_limit();
241                let (snarks, first_index, last_index) =
242                    global_state.snark_pool.next_snarks_to_send(index_and_limit);
243
244                dispatcher.push(send_commitments);
245                dispatcher.push(P2pChannelsSnarkAction::ResponseSend {
246                    peer_id,
247                    snarks,
248                    first_index,
249                    last_index,
250                });
251            }
252            SnarkPoolAction::CheckTimeouts => {
253                state.last_check_timeouts = meta.time();
254
255                // Dispatch
256                let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
257                let timed_out_ids = global_state
258                    .snark_pool
259                    .timed_out_commitments_iter(meta.time())
260                    .cloned()
261                    .collect::<Vec<_>>();
262                for job_id in timed_out_ids {
263                    dispatcher.push(SnarkPoolAction::JobCommitmentTimeout { job_id });
264                }
265            }
266            SnarkPoolAction::JobCommitmentTimeout { job_id } => {
267                state.remove_commitment(job_id);
268
269                // Dispatch
270                let dispatcher = state_context.into_dispatcher();
271                dispatcher.push(SnarkPoolAction::AutoCreateCommitment);
272            }
273        }
274    }
275}