mina_node/ledger/
ledger_manager.rs

1use super::{
2    read::{LedgerReadId, LedgerReadRequest, LedgerReadResponse, LedgerStatus},
3    write::{LedgerWriteRequest, LedgerWriteResponse},
4    LedgerCtx, LedgerService,
5};
6use crate::{
7    account::AccountPublicKey, ledger::LedgerAddress, rpc::AccountQuery,
8    transition_frontier::sync::ledger::snarked::TransitionFrontierSyncLedgerSnarkedService,
9};
10use ledger::{
11    staged_ledger::staged_ledger::{SkipVerification, StagedLedger},
12    Account, AccountId, Mask,
13};
14use mina_core::{channels::mpsc, thread};
15use mina_p2p_messages::v2::{self, LedgerHash, MinaBaseAccountBinableArgStableV2};
16use mina_signer::CompressedPubKey;
17use std::collections::BTreeMap;
18
19/// The type enumerating different requests that can be made to the
20/// service. Each specific constructor has a specific response
21/// constructor associated with it. Unfortunately, this relationship
22/// can't be expressed in the Rust type system at the moment. For this
23/// reason this type is private while functions wrapping the whole call
24/// to the service are exposed as the service's methods.
25#[expect(
26    clippy::large_enum_variant,
27    reason = "This is storing large messages, but size difference is only 2x between largest and second-largest variants"
28)]
29pub(super) enum LedgerRequest {
30    Write(LedgerWriteRequest),
31    Read(LedgerReadId, LedgerReadRequest),
32    /// expected response: `LedgerHash`
33    AccountsSet {
34        snarked_ledger_hash: LedgerHash,
35        parent: LedgerAddress,
36        accounts: Vec<MinaBaseAccountBinableArgStableV2>,
37    },
38    /// expected response: `Vec<Account>`
39    AccountsGet {
40        ledger_hash: LedgerHash,
41        account_ids: Vec<AccountId>,
42    },
43    /// expected response: `ChildHashes`
44    ChildHashesGet {
45        snarked_ledger_hash: LedgerHash,
46        parent: LedgerAddress,
47    },
48    /// expected response: `Success`
49    ComputeSnarkedLedgerHashes {
50        snarked_ledger_hash: LedgerHash,
51    },
52    /// expected response: `SnarkedLedgerContentsCopied`
53    CopySnarkedLedgerContentsForSync {
54        origin_snarked_ledger_hash: Vec<LedgerHash>,
55        target_snarked_ledger_hash: LedgerHash,
56        overwrite: bool,
57    },
58    /// expected response: `ProducersWithDelegatesMap`
59    GetProducersWithDelegates {
60        ledger_hash: LedgerHash,
61        filter: fn(&CompressedPubKey) -> bool,
62    },
63    /// expected response: `LedgerMask`
64    GetMask {
65        ledger_hash: LedgerHash,
66    },
67    InsertGenesisLedger {
68        mask: Mask,
69    },
70    StagedLedgerReconstructResult {
71        staged_ledger_hash: LedgerHash,
72        result: Result<StagedLedger, String>,
73    },
74}
75
76#[derive(Debug)]
77pub enum LedgerResponse {
78    Write(LedgerWriteResponse),
79    Read(LedgerReadId, LedgerReadResponse),
80    ChildHashes(Option<(LedgerHash, LedgerHash)>),
81    AccountsSet(Result<LedgerHash, String>),
82    AccountsGet(Result<Vec<Account>, String>),
83    LedgerMask(Option<(Mask, bool)>),
84    #[allow(clippy::type_complexity)]
85    ProducersWithDelegatesMap(
86        Option<BTreeMap<AccountPublicKey, Vec<(ledger::AccountIndex, AccountPublicKey, u64)>>>,
87    ),
88    SnarkedLedgerContentsCopied(Result<bool, String>),
89    Success, // operation was performed and result stored; nothing to return.
90}
91
92impl LedgerRequest {
93    fn handle(
94        self,
95        ledger_ctx: &mut LedgerCtx,
96        caller: &LedgerCaller,
97        force_sync: bool,
98    ) -> LedgerResponse {
99        match self {
100            Self::Write(request) => LedgerResponse::Write(match request {
101                LedgerWriteRequest::StagedLedgerReconstruct {
102                    snarked_ledger_hash,
103                    parts,
104                } => {
105                    if !force_sync {
106                        let caller = caller.clone();
107                        let cb = move |staged_ledger_hash, result| {
108                            caller.call(LedgerRequest::StagedLedgerReconstructResult {
109                                staged_ledger_hash,
110                                result,
111                            })
112                        };
113                        if let Err(e) =
114                            ledger_ctx.staged_ledger_reconstruct(snarked_ledger_hash, parts, cb)
115                        {
116                            mina_core::log::inner::error!(
117                                "Failed to reconstruct staged ledger: {:?}",
118                                e
119                            );
120                            // TODO: Handle the error in the state machine
121                        }
122                        return LedgerResponse::Success;
123                    } else {
124                        let (staged_ledger_hash, result) = match ledger_ctx
125                            .staged_ledger_reconstruct_sync(snarked_ledger_hash, parts)
126                        {
127                            Ok(result) => result,
128                            Err(e) => (v2::LedgerHash::zero(), Err(String::from(e))),
129                        };
130
131                        LedgerWriteResponse::StagedLedgerReconstruct {
132                            staged_ledger_hash,
133                            result,
134                        }
135                    }
136                }
137                LedgerWriteRequest::StagedLedgerDiffCreate {
138                    pred_block,
139                    global_slot_since_genesis: global_slot,
140                    is_new_epoch,
141                    producer,
142                    delegator,
143                    coinbase_receiver,
144                    completed_snarks,
145                    supercharge_coinbase,
146                    transactions_by_fee,
147                } => {
148                    let pred_block_hash = pred_block.hash().clone();
149                    let global_slot_since_genesis = global_slot.clone();
150                    let result = ledger_ctx.staged_ledger_diff_create(
151                        pred_block,
152                        global_slot,
153                        is_new_epoch,
154                        producer,
155                        delegator,
156                        coinbase_receiver,
157                        completed_snarks,
158                        supercharge_coinbase,
159                        transactions_by_fee,
160                    );
161                    LedgerWriteResponse::StagedLedgerDiffCreate {
162                        pred_block_hash,
163                        global_slot_since_genesis,
164                        result: result.map(Into::into),
165                    }
166                }
167                LedgerWriteRequest::BlockApply {
168                    block,
169                    pred_block,
170                    skip_verification,
171                } => {
172                    let block_hash = block.hash().clone();
173                    let skip_verification = if skip_verification {
174                        Some(SkipVerification::All)
175                    } else {
176                        None
177                    };
178                    let result = ledger_ctx.block_apply(block, pred_block, skip_verification);
179                    LedgerWriteResponse::BlockApply { block_hash, result }
180                }
181                LedgerWriteRequest::Commit {
182                    ledgers_to_keep,
183                    root_snarked_ledger_updates,
184                    needed_protocol_states,
185                    new_root,
186                    new_best_tip,
187                } => {
188                    let best_tip_hash = new_best_tip.hash().clone();
189                    let result = ledger_ctx.commit(
190                        ledgers_to_keep,
191                        root_snarked_ledger_updates,
192                        needed_protocol_states,
193                        &new_root,
194                        &new_best_tip,
195                    );
196                    LedgerWriteResponse::Commit {
197                        best_tip_hash,
198                        result,
199                    }
200                }
201            }),
202            Self::Read(id, request) => LedgerResponse::Read(
203                id,
204                match request {
205                    LedgerReadRequest::DelegatorTable(ledger_hash, producer) => {
206                        let res = ledger_ctx
207                            .producers_with_delegates(&ledger_hash, |pub_key| {
208                                AccountPublicKey::from(pub_key.clone()) == producer
209                            })
210                            .and_then(|list| list.into_iter().next())
211                            .map(|(_, table)| {
212                                table
213                                    .into_iter()
214                                    .map(|(index, pub_key, balance)| (index, (pub_key, balance)))
215                                    .collect()
216                            });
217
218                        LedgerReadResponse::DelegatorTable(res)
219                    }
220                    LedgerReadRequest::GetNumAccounts(ledger_hash) => {
221                        let res = ledger_ctx.get_num_accounts(ledger_hash);
222                        LedgerReadResponse::GetNumAccounts(res)
223                    }
224                    LedgerReadRequest::GetChildHashesAtAddr(ledger_hash, addr) => {
225                        let res = ledger_ctx.get_child_hashes(ledger_hash, addr);
226                        LedgerReadResponse::GetChildHashesAtAddr(res)
227                    }
228                    LedgerReadRequest::GetChildAccountsAtAddr(ledger_hash, addr) => {
229                        let res = ledger_ctx.get_child_accounts(ledger_hash, addr);
230                        LedgerReadResponse::GetChildAccountsAtAddr(res)
231                    }
232                    LedgerReadRequest::GetStagedLedgerAuxAndPendingCoinbases(data) => {
233                        let res = ledger_ctx.staged_ledger_aux_and_pending_coinbase(
234                            &data.ledger_hash,
235                            data.protocol_states,
236                        );
237                        LedgerReadResponse::GetStagedLedgerAuxAndPendingCoinbases(res)
238                    }
239                    LedgerReadRequest::ScanStateSummary(ledger_hash) => {
240                        let res = ledger_ctx.scan_state_summary(&ledger_hash);
241                        LedgerReadResponse::ScanStateSummary(res)
242                    }
243                    LedgerReadRequest::GetAccounts(ledger_hash, account_ids, rpc_id) => {
244                        let res = ledger_ctx.get_accounts(ledger_hash, account_ids);
245                        LedgerReadResponse::GetAccounts(res, rpc_id)
246                    }
247                    LedgerReadRequest::AccountsForRpc(rpc_id, ledger_hash, account_query) => {
248                        let res = match &account_query {
249                            AccountQuery::All => ledger_ctx.get_accounts_for_rpc(ledger_hash, None),
250                            AccountQuery::SinglePublicKey(public_key) => ledger_ctx
251                                .get_accounts_for_rpc(ledger_hash, Some(public_key.clone())),
252                            AccountQuery::PubKeyWithTokenId(public_key, token_id_key_hash) => {
253                                let id = AccountId {
254                                    public_key: public_key.clone().try_into().unwrap(),
255                                    token_id: token_id_key_hash.clone().into(),
256                                };
257                                ledger_ctx.get_accounts(ledger_hash, vec![id])
258                            }
259                            AccountQuery::MultipleIds(ids) => {
260                                ledger_ctx.get_accounts(ledger_hash, ids.clone())
261                            }
262                        };
263
264                        LedgerReadResponse::AccountsForRpc(rpc_id, res, account_query)
265                    }
266                    LedgerReadRequest::GetLedgerStatus(rpc_id, ledger_hash) => {
267                        let res = ledger_ctx.get_num_accounts(ledger_hash).map(
268                            |(num_accounts, ledger_hash)| LedgerStatus {
269                                num_accounts,
270                                best_tip_staged_ledger_hash: ledger_hash,
271                            },
272                        );
273
274                        LedgerReadResponse::GetLedgerStatus(rpc_id, res)
275                    }
276                    LedgerReadRequest::GetAccountDelegators(rpc_id, ledger_hash, account_id) => {
277                        let res = ledger_ctx.get_account_delegators(&ledger_hash, &account_id);
278                        LedgerReadResponse::GetAccountDelegators(rpc_id, res)
279                    }
280                },
281            ),
282            LedgerRequest::AccountsSet {
283                snarked_ledger_hash,
284                parent,
285                accounts,
286            } => LedgerResponse::AccountsSet(ledger_ctx.accounts_set(
287                snarked_ledger_hash,
288                &parent,
289                accounts,
290            )),
291            LedgerRequest::ChildHashesGet {
292                snarked_ledger_hash,
293                parent,
294            } => {
295                let res = ledger_ctx.get_child_hashes(snarked_ledger_hash, parent);
296                LedgerResponse::ChildHashes(res)
297            }
298            LedgerRequest::ComputeSnarkedLedgerHashes {
299                snarked_ledger_hash,
300            } => {
301                ledger_ctx
302                    .compute_snarked_ledger_hashes(&snarked_ledger_hash)
303                    .unwrap();
304                LedgerResponse::Success
305            }
306            LedgerRequest::CopySnarkedLedgerContentsForSync {
307                origin_snarked_ledger_hash,
308                target_snarked_ledger_hash,
309                overwrite,
310            } => {
311                let origin_snarked_ledger_hash = origin_snarked_ledger_hash
312                    .iter()
313                    .find(|hash| ledger_ctx.contains_snarked_ledger(hash))
314                    .unwrap_or_else(|| {
315                        origin_snarked_ledger_hash
316                            .first()
317                            .expect("origin_snarked_ledger_hash cannot be empty")
318                    })
319                    .clone();
320
321                let res = ledger_ctx.copy_snarked_ledger_contents_for_sync(
322                    origin_snarked_ledger_hash,
323                    target_snarked_ledger_hash,
324                    overwrite,
325                );
326                LedgerResponse::SnarkedLedgerContentsCopied(res)
327            }
328            LedgerRequest::GetMask { ledger_hash } => {
329                LedgerResponse::LedgerMask(ledger_ctx.mask(&ledger_hash))
330            }
331            LedgerRequest::GetProducersWithDelegates {
332                ledger_hash,
333                filter,
334            } => {
335                let res = ledger_ctx.producers_with_delegates(&ledger_hash, filter);
336                LedgerResponse::ProducersWithDelegatesMap(res)
337            }
338            LedgerRequest::InsertGenesisLedger { mask } => {
339                ledger_ctx.insert_genesis_ledger(mask);
340                LedgerResponse::Success
341            }
342            LedgerRequest::StagedLedgerReconstructResult {
343                staged_ledger_hash,
344                result,
345            } => {
346                let result = match result {
347                    Err(err) => Err(err),
348                    Ok(ledger) => {
349                        ledger_ctx.staged_ledger_reconstruct_result_store(ledger);
350                        Ok(())
351                    }
352                };
353                LedgerResponse::Write(LedgerWriteResponse::StagedLedgerReconstruct {
354                    staged_ledger_hash,
355                    result,
356                })
357            }
358            LedgerRequest::AccountsGet {
359                ledger_hash,
360                account_ids,
361            } => {
362                let res = ledger_ctx.get_accounts(ledger_hash, account_ids);
363                LedgerResponse::AccountsGet(Ok(res))
364            }
365        }
366    }
367}
368
369struct LedgerRequestWithChan {
370    request: LedgerRequest,
371    responder: Option<std::sync::mpsc::SyncSender<LedgerResponse>>,
372}
373
374pub struct LedgerManager {
375    caller: LedgerCaller,
376    join_handle: thread::JoinHandle<LedgerCtx>,
377}
378
379#[derive(Clone)]
380pub(super) struct LedgerCaller(mpsc::TrackedUnboundedSender<LedgerRequestWithChan>);
381
382impl LedgerManager {
383    pub fn spawn(mut ledger_ctx: LedgerCtx) -> LedgerManager {
384        let (sender, mut receiver) = mpsc::tracked_unbounded_channel();
385        let caller = LedgerCaller(sender);
386        let ledger_caller = caller.clone();
387
388        let ledger_manager_loop = move || {
389            while let Some(msg) = receiver.blocking_recv() {
390                let LedgerRequestWithChan { request, responder } = msg.0;
391                let response = request.handle(&mut ledger_ctx, &ledger_caller, responder.is_some());
392                match (response, responder) {
393                    (LedgerResponse::Write(resp), None) => {
394                        ledger_ctx.send_write_response(resp);
395                    }
396                    (LedgerResponse::Write(resp), Some(responder)) => {
397                        ledger_ctx.send_write_response(resp.clone());
398                        let _ = responder.send(LedgerResponse::Write(resp));
399                    }
400                    (LedgerResponse::Read(id, resp), None) => {
401                        ledger_ctx.send_read_response(id, resp);
402                    }
403                    (LedgerResponse::Read(id, resp), Some(responder)) => {
404                        ledger_ctx.send_read_response(id, resp.clone());
405                        let _ = responder.send(LedgerResponse::Read(id, resp));
406                    }
407                    (resp, Some(responder)) => {
408                        let _ = responder.send(resp);
409                    }
410                    (_, None) => {}
411                }
412            }
413            ledger_ctx
414        };
415        let join_handle = thread::Builder::new()
416            .name("ledger-manager".into())
417            .spawn(ledger_manager_loop)
418            .expect("Failed: ledger manager");
419        LedgerManager {
420            caller,
421            join_handle,
422        }
423    }
424
425    pub fn pending_calls(&self) -> usize {
426        self.caller.0.len()
427    }
428
429    pub(super) fn call(&self, request: LedgerRequest) {
430        self.caller.call(request)
431    }
432
433    pub(super) fn call_sync(
434        &self,
435        request: LedgerRequest,
436    ) -> Result<LedgerResponse, std::sync::mpsc::RecvError> {
437        self.caller.call_sync(request)
438    }
439
440    pub async fn wait_for_stop(self) -> thread::Result<LedgerCtx> {
441        self.join_handle.join()
442    }
443
444    pub fn insert_genesis_ledger(&self, mask: Mask) {
445        self.call(LedgerRequest::InsertGenesisLedger { mask });
446    }
447
448    pub fn get_mask(&self, ledger_hash: &LedgerHash) -> Option<(Mask, bool)> {
449        match self.call_sync(LedgerRequest::GetMask {
450            ledger_hash: ledger_hash.clone(),
451        }) {
452            Ok(LedgerResponse::LedgerMask(mask)) => mask,
453            _ => panic!("get_mask failed"),
454        }
455    }
456
457    pub fn get_accounts(
458        &self,
459        ledger_hash: &LedgerHash,
460        account_ids: Vec<AccountId>,
461    ) -> Result<Vec<Account>, String> {
462        // TODO: this should be asynchronous
463        match self.call_sync(LedgerRequest::AccountsGet {
464            ledger_hash: ledger_hash.clone(),
465            account_ids,
466        }) {
467            Ok(LedgerResponse::AccountsGet(result)) => result,
468            _ => panic!("get_accounts failed"),
469        }
470    }
471
472    #[allow(clippy::type_complexity)]
473    pub fn producers_with_delegates(
474        &self,
475        ledger_hash: &LedgerHash,
476        filter: fn(&CompressedPubKey) -> bool,
477    ) -> Option<BTreeMap<AccountPublicKey, Vec<(ledger::AccountIndex, AccountPublicKey, u64)>>>
478    {
479        match self.call_sync(LedgerRequest::GetProducersWithDelegates {
480            ledger_hash: ledger_hash.clone(),
481            filter,
482        }) {
483            Ok(LedgerResponse::ProducersWithDelegatesMap(map)) => map,
484            _ => panic!("producers_with_delegates failed"),
485        }
486    }
487}
488
489impl LedgerCaller {
490    pub fn call(&self, request: LedgerRequest) {
491        self.0
492            .tracked_send(LedgerRequestWithChan {
493                request,
494                responder: None,
495            })
496            .unwrap();
497    }
498
499    fn call_sync(
500        &self,
501        request: LedgerRequest,
502    ) -> Result<LedgerResponse, std::sync::mpsc::RecvError> {
503        let (responder, receiver) = std::sync::mpsc::sync_channel(0);
504        self.0
505            .tracked_send(LedgerRequestWithChan {
506                request,
507                responder: Some(responder),
508            })
509            .unwrap();
510        receiver.recv()
511    }
512}
513
514fn format_response_error(method: &str, res: LedgerResponse) -> String {
515    format!("LedgerManager::{method}: unexpected response: {res:?}")
516}
517
518impl<T: LedgerService> TransitionFrontierSyncLedgerSnarkedService for T {
519    fn compute_snarked_ledger_hashes(
520        &self,
521        snarked_ledger_hash: &LedgerHash,
522    ) -> Result<(), String> {
523        self.ledger_manager()
524            .call_sync(LedgerRequest::ComputeSnarkedLedgerHashes {
525                snarked_ledger_hash: snarked_ledger_hash.clone(),
526            })
527            .map_err(|_| "compute_snarked_ledger_hashes responder dropped".to_owned())
528            .and_then(|res| {
529                if let LedgerResponse::Success = res {
530                    Ok(())
531                } else {
532                    Err(format_response_error("compute_snarked_ledger_hashes", res))
533                }
534            })
535    }
536
537    fn copy_snarked_ledger_contents_for_sync(
538        &self,
539        origin_snarked_ledger_hash: Vec<LedgerHash>,
540        target_snarked_ledger_hash: LedgerHash,
541        overwrite: bool,
542    ) -> Result<bool, String> {
543        self.ledger_manager()
544            .call_sync(LedgerRequest::CopySnarkedLedgerContentsForSync {
545                origin_snarked_ledger_hash,
546                target_snarked_ledger_hash,
547                overwrite,
548            })
549            .map_err(|_| "copy_snarked_ledger_contents_for_sync responder dropped".to_owned())
550            .and_then(|res| {
551                if let LedgerResponse::SnarkedLedgerContentsCopied(copied) = res {
552                    copied
553                } else {
554                    Err(format_response_error(
555                        "copy_snarked_ledger_contents_for_sync",
556                        res,
557                    ))
558                }
559            })
560    }
561
562    fn child_hashes_get(
563        &self,
564        snarked_ledger_hash: LedgerHash,
565        parent: &LedgerAddress,
566    ) -> Result<(LedgerHash, LedgerHash), String> {
567        self.ledger_manager()
568            .call_sync(LedgerRequest::ChildHashesGet {
569                snarked_ledger_hash,
570                parent: parent.clone(),
571            })
572            .map_err(|_| "child_hashes_get responder dropped".to_owned())
573            .and_then(|res| {
574                if let LedgerResponse::ChildHashes(Some(res)) = res {
575                    Ok(res)
576                } else {
577                    Err(format_response_error("child_hashes_get", res))
578                }
579            })
580    }
581
582    fn accounts_set(
583        &self,
584        snarked_ledger_hash: LedgerHash,
585        parent: &LedgerAddress,
586        accounts: Vec<MinaBaseAccountBinableArgStableV2>,
587    ) -> Result<LedgerHash, String> {
588        self.ledger_manager()
589            .call_sync(LedgerRequest::AccountsSet {
590                snarked_ledger_hash,
591                parent: parent.clone(),
592                accounts,
593            })
594            .map_err(|_| "accounts_set responder dropped".to_owned())
595            .and_then(|res| {
596                if let LedgerResponse::AccountsSet(res) = res {
597                    res
598                } else {
599                    Err(format_response_error("accounts_set", res))
600                }
601            })
602    }
603}