node/ledger/read/
ledger_read_reducer.rs

1use mina_p2p_messages::v2;
2use openmina_core::{bug_condition, requests::RequestId};
3use p2p::{
4    channels::{
5        rpc::{P2pChannelsRpcAction, P2pRpcId, P2pRpcRequest, P2pRpcResponse},
6        streaming_rpc::{P2pChannelsStreamingRpcAction, P2pStreamingRpcRequest},
7    },
8    P2pAction, PeerId,
9};
10use redux::Dispatcher;
11
12use crate::{
13    block_producer::vrf_evaluator::BlockProducerVrfEvaluatorAction,
14    ledger_effectful::LedgerEffectfulAction, Action, RpcAction, State, Substate,
15};
16
17use super::{
18    LedgerAddress, LedgerReadAction, LedgerReadActionWithMetaRef, LedgerReadIdType,
19    LedgerReadInitCallback, LedgerReadRequest, LedgerReadResponse,
20    LedgerReadStagedLedgerAuxAndPendingCoinbases, LedgerReadState,
21};
22
23impl LedgerReadState {
24    pub fn reducer(mut state_context: Substate<Self>, action: LedgerReadActionWithMetaRef<'_>) {
25        let (action, meta) = action.split();
26        let Ok(state) = state_context.get_substate_mut() else {
27            return;
28        };
29
30        match action {
31            LedgerReadAction::FindTodos => {
32                let (dispatcher, state) = state_context.into_dispatcher_and_state();
33                Self::next_read_requests_init(dispatcher, state);
34            }
35            LedgerReadAction::Init { request, callback } => {
36                let (dispatcher, state) = state_context.into_dispatcher_and_state();
37                if state.ledger.read.has_same_request(request) {
38                    return;
39                }
40
41                let id = state.ledger.read.next_req_id();
42                dispatcher.push(LedgerEffectfulAction::ReadInit {
43                    request: request.clone(),
44                    callback: callback.clone(),
45                    id,
46                });
47            }
48            LedgerReadAction::Pending { request, .. } => {
49                state.add(meta.time(), request.clone());
50            }
51            LedgerReadAction::Success { id, response } => {
52                state.add_response(*id, meta.time(), response.clone());
53
54                let (dispatcher, state) = state_context.into_dispatcher_and_state();
55                Self::propagate_read_response(dispatcher, state, *id, response.clone());
56                dispatcher.push(LedgerReadAction::Prune { id: *id });
57            }
58            LedgerReadAction::Prune { id } => {
59                state.remove(*id);
60            }
61        }
62    }
63
64    fn propagate_read_response(
65        dispatcher: &mut Dispatcher<Action, State>,
66        state: &State,
67        id: RequestId<LedgerReadIdType>,
68        response: LedgerReadResponse,
69    ) {
70        let Some(request) = state.ledger.read.get(id) else {
71            bug_condition!("Request with id: {} not found", id);
72            return;
73        };
74
75        match (request.request(), response) {
76            (
77                LedgerReadRequest::DelegatorTable(ledger_hash, pub_key),
78                LedgerReadResponse::DelegatorTable(table),
79            ) => {
80                let expected = state.block_producer.vrf_delegator_table_inputs();
81                if !expected.is_some_and(|(expected_hash, producer)| {
82                    ledger_hash == expected_hash && pub_key == producer
83                }) {
84                    bug_condition!("delegator table unexpected");
85                    return;
86                }
87                match table {
88                    None => {
89                        // TODO(tizoc): Revise this, may be better to dispatch a different action here
90                        // and avoid running the VRF evaluator altogether when we know that the
91                        // table is empty.
92                        dispatcher.push(
93                            BlockProducerVrfEvaluatorAction::FinalizeDelegatorTableConstruction {
94                                delegator_table: Default::default(),
95                            },
96                        );
97                    }
98                    Some(table) => {
99                        dispatcher.push(
100                            BlockProducerVrfEvaluatorAction::FinalizeDelegatorTableConstruction {
101                                delegator_table: table.into(),
102                            },
103                        );
104                    }
105                }
106            }
107            (_, LedgerReadResponse::DelegatorTable(..)) => unreachable!(),
108            (req, LedgerReadResponse::GetNumAccounts(resp)) => {
109                for (peer_id, id, _) in find_peers_with_ledger_rpc(state, req) {
110                    dispatcher.push(P2pChannelsRpcAction::ResponseSend {
111                        peer_id,
112                        id,
113                        response: resp.as_ref().map(|(num_accounts, hash)| {
114                            Box::new(P2pRpcResponse::LedgerQuery(
115                                v2::MinaLedgerSyncLedgerAnswerStableV2::NumAccounts(
116                                    (*num_accounts).into(),
117                                    hash.clone(),
118                                ),
119                            ))
120                        }),
121                    });
122                }
123            }
124            (req, LedgerReadResponse::GetChildHashesAtAddr(resp)) => {
125                for (peer_id, id, _) in find_peers_with_ledger_rpc(state, req) {
126                    dispatcher.push(P2pChannelsRpcAction::ResponseSend {
127                        peer_id,
128                        id,
129                        response: resp.as_ref().map(|(left, right)| {
130                            Box::new(P2pRpcResponse::LedgerQuery(
131                                v2::MinaLedgerSyncLedgerAnswerStableV2::ChildHashesAre(
132                                    left.clone(),
133                                    right.clone(),
134                                ),
135                            ))
136                        }),
137                    });
138                }
139            }
140            (req, LedgerReadResponse::GetChildAccountsAtAddr(resp)) => {
141                for (peer_id, id, _) in find_peers_with_ledger_rpc(state, req) {
142                    dispatcher.push(P2pChannelsRpcAction::ResponseSend {
143                        peer_id,
144                        id,
145                        response: resp.as_ref().map(|accounts| {
146                            Box::new(P2pRpcResponse::LedgerQuery(
147                                v2::MinaLedgerSyncLedgerAnswerStableV2::ContentsAre(
148                                    accounts.iter().cloned().collect(),
149                                ),
150                            ))
151                        }),
152                    });
153                }
154            }
155            (req, LedgerReadResponse::GetStagedLedgerAuxAndPendingCoinbases(resp)) => {
156                for (peer_id, id, is_streaming) in find_peers_with_ledger_rpc(state, req) {
157                    if is_streaming {
158                        dispatcher.push(P2pChannelsStreamingRpcAction::ResponseSendInit {
159                            peer_id,
160                            id,
161                            response: resp.clone().map(Into::into),
162                        });
163                    } else {
164                        dispatcher.push(P2pChannelsRpcAction::ResponseSend {
165                            peer_id,
166                            id,
167                            response: resp.clone().map(|data| {
168                                Box::new(P2pRpcResponse::StagedLedgerAuxAndPendingCoinbasesAtBlock(
169                                    data,
170                                ))
171                            }),
172                        });
173                    }
174                }
175            }
176            (
177                LedgerReadRequest::ScanStateSummary(ledger_hash),
178                LedgerReadResponse::ScanStateSummary(scan_state),
179            ) => {
180                for rpc_id in state
181                    .rpc
182                    .scan_state_summary_rpc_ids()
183                    .filter(|(_, hash, _)| *hash == ledger_hash)
184                    .map(|(id, ..)| id)
185                    .collect::<Vec<_>>()
186                {
187                    dispatcher.push(RpcAction::ScanStateSummaryGetSuccess {
188                        rpc_id,
189                        scan_state: scan_state.clone(),
190                    });
191                }
192            }
193            (_, LedgerReadResponse::ScanStateSummary(..)) => unreachable!(),
194            (_req, LedgerReadResponse::GetAccounts(..)) => todo!(),
195            (_, LedgerReadResponse::AccountsForRpc(rpc_id, accounts, account_query)) => {
196                dispatcher.push(RpcAction::LedgerAccountsGetSuccess {
197                    rpc_id,
198                    accounts,
199                    account_query,
200                });
201            }
202            (_, LedgerReadResponse::GetLedgerStatus(rpc_id, resp)) => {
203                dispatcher.push(RpcAction::LedgerStatusGetSuccess {
204                    rpc_id,
205                    response: resp.clone(),
206                });
207            }
208            (_, LedgerReadResponse::GetAccountDelegators(rpc_id, resp)) => {
209                dispatcher.push(RpcAction::LedgerAccountDelegatorsGetSuccess {
210                    rpc_id,
211                    response: resp.clone(),
212                });
213            }
214        }
215    }
216
217    fn next_read_requests_init(dispatcher: &mut Dispatcher<Action, State>, state: &State) {
218        // fetching delegator table, this is required because delegator table construction requires reading from ledger.
219        // It could be that ledger read quota was reached when vrf tried to initiate that read, so we need to "retry" it if that's the case
220        dispatcher.push(BlockProducerVrfEvaluatorAction::BeginDelegatorTableConstruction);
221
222        // p2p rpcs
223        let mut peers = state
224            .p2p
225            .ready_peers_iter()
226            .filter(|(_, peer)| {
227                peer.channels
228                    .rpc
229                    .remote_todo_requests_iter()
230                    .next()
231                    .is_some()
232                    || peer.channels.streaming_rpc.remote_todo_request().is_some()
233            })
234            .map(|(peer_id, peer)| (*peer_id, peer.channels.rpc_remote_last_responded()))
235            .collect::<Vec<_>>();
236        peers.sort_by_key(|(_, last_responded)| *last_responded);
237        for (peer_id, _) in peers {
238            let Some((id, request, is_streaming)) = None.or_else(|| {
239                let peer = state.p2p.ready()?.get_ready_peer(&peer_id)?;
240                let mut reqs = peer.channels.rpc.remote_todo_requests_iter();
241                reqs.find_map(|req| {
242                    let ledger_request = match &req.request {
243                        P2pRpcRequest::LedgerQuery(hash, query) => match query {
244                            v2::MinaLedgerSyncLedgerQueryStableV1::NumAccounts => {
245                                LedgerReadRequest::GetNumAccounts(hash.clone())
246                            }
247                            v2::MinaLedgerSyncLedgerQueryStableV1::WhatChildHashes(addr) => {
248                                LedgerReadRequest::GetChildHashesAtAddr(hash.clone(), addr.into())
249                            }
250                            v2::MinaLedgerSyncLedgerQueryStableV1::WhatContents(addr) => {
251                                LedgerReadRequest::GetChildAccountsAtAddr(hash.clone(), addr.into())
252                            }
253                        },
254                        P2pRpcRequest::StagedLedgerAuxAndPendingCoinbasesAtBlock(block_hash) => {
255                            build_staged_ledger_parts_request(state, block_hash)?
256                        }
257                        _ => return None,
258                    };
259
260                    Some((req.id, ledger_request, false))
261                })
262                .or_else(|| {
263                    let (id, req) = peer.channels.streaming_rpc.remote_todo_request()?;
264                    let ledger_request = match req {
265                        P2pStreamingRpcRequest::StagedLedgerParts(block_hash) => {
266                            build_staged_ledger_parts_request(state, block_hash)?
267                        }
268                    };
269                    Some((id, ledger_request, true))
270                })
271            }) else {
272                continue;
273            };
274
275            dispatcher.push(LedgerReadAction::Init {
276            request,
277            callback: LedgerReadInitCallback::P2pChannelsResponsePending
278         {      callback: redux::callback!(on_ledger_read_init_p2p_channels_response_pending((is_streaming: bool, id: P2pRpcId, peer_id: PeerId)) -> crate::Action{
279                    if is_streaming {
280                        P2pAction::from(P2pChannelsStreamingRpcAction::ResponsePending {
281                            peer_id,
282                            id,
283                        })
284                    } else {
285                        P2pAction::from(P2pChannelsRpcAction::ResponsePending {
286                            peer_id,
287                            id,
288                        })
289                    }
290                }),
291                args:(is_streaming, id, peer_id)
292        }
293        });
294
295            if !state.ledger.read.is_total_cost_under_limit() {
296                return;
297            }
298        }
299
300        // rpcs
301        let rpcs = state
302            .rpc
303            .scan_state_summary_rpc_ids()
304            .filter(|(.., status)| status.is_init())
305            .map(|(id, ..)| id)
306            .collect::<Vec<_>>();
307
308        for rpc_id in rpcs {
309            dispatcher.push(RpcAction::ScanStateSummaryLedgerGetInit { rpc_id });
310            if !state.ledger.read.is_total_cost_under_limit() {
311                return;
312            }
313        }
314
315        let ledger_account_rpc = state
316            .rpc
317            .accounts_request_rpc_ids()
318            .filter(|(.., status)| status.is_init())
319            .map(|(id, req, _)| (id, req))
320            .collect::<Vec<_>>();
321
322        for (rpc_id, req) in ledger_account_rpc {
323            dispatcher.push(RpcAction::LedgerAccountsGetInit {
324                rpc_id,
325                account_query: req,
326            });
327            if !state.ledger.read.is_total_cost_under_limit() {
328                return;
329            }
330        }
331    }
332}
333
334fn find_peers_with_ledger_rpc(
335    state: &crate::State,
336    req: &LedgerReadRequest,
337) -> Vec<(PeerId, P2pRpcId, bool)> {
338    let Some(p2p) = state.p2p.ready() else {
339        return Vec::new();
340    };
341    p2p.ready_peers_iter()
342        .flat_map(|(peer_id, peer)| {
343            let rpcs = peer
344                .channels
345                .rpc
346                .remote_pending_requests_iter()
347                .map(move |req| (peer_id, req.id, &req.request))
348                .filter(|(_, _, peer_req)| match (req, peer_req) {
349                    (
350                        LedgerReadRequest::GetNumAccounts(h1),
351                        P2pRpcRequest::LedgerQuery(
352                            h2,
353                            v2::MinaLedgerSyncLedgerQueryStableV1::NumAccounts,
354                        ),
355                    ) => h1 == h2,
356                    (
357                        LedgerReadRequest::GetChildHashesAtAddr(h1, addr1),
358                        P2pRpcRequest::LedgerQuery(
359                            h2,
360                            v2::MinaLedgerSyncLedgerQueryStableV1::WhatChildHashes(addr2),
361                        ),
362                    ) => h1 == h2 && addr1 == &LedgerAddress::from(addr2),
363                    (
364                        LedgerReadRequest::GetChildAccountsAtAddr(h1, addr1),
365                        P2pRpcRequest::LedgerQuery(
366                            h2,
367                            v2::MinaLedgerSyncLedgerQueryStableV1::WhatContents(addr2),
368                        ),
369                    ) => h1 == h2 && addr1 == &LedgerAddress::from(addr2),
370                    (
371                        LedgerReadRequest::GetStagedLedgerAuxAndPendingCoinbases(data),
372                        P2pRpcRequest::StagedLedgerAuxAndPendingCoinbasesAtBlock(block_hash),
373                    ) => state
374                        .transition_frontier
375                        .get_state_body(block_hash)
376                        .is_some_and(|b| b.blockchain_state.staged_ledger_hash == data.ledger_hash),
377                    _ => false,
378                })
379                .map(|(peer_id, rpc_id, _)| (*peer_id, rpc_id, false));
380            let streaming_rpcs = peer
381                .channels
382                .streaming_rpc
383                .remote_pending_request()
384                .into_iter()
385                .filter(|(_, peer_req)| match (req, peer_req) {
386                    (
387                        LedgerReadRequest::GetStagedLedgerAuxAndPendingCoinbases(data),
388                        P2pStreamingRpcRequest::StagedLedgerParts(block_hash),
389                    ) => state
390                        .transition_frontier
391                        .get_state_body(block_hash)
392                        .is_some_and(|b| b.blockchain_state.staged_ledger_hash == data.ledger_hash),
393                    _ => false,
394                })
395                .map(|(rpc_id, _)| (*peer_id, rpc_id, true));
396            rpcs.chain(streaming_rpcs)
397        })
398        .collect()
399}
400
401fn build_staged_ledger_parts_request(
402    state: &crate::State,
403    block_hash: &v2::StateHash,
404) -> Option<LedgerReadRequest> {
405    let tf = &state.transition_frontier;
406    let ledger_hash = tf
407        .best_chain
408        .iter()
409        .find(|b| b.hash() == block_hash)
410        .map(|b| b.staged_ledger_hashes().clone())?;
411    let protocol_states = tf
412        .needed_protocol_states
413        .iter()
414        .map(|(hash, b)| (hash.clone(), b.clone()))
415        .chain(
416            tf.best_chain
417                .iter()
418                .take_while(|b| b.hash() != block_hash)
419                .map(|b| (b.hash().clone(), b.header().protocol_state.clone())),
420        )
421        .collect();
422
423    Some(LedgerReadRequest::GetStagedLedgerAuxAndPendingCoinbases(
424        LedgerReadStagedLedgerAuxAndPendingCoinbases {
425            ledger_hash,
426            protocol_states,
427        },
428    ))
429}