node/snark_pool/candidate/
snark_pool_candidate_reducer.rs

1use std::collections::BTreeMap;
2
3use crate::{p2p_ready, SnarkPoolAction};
4use openmina_core::snark::{Snark, SnarkJobId};
5use p2p::{
6    channels::rpc::{P2pChannelsRpcAction, P2pRpcId, P2pRpcRequest},
7    disconnection::{P2pDisconnectionAction, P2pDisconnectionReason},
8    BroadcastMessageId, P2pNetworkPubsubAction, PeerId,
9};
10use snark::{work_verify::SnarkWorkVerifyAction, work_verify_effectful::SnarkWorkVerifyId};
11
12use super::{
13    SnarkPoolCandidateAction, SnarkPoolCandidateActionWithMetaRef, SnarkPoolCandidatesState,
14};
15
16impl SnarkPoolCandidatesState {
17    pub fn reducer(
18        mut state_context: crate::Substate<Self>,
19        action: SnarkPoolCandidateActionWithMetaRef<'_>,
20    ) {
21        let Ok(state) = state_context.get_substate_mut() else {
22            // TODO: log or propagate
23            return;
24        };
25        let (action, meta) = action.split();
26
27        match action {
28            SnarkPoolCandidateAction::InfoReceived { peer_id, info } => {
29                state.info_received(meta.time(), *peer_id, info.clone());
30            }
31            SnarkPoolCandidateAction::WorkFetchAll => {
32                let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
33                let p2p = p2p_ready!(global_state.p2p, meta.time());
34                let peers = p2p.ready_peers_iter().map(|(id, _)| *id);
35                let get_order = |job_id: &_| {
36                    global_state
37                        .snark_pool
38                        .get(job_id)
39                        .map(|job| job.order)
40                        .unwrap_or(usize::MAX)
41                };
42                let list = global_state
43                    .snark_pool
44                    .candidates
45                    .peers_next_work_to_fetch(peers, get_order);
46
47                for (peer_id, job_id) in list {
48                    dispatcher.push(SnarkPoolCandidateAction::WorkFetchInit { peer_id, job_id });
49                }
50            }
51            SnarkPoolCandidateAction::WorkFetchInit { peer_id, job_id } => {
52                let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
53                let peer_id = *peer_id;
54                let job_id = job_id.clone();
55                let p2p = p2p_ready!(global_state.p2p, meta.time());
56                let Some(peer) = p2p.get_ready_peer(&peer_id) else {
57                    return;
58                };
59                let rpc_id = peer.channels.next_local_rpc_id();
60
61                dispatcher.push(P2pChannelsRpcAction::RequestSend {
62                    peer_id,
63                    id: rpc_id,
64                    request: Box::new(P2pRpcRequest::Snark(job_id.clone())),
65                    on_init: Some(redux::callback!(
66                        on_send_p2p_snark_rpc_request(
67                            (peer_id: PeerId, rpc_id: P2pRpcId, request: P2pRpcRequest)
68                        ) -> crate::Action {
69                            let P2pRpcRequest::Snark(job_id) = request else {
70                                unreachable!()
71                            };
72                            SnarkPoolCandidateAction::WorkFetchPending {
73                                job_id,
74                                peer_id,
75                                rpc_id,
76                            }
77                        }
78                    )),
79                });
80            }
81            SnarkPoolCandidateAction::WorkFetchPending {
82                peer_id,
83                job_id,
84                rpc_id,
85            } => {
86                state.work_fetch_pending(meta.time(), peer_id, job_id, *rpc_id);
87            }
88            SnarkPoolCandidateAction::WorkFetchError { peer_id, job_id } => {
89                state.peer_work_remove(*peer_id, job_id);
90            }
91            SnarkPoolCandidateAction::WorkFetchSuccess { peer_id, work } => {
92                state.work_received(meta.time(), *peer_id, work.clone());
93            }
94            SnarkPoolCandidateAction::WorkVerifyNext => {
95                let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
96
97                let job_id_orders = global_state
98                    .snark_pool
99                    .range(..)
100                    .map(|(_, v)| (v.order, &v.id))
101                    .collect::<BTreeMap<_, _>>();
102                let job_ids_ordered_iter = job_id_orders.into_values();
103                let batch = global_state
104                    .snark_pool
105                    .candidates
106                    .get_batch_to_verify(job_ids_ordered_iter);
107                let Some((peer_id, batch)) = batch else {
108                    return;
109                };
110
111                let req_id = global_state.snark.work_verify.next_req_id();
112                let job_ids = batch.iter().map(|v| v.job_id()).collect::<Vec<_>>();
113                let sender = peer_id.to_string();
114                dispatcher.push(SnarkWorkVerifyAction::Init {
115                    req_id,
116                    batch,
117                    sender,
118                    on_success: redux::callback!(
119                        on_snark_pool_candidate_work_verify_success((req_id: SnarkWorkVerifyId, sender: String, batch: Vec<Snark>)) -> crate::Action {
120                            SnarkPoolCandidateAction::WorkVerifySuccess {
121                                peer_id: sender.parse().unwrap(),
122                                verify_id: req_id,
123                                batch
124                            }
125                        }),
126                    on_error: redux::callback!(
127                        on_snark_pool_candidate_work_verify_error((req_id: SnarkWorkVerifyId, sender: String, batch: Vec<SnarkJobId>)) -> crate::Action {
128                            SnarkPoolCandidateAction::WorkVerifyError {
129                                peer_id: sender.parse().unwrap(),
130                                verify_id: req_id,
131                                batch
132                            }
133                        }),
134                });
135                dispatcher.push(SnarkPoolCandidateAction::WorkVerifyPending {
136                    peer_id,
137                    job_ids,
138                    verify_id: req_id,
139                });
140            }
141            SnarkPoolCandidateAction::WorkVerifyPending {
142                peer_id,
143                job_ids,
144                verify_id,
145            } => {
146                state.verify_pending(meta.time(), peer_id, *verify_id, job_ids);
147            }
148            SnarkPoolCandidateAction::WorkVerifyError {
149                peer_id,
150                verify_id,
151                batch,
152            } => {
153                state.verify_result(meta.time(), peer_id, *verify_id, Err(()));
154
155                // TODO(binier): blacklist peer
156                let dispatcher = state_context.into_dispatcher();
157                let peer_id = *peer_id;
158                dispatcher.push(P2pDisconnectionAction::Init {
159                    peer_id,
160                    reason: P2pDisconnectionReason::SnarkPoolVerifyError,
161                });
162
163                // TODO: This is not correct. We are rejecting all snark messages, but the fact that the batch
164                // failed to verify means that there is at least one invalid snark in the batch, not that all of them
165                // are invalid.
166                // Instead, what should happen here is that we split the batch in two and try to verify the two batches
167                // again. Repeating until batches don't fail to verify anymore, or each batch is of size 1.
168                // It may also be worth capping the batch sizes to 10.
169                for snark_job_id in batch {
170                    dispatcher.push(P2pNetworkPubsubAction::RejectMessage {
171                        message_id: Some(BroadcastMessageId::Snark {
172                            job_id: snark_job_id.clone(),
173                        }),
174                        peer_id: None,
175                        reason: "Snark work verification failed".to_string(),
176                    });
177                }
178            }
179            SnarkPoolCandidateAction::WorkVerifySuccess {
180                peer_id,
181                verify_id,
182                batch,
183            } => {
184                state.verify_result(meta.time(), peer_id, *verify_id, Ok(()));
185
186                // Dispatch
187                let dispatcher = state_context.into_dispatcher();
188
189                for snark in batch {
190                    dispatcher.push(SnarkPoolAction::WorkAdd {
191                        snark: snark.clone(),
192                        sender: *peer_id,
193                        is_sender_local: false,
194                    });
195                }
196            }
197            SnarkPoolCandidateAction::PeerPrune { peer_id } => {
198                state.peer_remove(*peer_id);
199            }
200        }
201    }
202}