1use mina_p2p_messages::v2;
2use openmina_core::{bug_condition, requests::RequestId};
3use p2p::{
4 channels::{
5 rpc::{P2pChannelsRpcAction, P2pRpcId, P2pRpcRequest, P2pRpcResponse},
6 streaming_rpc::{P2pChannelsStreamingRpcAction, P2pStreamingRpcRequest},
7 },
8 P2pAction, PeerId,
9};
10use redux::Dispatcher;
11
12use crate::{
13 block_producer::vrf_evaluator::BlockProducerVrfEvaluatorAction,
14 ledger_effectful::LedgerEffectfulAction, Action, RpcAction, State, Substate,
15};
16
17use super::{
18 LedgerAddress, LedgerReadAction, LedgerReadActionWithMetaRef, LedgerReadIdType,
19 LedgerReadInitCallback, LedgerReadRequest, LedgerReadResponse,
20 LedgerReadStagedLedgerAuxAndPendingCoinbases, LedgerReadState,
21};
22
23impl LedgerReadState {
24 pub fn reducer(mut state_context: Substate<Self>, action: LedgerReadActionWithMetaRef<'_>) {
25 let (action, meta) = action.split();
26 let Ok(state) = state_context.get_substate_mut() else {
27 return;
28 };
29
30 match action {
31 LedgerReadAction::FindTodos => {
32 let (dispatcher, state) = state_context.into_dispatcher_and_state();
33 Self::next_read_requests_init(dispatcher, state);
34 }
35 LedgerReadAction::Init { request, callback } => {
36 let (dispatcher, state) = state_context.into_dispatcher_and_state();
37 if state.ledger.read.has_same_request(request) {
38 return;
39 }
40
41 let id = state.ledger.read.next_req_id();
42 dispatcher.push(LedgerEffectfulAction::ReadInit {
43 request: request.clone(),
44 callback: callback.clone(),
45 id,
46 });
47 }
48 LedgerReadAction::Pending { request, .. } => {
49 state.add(meta.time(), request.clone());
50 }
51 LedgerReadAction::Success { id, response } => {
52 state.add_response(*id, meta.time(), response.clone());
53
54 let (dispatcher, state) = state_context.into_dispatcher_and_state();
55 Self::propagate_read_response(dispatcher, state, *id, response.clone());
56 dispatcher.push(LedgerReadAction::Prune { id: *id });
57 }
58 LedgerReadAction::Prune { id } => {
59 state.remove(*id);
60 }
61 }
62 }
63
64 fn propagate_read_response(
65 dispatcher: &mut Dispatcher<Action, State>,
66 state: &State,
67 id: RequestId<LedgerReadIdType>,
68 response: LedgerReadResponse,
69 ) {
70 let Some(request) = state.ledger.read.get(id) else {
71 bug_condition!("Request with id: {} not found", id);
72 return;
73 };
74
75 match (request.request(), response) {
76 (
77 LedgerReadRequest::DelegatorTable(ledger_hash, pub_key),
78 LedgerReadResponse::DelegatorTable(table),
79 ) => {
80 let expected = state.block_producer.vrf_delegator_table_inputs();
81 if !expected.is_some_and(|(expected_hash, producer)| {
82 ledger_hash == expected_hash && pub_key == producer
83 }) {
84 bug_condition!("delegator table unexpected");
85 return;
86 }
87 match table {
88 None => {
89 dispatcher.push(
93 BlockProducerVrfEvaluatorAction::FinalizeDelegatorTableConstruction {
94 delegator_table: Default::default(),
95 },
96 );
97 }
98 Some(table) => {
99 dispatcher.push(
100 BlockProducerVrfEvaluatorAction::FinalizeDelegatorTableConstruction {
101 delegator_table: table.into(),
102 },
103 );
104 }
105 }
106 }
107 (_, LedgerReadResponse::DelegatorTable(..)) => unreachable!(),
108 (req, LedgerReadResponse::GetNumAccounts(resp)) => {
109 for (peer_id, id, _) in find_peers_with_ledger_rpc(state, req) {
110 dispatcher.push(P2pChannelsRpcAction::ResponseSend {
111 peer_id,
112 id,
113 response: resp.as_ref().map(|(num_accounts, hash)| {
114 Box::new(P2pRpcResponse::LedgerQuery(
115 v2::MinaLedgerSyncLedgerAnswerStableV2::NumAccounts(
116 (*num_accounts).into(),
117 hash.clone(),
118 ),
119 ))
120 }),
121 });
122 }
123 }
124 (req, LedgerReadResponse::GetChildHashesAtAddr(resp)) => {
125 for (peer_id, id, _) in find_peers_with_ledger_rpc(state, req) {
126 dispatcher.push(P2pChannelsRpcAction::ResponseSend {
127 peer_id,
128 id,
129 response: resp.as_ref().map(|(left, right)| {
130 Box::new(P2pRpcResponse::LedgerQuery(
131 v2::MinaLedgerSyncLedgerAnswerStableV2::ChildHashesAre(
132 left.clone(),
133 right.clone(),
134 ),
135 ))
136 }),
137 });
138 }
139 }
140 (req, LedgerReadResponse::GetChildAccountsAtAddr(resp)) => {
141 for (peer_id, id, _) in find_peers_with_ledger_rpc(state, req) {
142 dispatcher.push(P2pChannelsRpcAction::ResponseSend {
143 peer_id,
144 id,
145 response: resp.as_ref().map(|accounts| {
146 Box::new(P2pRpcResponse::LedgerQuery(
147 v2::MinaLedgerSyncLedgerAnswerStableV2::ContentsAre(
148 accounts.iter().cloned().collect(),
149 ),
150 ))
151 }),
152 });
153 }
154 }
155 (req, LedgerReadResponse::GetStagedLedgerAuxAndPendingCoinbases(resp)) => {
156 for (peer_id, id, is_streaming) in find_peers_with_ledger_rpc(state, req) {
157 if is_streaming {
158 dispatcher.push(P2pChannelsStreamingRpcAction::ResponseSendInit {
159 peer_id,
160 id,
161 response: resp.clone().map(Into::into),
162 });
163 } else {
164 dispatcher.push(P2pChannelsRpcAction::ResponseSend {
165 peer_id,
166 id,
167 response: resp.clone().map(|data| {
168 Box::new(P2pRpcResponse::StagedLedgerAuxAndPendingCoinbasesAtBlock(
169 data,
170 ))
171 }),
172 });
173 }
174 }
175 }
176 (
177 LedgerReadRequest::ScanStateSummary(ledger_hash),
178 LedgerReadResponse::ScanStateSummary(scan_state),
179 ) => {
180 for rpc_id in state
181 .rpc
182 .scan_state_summary_rpc_ids()
183 .filter(|(_, hash, _)| *hash == ledger_hash)
184 .map(|(id, ..)| id)
185 .collect::<Vec<_>>()
186 {
187 dispatcher.push(RpcAction::ScanStateSummaryGetSuccess {
188 rpc_id,
189 scan_state: scan_state.clone(),
190 });
191 }
192 }
193 (_, LedgerReadResponse::ScanStateSummary(..)) => unreachable!(),
194 (_req, LedgerReadResponse::GetAccounts(..)) => todo!(),
195 (_, LedgerReadResponse::AccountsForRpc(rpc_id, accounts, account_query)) => {
196 dispatcher.push(RpcAction::LedgerAccountsGetSuccess {
197 rpc_id,
198 accounts,
199 account_query,
200 });
201 }
202 (_, LedgerReadResponse::GetLedgerStatus(rpc_id, resp)) => {
203 dispatcher.push(RpcAction::LedgerStatusGetSuccess {
204 rpc_id,
205 response: resp.clone(),
206 });
207 }
208 (_, LedgerReadResponse::GetAccountDelegators(rpc_id, resp)) => {
209 dispatcher.push(RpcAction::LedgerAccountDelegatorsGetSuccess {
210 rpc_id,
211 response: resp.clone(),
212 });
213 }
214 }
215 }
216
217 fn next_read_requests_init(dispatcher: &mut Dispatcher<Action, State>, state: &State) {
218 dispatcher.push(BlockProducerVrfEvaluatorAction::BeginDelegatorTableConstruction);
221
222 let mut peers = state
224 .p2p
225 .ready_peers_iter()
226 .filter(|(_, peer)| {
227 peer.channels
228 .rpc
229 .remote_todo_requests_iter()
230 .next()
231 .is_some()
232 || peer.channels.streaming_rpc.remote_todo_request().is_some()
233 })
234 .map(|(peer_id, peer)| (*peer_id, peer.channels.rpc_remote_last_responded()))
235 .collect::<Vec<_>>();
236 peers.sort_by_key(|(_, last_responded)| *last_responded);
237 for (peer_id, _) in peers {
238 let Some((id, request, is_streaming)) = None.or_else(|| {
239 let peer = state.p2p.ready()?.get_ready_peer(&peer_id)?;
240 let mut reqs = peer.channels.rpc.remote_todo_requests_iter();
241 reqs.find_map(|req| {
242 let ledger_request = match &req.request {
243 P2pRpcRequest::LedgerQuery(hash, query) => match query {
244 v2::MinaLedgerSyncLedgerQueryStableV1::NumAccounts => {
245 LedgerReadRequest::GetNumAccounts(hash.clone())
246 }
247 v2::MinaLedgerSyncLedgerQueryStableV1::WhatChildHashes(addr) => {
248 LedgerReadRequest::GetChildHashesAtAddr(hash.clone(), addr.into())
249 }
250 v2::MinaLedgerSyncLedgerQueryStableV1::WhatContents(addr) => {
251 LedgerReadRequest::GetChildAccountsAtAddr(hash.clone(), addr.into())
252 }
253 },
254 P2pRpcRequest::StagedLedgerAuxAndPendingCoinbasesAtBlock(block_hash) => {
255 build_staged_ledger_parts_request(state, block_hash)?
256 }
257 _ => return None,
258 };
259
260 Some((req.id, ledger_request, false))
261 })
262 .or_else(|| {
263 let (id, req) = peer.channels.streaming_rpc.remote_todo_request()?;
264 let ledger_request = match req {
265 P2pStreamingRpcRequest::StagedLedgerParts(block_hash) => {
266 build_staged_ledger_parts_request(state, block_hash)?
267 }
268 };
269 Some((id, ledger_request, true))
270 })
271 }) else {
272 continue;
273 };
274
275 dispatcher.push(LedgerReadAction::Init {
276 request,
277 callback: LedgerReadInitCallback::P2pChannelsResponsePending
278 { callback: redux::callback!(on_ledger_read_init_p2p_channels_response_pending((is_streaming: bool, id: P2pRpcId, peer_id: PeerId)) -> crate::Action{
279 if is_streaming {
280 P2pAction::from(P2pChannelsStreamingRpcAction::ResponsePending {
281 peer_id,
282 id,
283 })
284 } else {
285 P2pAction::from(P2pChannelsRpcAction::ResponsePending {
286 peer_id,
287 id,
288 })
289 }
290 }),
291 args:(is_streaming, id, peer_id)
292 }
293 });
294
295 if !state.ledger.read.is_total_cost_under_limit() {
296 return;
297 }
298 }
299
300 let rpcs = state
302 .rpc
303 .scan_state_summary_rpc_ids()
304 .filter(|(.., status)| status.is_init())
305 .map(|(id, ..)| id)
306 .collect::<Vec<_>>();
307
308 for rpc_id in rpcs {
309 dispatcher.push(RpcAction::ScanStateSummaryLedgerGetInit { rpc_id });
310 if !state.ledger.read.is_total_cost_under_limit() {
311 return;
312 }
313 }
314
315 let ledger_account_rpc = state
316 .rpc
317 .accounts_request_rpc_ids()
318 .filter(|(.., status)| status.is_init())
319 .map(|(id, req, _)| (id, req))
320 .collect::<Vec<_>>();
321
322 for (rpc_id, req) in ledger_account_rpc {
323 dispatcher.push(RpcAction::LedgerAccountsGetInit {
324 rpc_id,
325 account_query: req,
326 });
327 if !state.ledger.read.is_total_cost_under_limit() {
328 return;
329 }
330 }
331 }
332}
333
334fn find_peers_with_ledger_rpc(
335 state: &crate::State,
336 req: &LedgerReadRequest,
337) -> Vec<(PeerId, P2pRpcId, bool)> {
338 let Some(p2p) = state.p2p.ready() else {
339 return Vec::new();
340 };
341 p2p.ready_peers_iter()
342 .flat_map(|(peer_id, peer)| {
343 let rpcs = peer
344 .channels
345 .rpc
346 .remote_pending_requests_iter()
347 .map(move |req| (peer_id, req.id, &req.request))
348 .filter(|(_, _, peer_req)| match (req, peer_req) {
349 (
350 LedgerReadRequest::GetNumAccounts(h1),
351 P2pRpcRequest::LedgerQuery(
352 h2,
353 v2::MinaLedgerSyncLedgerQueryStableV1::NumAccounts,
354 ),
355 ) => h1 == h2,
356 (
357 LedgerReadRequest::GetChildHashesAtAddr(h1, addr1),
358 P2pRpcRequest::LedgerQuery(
359 h2,
360 v2::MinaLedgerSyncLedgerQueryStableV1::WhatChildHashes(addr2),
361 ),
362 ) => h1 == h2 && addr1 == &LedgerAddress::from(addr2),
363 (
364 LedgerReadRequest::GetChildAccountsAtAddr(h1, addr1),
365 P2pRpcRequest::LedgerQuery(
366 h2,
367 v2::MinaLedgerSyncLedgerQueryStableV1::WhatContents(addr2),
368 ),
369 ) => h1 == h2 && addr1 == &LedgerAddress::from(addr2),
370 (
371 LedgerReadRequest::GetStagedLedgerAuxAndPendingCoinbases(data),
372 P2pRpcRequest::StagedLedgerAuxAndPendingCoinbasesAtBlock(block_hash),
373 ) => state
374 .transition_frontier
375 .get_state_body(block_hash)
376 .is_some_and(|b| b.blockchain_state.staged_ledger_hash == data.ledger_hash),
377 _ => false,
378 })
379 .map(|(peer_id, rpc_id, _)| (*peer_id, rpc_id, false));
380 let streaming_rpcs = peer
381 .channels
382 .streaming_rpc
383 .remote_pending_request()
384 .into_iter()
385 .filter(|(_, peer_req)| match (req, peer_req) {
386 (
387 LedgerReadRequest::GetStagedLedgerAuxAndPendingCoinbases(data),
388 P2pStreamingRpcRequest::StagedLedgerParts(block_hash),
389 ) => state
390 .transition_frontier
391 .get_state_body(block_hash)
392 .is_some_and(|b| b.blockchain_state.staged_ledger_hash == data.ledger_hash),
393 _ => false,
394 })
395 .map(|(rpc_id, _)| (*peer_id, rpc_id, true));
396 rpcs.chain(streaming_rpcs)
397 })
398 .collect()
399}
400
401fn build_staged_ledger_parts_request(
402 state: &crate::State,
403 block_hash: &v2::StateHash,
404) -> Option<LedgerReadRequest> {
405 let tf = &state.transition_frontier;
406 let ledger_hash = tf
407 .best_chain
408 .iter()
409 .find(|b| b.hash() == block_hash)
410 .map(|b| b.staged_ledger_hashes().clone())?;
411 let protocol_states = tf
412 .needed_protocol_states
413 .iter()
414 .map(|(hash, b)| (hash.clone(), b.clone()))
415 .chain(
416 tf.best_chain
417 .iter()
418 .take_while(|b| b.hash() != block_hash)
419 .map(|b| (b.hash().clone(), b.header().protocol_state.clone())),
420 )
421 .collect();
422
423 Some(LedgerReadRequest::GetStagedLedgerAuxAndPendingCoinbases(
424 LedgerReadStagedLedgerAuxAndPendingCoinbases {
425 ledger_hash,
426 protocol_states,
427 },
428 ))
429}