node/external_snark_worker/
external_snark_worker_reducer.rs

1use openmina_core::snark::Snark;
2use redux::Timestamp;
3
4use super::{
5    available_job_to_snark_worker_spec,
6    external_snark_worker_state::{ExternalSnarkWorker, ExternalSnarkWorkerState},
7    ExternalSnarkWorkerAction, ExternalSnarkWorkerActionWithMetaRef, ExternalSnarkWorkerWorkError,
8    ExternalSnarkWorkers,
9};
10use crate::{
11    external_snark_worker_effectful::ExternalSnarkWorkerEffectfulAction, p2p_ready,
12    SnarkPoolAction, Substate,
13};
14
15impl ExternalSnarkWorkers {
16    pub fn reducer(
17        state_context: Substate<ExternalSnarkWorkers>,
18        action: ExternalSnarkWorkerActionWithMetaRef<'_>,
19    ) {
20        ExternalSnarkWorker::reducer(Substate::from_compatible_substate(state_context), action);
21    }
22}
23
24impl ExternalSnarkWorker {
25    pub fn reducer(
26        mut state_context: Substate<ExternalSnarkWorker>,
27        action: ExternalSnarkWorkerActionWithMetaRef<'_>,
28    ) {
29        let Ok(worker_state) = state_context.get_substate_mut() else {
30            return;
31        };
32        let (action, meta) = action.split();
33        match action {
34            ExternalSnarkWorkerAction::Start => {
35                worker_state.state = ExternalSnarkWorkerState::Starting;
36                worker_state.update_timestamp(meta.time());
37
38                let (dispatcher, state) = state_context.into_dispatcher_and_state();
39                let Some(config) = &state.config.snarker else {
40                    return;
41                };
42
43                let public_key = config.public_key.clone().into();
44                let fee = config.fee.clone();
45
46                dispatcher.push(ExternalSnarkWorkerEffectfulAction::Start { public_key, fee });
47            }
48            ExternalSnarkWorkerAction::Started => {
49                worker_state.state = ExternalSnarkWorkerState::Idle;
50                worker_state.update_timestamp(meta.time());
51
52                let dispatcher = state_context.into_dispatcher();
53                dispatcher.push(SnarkPoolAction::AutoCreateCommitment);
54            }
55            ExternalSnarkWorkerAction::StartTimeout { .. } => {
56                let dispatcher = state_context.into_dispatcher();
57                dispatcher.push(ExternalSnarkWorkerAction::Error {
58                    error: super::ExternalSnarkWorkerError::StartTimeout,
59                    permanent: true,
60                });
61            }
62            ExternalSnarkWorkerAction::Kill => {
63                worker_state.state = ExternalSnarkWorkerState::Killing;
64                worker_state.update_timestamp(meta.time());
65
66                let dispatcher = state_context.into_dispatcher();
67                dispatcher.push(ExternalSnarkWorkerEffectfulAction::Kill);
68            }
69            ExternalSnarkWorkerAction::Killed => {
70                worker_state.state = ExternalSnarkWorkerState::None;
71                worker_state.update_timestamp(meta.time());
72            }
73            ExternalSnarkWorkerAction::Error { error, permanent } => {
74                worker_state.state = ExternalSnarkWorkerState::Error(error.clone(), *permanent);
75                worker_state.update_timestamp(meta.time());
76
77                let dispatcher = state_context.into_dispatcher();
78                dispatcher.push(ExternalSnarkWorkerAction::Kill);
79            }
80            ExternalSnarkWorkerAction::SubmitWork { job_id, summary } => {
81                worker_state.state =
82                    ExternalSnarkWorkerState::Working(job_id.clone(), summary.clone());
83                worker_state.update_timestamp(meta.time());
84
85                let (dispatcher, state) = state_context.into_dispatcher_and_state();
86                let Some(job) = state.snark_pool.get(job_id) else {
87                    return;
88                };
89
90                match available_job_to_snark_worker_spec(
91                    job.job.clone(),
92                    &state.transition_frontier,
93                ) {
94                    Ok(spec) => {
95                        dispatcher.push(ExternalSnarkWorkerEffectfulAction::SubmitWork {
96                            spec: Box::new(spec),
97                        });
98                    }
99                    Err(err) => {
100                        dispatcher.push(ExternalSnarkWorkerAction::WorkError {
101                            error: ExternalSnarkWorkerWorkError::WorkSpecError(err),
102                        });
103                    }
104                }
105            }
106            ExternalSnarkWorkerAction::WorkResult { result } => {
107                let ExternalSnarkWorkerState::Working(job_id, _) = &worker_state.state else {
108                    return;
109                };
110                worker_state.state =
111                    ExternalSnarkWorkerState::WorkReady(job_id.clone(), result.clone());
112                worker_state.update_timestamp(meta.time());
113
114                let (dispatcher, state) = state_context.into_dispatcher_and_state();
115                let Some(config) = &state.config.snarker else {
116                    return;
117                };
118                let p2p = p2p_ready!(state.p2p, meta.time());
119                let snarker = config.public_key.clone().into();
120                let fee = config.fee.clone();
121                let snark = Snark {
122                    snarker,
123                    fee,
124                    proofs: result.clone(),
125                };
126                let sender = p2p.my_id();
127                // Directly add snark to the snark pool as it's produced by us.
128                dispatcher.push(SnarkPoolAction::WorkAdd {
129                    snark,
130                    sender,
131                    is_sender_local: true,
132                });
133                dispatcher.push(ExternalSnarkWorkerAction::PruneWork);
134            }
135            ExternalSnarkWorkerAction::WorkError { error } => {
136                let ExternalSnarkWorkerState::Working(job_id, _) = &worker_state.state else {
137                    return;
138                };
139                worker_state.state =
140                    ExternalSnarkWorkerState::WorkError(job_id.clone(), error.clone());
141                worker_state.update_timestamp(meta.time());
142
143                let dispatcher = state_context.into_dispatcher();
144                dispatcher.push(ExternalSnarkWorkerAction::PruneWork);
145            }
146            ExternalSnarkWorkerAction::WorkTimeout { .. } => {
147                let dispatcher = state_context.into_dispatcher();
148                dispatcher.push(ExternalSnarkWorkerAction::CancelWork);
149            }
150            ExternalSnarkWorkerAction::CancelWork => {
151                let ExternalSnarkWorkerState::Working(job_id, _) = &worker_state.state else {
152                    return;
153                };
154                worker_state.state = ExternalSnarkWorkerState::Cancelling(job_id.clone());
155                worker_state.update_timestamp(meta.time());
156
157                let dispatcher = state_context.into_dispatcher();
158                dispatcher.push(ExternalSnarkWorkerEffectfulAction::CancelWork);
159            }
160            ExternalSnarkWorkerAction::WorkCancelled => {
161                let ExternalSnarkWorkerState::Cancelling(job_id) = &worker_state.state else {
162                    return;
163                };
164                worker_state.state = ExternalSnarkWorkerState::Cancelled(job_id.clone());
165                worker_state.update_timestamp(meta.time());
166
167                let dispatcher = state_context.into_dispatcher();
168                dispatcher.push(ExternalSnarkWorkerAction::PruneWork);
169            }
170            ExternalSnarkWorkerAction::PruneWork => {
171                worker_state.state = ExternalSnarkWorkerState::Idle;
172                worker_state.update_timestamp(meta.time());
173
174                let dispatcher = state_context.into_dispatcher();
175                dispatcher.push(SnarkPoolAction::AutoCreateCommitment);
176            }
177        }
178    }
179
180    fn update_timestamp(&mut self, time: Timestamp) {
181        self.timestamp = time;
182    }
183}