mina_node/snark_pool/candidate/
snark_pool_candidate_reducer.rs1use std::collections::BTreeMap;
2
3use crate::{
4 p2p::{
5 channels::rpc::{P2pChannelsRpcAction, P2pRpcId, P2pRpcRequest},
6 disconnection::{P2pDisconnectionAction, P2pDisconnectionReason},
7 BroadcastMessageId, P2pNetworkPubsubAction, PeerId,
8 },
9 p2p_ready, SnarkPoolAction,
10};
11use mina_core::snark::{Snark, SnarkJobId};
12use mina_snark::{work_verify::SnarkWorkVerifyAction, work_verify_effectful::SnarkWorkVerifyId};
13
14use super::{
15 SnarkPoolCandidateAction, SnarkPoolCandidateActionWithMetaRef, SnarkPoolCandidatesState,
16};
17
18impl SnarkPoolCandidatesState {
19 pub fn reducer(
20 mut state_context: crate::Substate<Self>,
21 action: SnarkPoolCandidateActionWithMetaRef<'_>,
22 ) {
23 let Ok(state) = state_context.get_substate_mut() else {
24 return;
26 };
27 let (action, meta) = action.split();
28
29 match action {
30 SnarkPoolCandidateAction::InfoReceived { peer_id, info } => {
31 state.info_received(meta.time(), *peer_id, info.clone());
32 }
33 SnarkPoolCandidateAction::WorkFetchAll => {
34 let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
35 let p2p = p2p_ready!(global_state.p2p, meta.time());
36 let peers = p2p.ready_peers_iter().map(|(id, _)| *id);
37 let get_order = |job_id: &_| {
38 global_state
39 .snark_pool
40 .get(job_id)
41 .map(|job| job.order)
42 .unwrap_or(usize::MAX)
43 };
44 let list = global_state
45 .snark_pool
46 .candidates
47 .peers_next_work_to_fetch(peers, get_order);
48
49 for (peer_id, job_id) in list {
50 dispatcher.push(SnarkPoolCandidateAction::WorkFetchInit { peer_id, job_id });
51 }
52 }
53 SnarkPoolCandidateAction::WorkFetchInit { peer_id, job_id } => {
54 let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
55 let peer_id = *peer_id;
56 let job_id = job_id.clone();
57 let p2p = p2p_ready!(global_state.p2p, meta.time());
58 let Some(peer) = p2p.get_ready_peer(&peer_id) else {
59 return;
60 };
61 let rpc_id = peer.channels.next_local_rpc_id();
62
63 dispatcher.push(P2pChannelsRpcAction::RequestSend {
64 peer_id,
65 id: rpc_id,
66 request: Box::new(P2pRpcRequest::Snark(job_id.clone())),
67 on_init: Some(redux::callback!(
68 on_send_p2p_snark_rpc_request(
69 (peer_id: PeerId, rpc_id: P2pRpcId, request: P2pRpcRequest)
70 ) -> crate::Action {
71 let P2pRpcRequest::Snark(job_id) = request else {
72 unreachable!()
73 };
74 SnarkPoolCandidateAction::WorkFetchPending {
75 job_id,
76 peer_id,
77 rpc_id,
78 }
79 }
80 )),
81 });
82 }
83 SnarkPoolCandidateAction::WorkFetchPending {
84 peer_id,
85 job_id,
86 rpc_id,
87 } => {
88 state.work_fetch_pending(meta.time(), peer_id, job_id, *rpc_id);
89 }
90 SnarkPoolCandidateAction::WorkFetchError { peer_id, job_id } => {
91 state.peer_work_remove(*peer_id, job_id);
92 }
93 SnarkPoolCandidateAction::WorkFetchSuccess { peer_id, work } => {
94 state.work_received(meta.time(), *peer_id, work.clone());
95 }
96 SnarkPoolCandidateAction::WorkVerifyNext => {
97 let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
98
99 let job_id_orders = global_state
100 .snark_pool
101 .range(..)
102 .map(|(_, v)| (v.order, &v.id))
103 .collect::<BTreeMap<_, _>>();
104 let job_ids_ordered_iter = job_id_orders.into_values();
105 let batch = global_state
106 .snark_pool
107 .candidates
108 .get_batch_to_verify(job_ids_ordered_iter);
109 let Some((peer_id, batch)) = batch else {
110 return;
111 };
112
113 let req_id = global_state.snark.work_verify.next_req_id();
114 let job_ids = batch.iter().map(|v| v.job_id()).collect::<Vec<_>>();
115 let sender = peer_id.to_string();
116 dispatcher.push(SnarkWorkVerifyAction::Init {
117 req_id,
118 batch,
119 sender,
120 on_success: redux::callback!(
121 on_snark_pool_candidate_work_verify_success((req_id: SnarkWorkVerifyId, sender: String, batch: Vec<Snark>)) -> crate::Action {
122 SnarkPoolCandidateAction::WorkVerifySuccess {
123 peer_id: sender.parse().unwrap(),
124 verify_id: req_id,
125 batch
126 }
127 }),
128 on_error: redux::callback!(
129 on_snark_pool_candidate_work_verify_error((req_id: SnarkWorkVerifyId, sender: String, batch: Vec<SnarkJobId>)) -> crate::Action {
130 SnarkPoolCandidateAction::WorkVerifyError {
131 peer_id: sender.parse().unwrap(),
132 verify_id: req_id,
133 batch
134 }
135 }),
136 });
137 dispatcher.push(SnarkPoolCandidateAction::WorkVerifyPending {
138 peer_id,
139 job_ids,
140 verify_id: req_id,
141 });
142 }
143 SnarkPoolCandidateAction::WorkVerifyPending {
144 peer_id,
145 job_ids,
146 verify_id,
147 } => {
148 state.verify_pending(meta.time(), peer_id, *verify_id, job_ids);
149 }
150 SnarkPoolCandidateAction::WorkVerifyError {
151 peer_id,
152 verify_id,
153 batch,
154 } => {
155 state.verify_result(meta.time(), peer_id, *verify_id, Err(()));
156
157 let dispatcher = state_context.into_dispatcher();
159 let peer_id = *peer_id;
160 dispatcher.push(P2pDisconnectionAction::Init {
161 peer_id,
162 reason: P2pDisconnectionReason::SnarkPoolVerifyError,
163 });
164
165 for snark_job_id in batch {
172 dispatcher.push(P2pNetworkPubsubAction::RejectMessage {
173 message_id: Some(BroadcastMessageId::Snark {
174 job_id: snark_job_id.clone(),
175 }),
176 peer_id: None,
177 reason: "Snark work verification failed".to_string(),
178 });
179 }
180 }
181 SnarkPoolCandidateAction::WorkVerifySuccess {
182 peer_id,
183 verify_id,
184 batch,
185 } => {
186 state.verify_result(meta.time(), peer_id, *verify_id, Ok(()));
187
188 let dispatcher = state_context.into_dispatcher();
190
191 for snark in batch {
192 dispatcher.push(SnarkPoolAction::WorkAdd {
193 snark: snark.clone(),
194 sender: *peer_id,
195 is_sender_local: false,
196 });
197 }
198 }
199 SnarkPoolCandidateAction::PeerPrune { peer_id } => {
200 state.peer_remove(*peer_id);
201 }
202 }
203 }
204}