node/snark_pool/candidate/
snark_pool_candidate_reducer.rs1use std::collections::BTreeMap;
2
3use crate::{p2p_ready, SnarkPoolAction};
4use openmina_core::snark::{Snark, SnarkJobId};
5use p2p::{
6 channels::rpc::{P2pChannelsRpcAction, P2pRpcId, P2pRpcRequest},
7 disconnection::{P2pDisconnectionAction, P2pDisconnectionReason},
8 BroadcastMessageId, P2pNetworkPubsubAction, PeerId,
9};
10use snark::{work_verify::SnarkWorkVerifyAction, work_verify_effectful::SnarkWorkVerifyId};
11
12use super::{
13 SnarkPoolCandidateAction, SnarkPoolCandidateActionWithMetaRef, SnarkPoolCandidatesState,
14};
15
16impl SnarkPoolCandidatesState {
17 pub fn reducer(
18 mut state_context: crate::Substate<Self>,
19 action: SnarkPoolCandidateActionWithMetaRef<'_>,
20 ) {
21 let Ok(state) = state_context.get_substate_mut() else {
22 return;
24 };
25 let (action, meta) = action.split();
26
27 match action {
28 SnarkPoolCandidateAction::InfoReceived { peer_id, info } => {
29 state.info_received(meta.time(), *peer_id, info.clone());
30 }
31 SnarkPoolCandidateAction::WorkFetchAll => {
32 let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
33 let p2p = p2p_ready!(global_state.p2p, meta.time());
34 let peers = p2p.ready_peers_iter().map(|(id, _)| *id);
35 let get_order = |job_id: &_| {
36 global_state
37 .snark_pool
38 .get(job_id)
39 .map(|job| job.order)
40 .unwrap_or(usize::MAX)
41 };
42 let list = global_state
43 .snark_pool
44 .candidates
45 .peers_next_work_to_fetch(peers, get_order);
46
47 for (peer_id, job_id) in list {
48 dispatcher.push(SnarkPoolCandidateAction::WorkFetchInit { peer_id, job_id });
49 }
50 }
51 SnarkPoolCandidateAction::WorkFetchInit { peer_id, job_id } => {
52 let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
53 let peer_id = *peer_id;
54 let job_id = job_id.clone();
55 let p2p = p2p_ready!(global_state.p2p, meta.time());
56 let Some(peer) = p2p.get_ready_peer(&peer_id) else {
57 return;
58 };
59 let rpc_id = peer.channels.next_local_rpc_id();
60
61 dispatcher.push(P2pChannelsRpcAction::RequestSend {
62 peer_id,
63 id: rpc_id,
64 request: Box::new(P2pRpcRequest::Snark(job_id.clone())),
65 on_init: Some(redux::callback!(
66 on_send_p2p_snark_rpc_request(
67 (peer_id: PeerId, rpc_id: P2pRpcId, request: P2pRpcRequest)
68 ) -> crate::Action {
69 let P2pRpcRequest::Snark(job_id) = request else {
70 unreachable!()
71 };
72 SnarkPoolCandidateAction::WorkFetchPending {
73 job_id,
74 peer_id,
75 rpc_id,
76 }
77 }
78 )),
79 });
80 }
81 SnarkPoolCandidateAction::WorkFetchPending {
82 peer_id,
83 job_id,
84 rpc_id,
85 } => {
86 state.work_fetch_pending(meta.time(), peer_id, job_id, *rpc_id);
87 }
88 SnarkPoolCandidateAction::WorkFetchError { peer_id, job_id } => {
89 state.peer_work_remove(*peer_id, job_id);
90 }
91 SnarkPoolCandidateAction::WorkFetchSuccess { peer_id, work } => {
92 state.work_received(meta.time(), *peer_id, work.clone());
93 }
94 SnarkPoolCandidateAction::WorkVerifyNext => {
95 let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
96
97 let job_id_orders = global_state
98 .snark_pool
99 .range(..)
100 .map(|(_, v)| (v.order, &v.id))
101 .collect::<BTreeMap<_, _>>();
102 let job_ids_ordered_iter = job_id_orders.into_values();
103 let batch = global_state
104 .snark_pool
105 .candidates
106 .get_batch_to_verify(job_ids_ordered_iter);
107 let Some((peer_id, batch)) = batch else {
108 return;
109 };
110
111 let req_id = global_state.snark.work_verify.next_req_id();
112 let job_ids = batch.iter().map(|v| v.job_id()).collect::<Vec<_>>();
113 let sender = peer_id.to_string();
114 dispatcher.push(SnarkWorkVerifyAction::Init {
115 req_id,
116 batch,
117 sender,
118 on_success: redux::callback!(
119 on_snark_pool_candidate_work_verify_success((req_id: SnarkWorkVerifyId, sender: String, batch: Vec<Snark>)) -> crate::Action {
120 SnarkPoolCandidateAction::WorkVerifySuccess {
121 peer_id: sender.parse().unwrap(),
122 verify_id: req_id,
123 batch
124 }
125 }),
126 on_error: redux::callback!(
127 on_snark_pool_candidate_work_verify_error((req_id: SnarkWorkVerifyId, sender: String, batch: Vec<SnarkJobId>)) -> crate::Action {
128 SnarkPoolCandidateAction::WorkVerifyError {
129 peer_id: sender.parse().unwrap(),
130 verify_id: req_id,
131 batch
132 }
133 }),
134 });
135 dispatcher.push(SnarkPoolCandidateAction::WorkVerifyPending {
136 peer_id,
137 job_ids,
138 verify_id: req_id,
139 });
140 }
141 SnarkPoolCandidateAction::WorkVerifyPending {
142 peer_id,
143 job_ids,
144 verify_id,
145 } => {
146 state.verify_pending(meta.time(), peer_id, *verify_id, job_ids);
147 }
148 SnarkPoolCandidateAction::WorkVerifyError {
149 peer_id,
150 verify_id,
151 batch,
152 } => {
153 state.verify_result(meta.time(), peer_id, *verify_id, Err(()));
154
155 let dispatcher = state_context.into_dispatcher();
157 let peer_id = *peer_id;
158 dispatcher.push(P2pDisconnectionAction::Init {
159 peer_id,
160 reason: P2pDisconnectionReason::SnarkPoolVerifyError,
161 });
162
163 for snark_job_id in batch {
170 dispatcher.push(P2pNetworkPubsubAction::RejectMessage {
171 message_id: Some(BroadcastMessageId::Snark {
172 job_id: snark_job_id.clone(),
173 }),
174 peer_id: None,
175 reason: "Snark work verification failed".to_string(),
176 });
177 }
178 }
179 SnarkPoolCandidateAction::WorkVerifySuccess {
180 peer_id,
181 verify_id,
182 batch,
183 } => {
184 state.verify_result(meta.time(), peer_id, *verify_id, Ok(()));
185
186 let dispatcher = state_context.into_dispatcher();
188
189 for snark in batch {
190 dispatcher.push(SnarkPoolAction::WorkAdd {
191 snark: snark.clone(),
192 sender: *peer_id,
193 is_sender_local: false,
194 });
195 }
196 }
197 SnarkPoolCandidateAction::PeerPrune { peer_id } => {
198 state.peer_remove(*peer_id);
199 }
200 }
201 }
202}