mina_node/snark_pool/candidate/
snark_pool_candidate_reducer.rs

1use std::collections::BTreeMap;
2
3use crate::{
4    p2p::{
5        channels::rpc::{P2pChannelsRpcAction, P2pRpcId, P2pRpcRequest},
6        disconnection::{P2pDisconnectionAction, P2pDisconnectionReason},
7        BroadcastMessageId, P2pNetworkPubsubAction, PeerId,
8    },
9    p2p_ready, SnarkPoolAction,
10};
11use mina_core::snark::{Snark, SnarkJobId};
12use mina_snark::{work_verify::SnarkWorkVerifyAction, work_verify_effectful::SnarkWorkVerifyId};
13
14use super::{
15    SnarkPoolCandidateAction, SnarkPoolCandidateActionWithMetaRef, SnarkPoolCandidatesState,
16};
17
18impl SnarkPoolCandidatesState {
19    pub fn reducer(
20        mut state_context: crate::Substate<Self>,
21        action: SnarkPoolCandidateActionWithMetaRef<'_>,
22    ) {
23        let Ok(state) = state_context.get_substate_mut() else {
24            // TODO: log or propagate
25            return;
26        };
27        let (action, meta) = action.split();
28
29        match action {
30            SnarkPoolCandidateAction::InfoReceived { peer_id, info } => {
31                state.info_received(meta.time(), *peer_id, info.clone());
32            }
33            SnarkPoolCandidateAction::WorkFetchAll => {
34                let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
35                let p2p = p2p_ready!(global_state.p2p, meta.time());
36                let peers = p2p.ready_peers_iter().map(|(id, _)| *id);
37                let get_order = |job_id: &_| {
38                    global_state
39                        .snark_pool
40                        .get(job_id)
41                        .map(|job| job.order)
42                        .unwrap_or(usize::MAX)
43                };
44                let list = global_state
45                    .snark_pool
46                    .candidates
47                    .peers_next_work_to_fetch(peers, get_order);
48
49                for (peer_id, job_id) in list {
50                    dispatcher.push(SnarkPoolCandidateAction::WorkFetchInit { peer_id, job_id });
51                }
52            }
53            SnarkPoolCandidateAction::WorkFetchInit { peer_id, job_id } => {
54                let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
55                let peer_id = *peer_id;
56                let job_id = job_id.clone();
57                let p2p = p2p_ready!(global_state.p2p, meta.time());
58                let Some(peer) = p2p.get_ready_peer(&peer_id) else {
59                    return;
60                };
61                let rpc_id = peer.channels.next_local_rpc_id();
62
63                dispatcher.push(P2pChannelsRpcAction::RequestSend {
64                    peer_id,
65                    id: rpc_id,
66                    request: Box::new(P2pRpcRequest::Snark(job_id.clone())),
67                    on_init: Some(redux::callback!(
68                        on_send_p2p_snark_rpc_request(
69                            (peer_id: PeerId, rpc_id: P2pRpcId, request: P2pRpcRequest)
70                        ) -> crate::Action {
71                            let P2pRpcRequest::Snark(job_id) = request else {
72                                unreachable!()
73                            };
74                            SnarkPoolCandidateAction::WorkFetchPending {
75                                job_id,
76                                peer_id,
77                                rpc_id,
78                            }
79                        }
80                    )),
81                });
82            }
83            SnarkPoolCandidateAction::WorkFetchPending {
84                peer_id,
85                job_id,
86                rpc_id,
87            } => {
88                state.work_fetch_pending(meta.time(), peer_id, job_id, *rpc_id);
89            }
90            SnarkPoolCandidateAction::WorkFetchError { peer_id, job_id } => {
91                state.peer_work_remove(*peer_id, job_id);
92            }
93            SnarkPoolCandidateAction::WorkFetchSuccess { peer_id, work } => {
94                state.work_received(meta.time(), *peer_id, work.clone());
95            }
96            SnarkPoolCandidateAction::WorkVerifyNext => {
97                let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
98
99                let job_id_orders = global_state
100                    .snark_pool
101                    .range(..)
102                    .map(|(_, v)| (v.order, &v.id))
103                    .collect::<BTreeMap<_, _>>();
104                let job_ids_ordered_iter = job_id_orders.into_values();
105                let batch = global_state
106                    .snark_pool
107                    .candidates
108                    .get_batch_to_verify(job_ids_ordered_iter);
109                let Some((peer_id, batch)) = batch else {
110                    return;
111                };
112
113                let req_id = global_state.snark.work_verify.next_req_id();
114                let job_ids = batch.iter().map(|v| v.job_id()).collect::<Vec<_>>();
115                let sender = peer_id.to_string();
116                dispatcher.push(SnarkWorkVerifyAction::Init {
117                    req_id,
118                    batch,
119                    sender,
120                    on_success: redux::callback!(
121                        on_snark_pool_candidate_work_verify_success((req_id: SnarkWorkVerifyId, sender: String, batch: Vec<Snark>)) -> crate::Action {
122                            SnarkPoolCandidateAction::WorkVerifySuccess {
123                                peer_id: sender.parse().unwrap(),
124                                verify_id: req_id,
125                                batch
126                            }
127                        }),
128                    on_error: redux::callback!(
129                        on_snark_pool_candidate_work_verify_error((req_id: SnarkWorkVerifyId, sender: String, batch: Vec<SnarkJobId>)) -> crate::Action {
130                            SnarkPoolCandidateAction::WorkVerifyError {
131                                peer_id: sender.parse().unwrap(),
132                                verify_id: req_id,
133                                batch
134                            }
135                        }),
136                });
137                dispatcher.push(SnarkPoolCandidateAction::WorkVerifyPending {
138                    peer_id,
139                    job_ids,
140                    verify_id: req_id,
141                });
142            }
143            SnarkPoolCandidateAction::WorkVerifyPending {
144                peer_id,
145                job_ids,
146                verify_id,
147            } => {
148                state.verify_pending(meta.time(), peer_id, *verify_id, job_ids);
149            }
150            SnarkPoolCandidateAction::WorkVerifyError {
151                peer_id,
152                verify_id,
153                batch,
154            } => {
155                state.verify_result(meta.time(), peer_id, *verify_id, Err(()));
156
157                // TODO(binier): blacklist peer
158                let dispatcher = state_context.into_dispatcher();
159                let peer_id = *peer_id;
160                dispatcher.push(P2pDisconnectionAction::Init {
161                    peer_id,
162                    reason: P2pDisconnectionReason::SnarkPoolVerifyError,
163                });
164
165                // TODO: This is not correct. We are rejecting all snark messages, but the fact that the batch
166                // failed to verify means that there is at least one invalid snark in the batch, not that all of them
167                // are invalid.
168                // Instead, what should happen here is that we split the batch in two and try to verify the two batches
169                // again. Repeating until batches don't fail to verify anymore, or each batch is of size 1.
170                // It may also be worth capping the batch sizes to 10.
171                for snark_job_id in batch {
172                    dispatcher.push(P2pNetworkPubsubAction::RejectMessage {
173                        message_id: Some(BroadcastMessageId::Snark {
174                            job_id: snark_job_id.clone(),
175                        }),
176                        peer_id: None,
177                        reason: "Snark work verification failed".to_string(),
178                    });
179                }
180            }
181            SnarkPoolCandidateAction::WorkVerifySuccess {
182                peer_id,
183                verify_id,
184                batch,
185            } => {
186                state.verify_result(meta.time(), peer_id, *verify_id, Ok(()));
187
188                // Dispatch
189                let dispatcher = state_context.into_dispatcher();
190
191                for snark in batch {
192                    dispatcher.push(SnarkPoolAction::WorkAdd {
193                        snark: snark.clone(),
194                        sender: *peer_id,
195                        is_sender_local: false,
196                    });
197                }
198            }
199            SnarkPoolCandidateAction::PeerPrune { peer_id } => {
200                state.peer_remove(*peer_id);
201            }
202        }
203    }
204}