node/external_snark_worker/
external_snark_worker_reducer.rs1use openmina_core::snark::Snark;
2use redux::Timestamp;
3
4use super::{
5 available_job_to_snark_worker_spec,
6 external_snark_worker_state::{ExternalSnarkWorker, ExternalSnarkWorkerState},
7 ExternalSnarkWorkerAction, ExternalSnarkWorkerActionWithMetaRef, ExternalSnarkWorkerWorkError,
8 ExternalSnarkWorkers,
9};
10use crate::{
11 external_snark_worker_effectful::ExternalSnarkWorkerEffectfulAction, p2p_ready,
12 SnarkPoolAction, Substate,
13};
14
15impl ExternalSnarkWorkers {
16 pub fn reducer(
17 state_context: Substate<ExternalSnarkWorkers>,
18 action: ExternalSnarkWorkerActionWithMetaRef<'_>,
19 ) {
20 ExternalSnarkWorker::reducer(Substate::from_compatible_substate(state_context), action);
21 }
22}
23
24impl ExternalSnarkWorker {
25 pub fn reducer(
26 mut state_context: Substate<ExternalSnarkWorker>,
27 action: ExternalSnarkWorkerActionWithMetaRef<'_>,
28 ) {
29 let Ok(worker_state) = state_context.get_substate_mut() else {
30 return;
31 };
32 let (action, meta) = action.split();
33 match action {
34 ExternalSnarkWorkerAction::Start => {
35 worker_state.state = ExternalSnarkWorkerState::Starting;
36 worker_state.update_timestamp(meta.time());
37
38 let (dispatcher, state) = state_context.into_dispatcher_and_state();
39 let Some(config) = &state.config.snarker else {
40 return;
41 };
42
43 let public_key = config.public_key.clone().into();
44 let fee = config.fee.clone();
45
46 dispatcher.push(ExternalSnarkWorkerEffectfulAction::Start { public_key, fee });
47 }
48 ExternalSnarkWorkerAction::Started => {
49 worker_state.state = ExternalSnarkWorkerState::Idle;
50 worker_state.update_timestamp(meta.time());
51
52 let dispatcher = state_context.into_dispatcher();
53 dispatcher.push(SnarkPoolAction::AutoCreateCommitment);
54 }
55 ExternalSnarkWorkerAction::StartTimeout { .. } => {
56 let dispatcher = state_context.into_dispatcher();
57 dispatcher.push(ExternalSnarkWorkerAction::Error {
58 error: super::ExternalSnarkWorkerError::StartTimeout,
59 permanent: true,
60 });
61 }
62 ExternalSnarkWorkerAction::Kill => {
63 worker_state.state = ExternalSnarkWorkerState::Killing;
64 worker_state.update_timestamp(meta.time());
65
66 let dispatcher = state_context.into_dispatcher();
67 dispatcher.push(ExternalSnarkWorkerEffectfulAction::Kill);
68 }
69 ExternalSnarkWorkerAction::Killed => {
70 worker_state.state = ExternalSnarkWorkerState::None;
71 worker_state.update_timestamp(meta.time());
72 }
73 ExternalSnarkWorkerAction::Error { error, permanent } => {
74 worker_state.state = ExternalSnarkWorkerState::Error(error.clone(), *permanent);
75 worker_state.update_timestamp(meta.time());
76
77 let dispatcher = state_context.into_dispatcher();
78 dispatcher.push(ExternalSnarkWorkerAction::Kill);
79 }
80 ExternalSnarkWorkerAction::SubmitWork { job_id, summary } => {
81 worker_state.state =
82 ExternalSnarkWorkerState::Working(job_id.clone(), summary.clone());
83 worker_state.update_timestamp(meta.time());
84
85 let (dispatcher, state) = state_context.into_dispatcher_and_state();
86 let Some(job) = state.snark_pool.get(job_id) else {
87 return;
88 };
89
90 match available_job_to_snark_worker_spec(
91 job.job.clone(),
92 &state.transition_frontier,
93 ) {
94 Ok(spec) => {
95 dispatcher.push(ExternalSnarkWorkerEffectfulAction::SubmitWork {
96 spec: Box::new(spec),
97 });
98 }
99 Err(err) => {
100 dispatcher.push(ExternalSnarkWorkerAction::WorkError {
101 error: ExternalSnarkWorkerWorkError::WorkSpecError(err),
102 });
103 }
104 }
105 }
106 ExternalSnarkWorkerAction::WorkResult { result } => {
107 let ExternalSnarkWorkerState::Working(job_id, _) = &worker_state.state else {
108 return;
109 };
110 worker_state.state =
111 ExternalSnarkWorkerState::WorkReady(job_id.clone(), result.clone());
112 worker_state.update_timestamp(meta.time());
113
114 let (dispatcher, state) = state_context.into_dispatcher_and_state();
115 let Some(config) = &state.config.snarker else {
116 return;
117 };
118 let p2p = p2p_ready!(state.p2p, meta.time());
119 let snarker = config.public_key.clone().into();
120 let fee = config.fee.clone();
121 let snark = Snark {
122 snarker,
123 fee,
124 proofs: result.clone(),
125 };
126 let sender = p2p.my_id();
127 dispatcher.push(SnarkPoolAction::WorkAdd {
129 snark,
130 sender,
131 is_sender_local: true,
132 });
133 dispatcher.push(ExternalSnarkWorkerAction::PruneWork);
134 }
135 ExternalSnarkWorkerAction::WorkError { error } => {
136 let ExternalSnarkWorkerState::Working(job_id, _) = &worker_state.state else {
137 return;
138 };
139 worker_state.state =
140 ExternalSnarkWorkerState::WorkError(job_id.clone(), error.clone());
141 worker_state.update_timestamp(meta.time());
142
143 let dispatcher = state_context.into_dispatcher();
144 dispatcher.push(ExternalSnarkWorkerAction::PruneWork);
145 }
146 ExternalSnarkWorkerAction::WorkTimeout { .. } => {
147 let dispatcher = state_context.into_dispatcher();
148 dispatcher.push(ExternalSnarkWorkerAction::CancelWork);
149 }
150 ExternalSnarkWorkerAction::CancelWork => {
151 let ExternalSnarkWorkerState::Working(job_id, _) = &worker_state.state else {
152 return;
153 };
154 worker_state.state = ExternalSnarkWorkerState::Cancelling(job_id.clone());
155 worker_state.update_timestamp(meta.time());
156
157 let dispatcher = state_context.into_dispatcher();
158 dispatcher.push(ExternalSnarkWorkerEffectfulAction::CancelWork);
159 }
160 ExternalSnarkWorkerAction::WorkCancelled => {
161 let ExternalSnarkWorkerState::Cancelling(job_id) = &worker_state.state else {
162 return;
163 };
164 worker_state.state = ExternalSnarkWorkerState::Cancelled(job_id.clone());
165 worker_state.update_timestamp(meta.time());
166
167 let dispatcher = state_context.into_dispatcher();
168 dispatcher.push(ExternalSnarkWorkerAction::PruneWork);
169 }
170 ExternalSnarkWorkerAction::PruneWork => {
171 worker_state.state = ExternalSnarkWorkerState::Idle;
172 worker_state.update_timestamp(meta.time());
173
174 let dispatcher = state_context.into_dispatcher();
175 dispatcher.push(SnarkPoolAction::AutoCreateCommitment);
176 }
177 }
178 }
179
180 fn update_timestamp(&mut self, time: Timestamp) {
181 self.timestamp = time;
182 }
183}