node/ledger/
ledger_manager.rs

1use super::{
2    read::{LedgerReadId, LedgerReadRequest, LedgerReadResponse, LedgerStatus},
3    write::{LedgerWriteRequest, LedgerWriteResponse},
4    LedgerCtx, LedgerService,
5};
6use crate::{
7    account::AccountPublicKey, ledger::LedgerAddress, rpc::AccountQuery,
8    transition_frontier::sync::ledger::snarked::TransitionFrontierSyncLedgerSnarkedService,
9};
10use ledger::{
11    staged_ledger::staged_ledger::{SkipVerification, StagedLedger},
12    Account, AccountId, Mask,
13};
14use mina_p2p_messages::v2::{self, LedgerHash, MinaBaseAccountBinableArgStableV2};
15use mina_signer::CompressedPubKey;
16use openmina_core::{channels::mpsc, thread};
17use std::collections::BTreeMap;
18
19/// The type enumerating different requests that can be made to the
20/// service. Each specific constructor has a specific response
21/// constructor associated with it. Unfortunately, this relationship
22/// can't be expressed in the Rust type system at the moment. For this
23/// reason this type is private while functions wrapping the whole call
24/// to the service are exposed as the service's methods.
25#[allow(dead_code)] // TODO
26pub(super) enum LedgerRequest {
27    Write(LedgerWriteRequest),
28    Read(LedgerReadId, LedgerReadRequest),
29    AccountsSet {
30        snarked_ledger_hash: LedgerHash,
31        parent: LedgerAddress,
32        accounts: Vec<MinaBaseAccountBinableArgStableV2>,
33    }, // expected response: LedgerHash
34    AccountsGet {
35        ledger_hash: LedgerHash,
36        account_ids: Vec<AccountId>,
37    }, // expected response: Vec<Account>
38    ChildHashesGet {
39        snarked_ledger_hash: LedgerHash,
40        parent: LedgerAddress,
41    }, // expected response: ChildHashes
42    ComputeSnarkedLedgerHashes {
43        snarked_ledger_hash: LedgerHash,
44    }, // expected response: Success
45    CopySnarkedLedgerContentsForSync {
46        origin_snarked_ledger_hash: Vec<LedgerHash>,
47        target_snarked_ledger_hash: LedgerHash,
48        overwrite: bool,
49    }, // expected response: SnarkedLedgerContentsCopied
50    GetProducersWithDelegates {
51        ledger_hash: LedgerHash,
52        filter: fn(&CompressedPubKey) -> bool,
53    }, // expected response: ProducersWithDelegatesMap
54    GetMask {
55        ledger_hash: LedgerHash,
56    }, // expected response: LedgerMask
57    InsertGenesisLedger {
58        mask: Mask,
59    },
60    StagedLedgerReconstructResult {
61        staged_ledger_hash: LedgerHash,
62        result: Result<StagedLedger, String>,
63    },
64}
65
66#[derive(Debug)]
67pub enum LedgerResponse {
68    Write(LedgerWriteResponse),
69    Read(LedgerReadId, LedgerReadResponse),
70    ChildHashes(Option<(LedgerHash, LedgerHash)>),
71    AccountsSet(Result<LedgerHash, String>),
72    AccountsGet(Result<Vec<Account>, String>),
73    LedgerMask(Option<(Mask, bool)>),
74    #[allow(clippy::type_complexity)]
75    ProducersWithDelegatesMap(
76        Option<BTreeMap<AccountPublicKey, Vec<(ledger::AccountIndex, AccountPublicKey, u64)>>>,
77    ),
78    SnarkedLedgerContentsCopied(Result<bool, String>),
79    Success, // operation was performed and result stored; nothing to return.
80}
81
82impl LedgerRequest {
83    fn handle(
84        self,
85        ledger_ctx: &mut LedgerCtx,
86        caller: &LedgerCaller,
87        force_sync: bool,
88    ) -> LedgerResponse {
89        match self {
90            Self::Write(request) => LedgerResponse::Write(match request {
91                LedgerWriteRequest::StagedLedgerReconstruct {
92                    snarked_ledger_hash,
93                    parts,
94                } => {
95                    if !force_sync {
96                        let caller = caller.clone();
97                        let cb = move |staged_ledger_hash, result| {
98                            caller.call(LedgerRequest::StagedLedgerReconstructResult {
99                                staged_ledger_hash,
100                                result,
101                            })
102                        };
103                        if let Err(e) =
104                            ledger_ctx.staged_ledger_reconstruct(snarked_ledger_hash, parts, cb)
105                        {
106                            openmina_core::log::inner::error!(
107                                "Failed to reconstruct staged ledger: {:?}",
108                                e
109                            );
110                            // TODO: Handle the error in the state machine
111                        }
112                        return LedgerResponse::Success;
113                    } else {
114                        let (staged_ledger_hash, result) = match ledger_ctx
115                            .staged_ledger_reconstruct_sync(snarked_ledger_hash, parts)
116                        {
117                            Ok(result) => result,
118                            Err(e) => (v2::LedgerHash::zero(), Err(String::from(e))),
119                        };
120
121                        LedgerWriteResponse::StagedLedgerReconstruct {
122                            staged_ledger_hash,
123                            result,
124                        }
125                    }
126                }
127                LedgerWriteRequest::StagedLedgerDiffCreate {
128                    pred_block,
129                    global_slot_since_genesis: global_slot,
130                    is_new_epoch,
131                    producer,
132                    delegator,
133                    coinbase_receiver,
134                    completed_snarks,
135                    supercharge_coinbase,
136                    transactions_by_fee,
137                } => {
138                    let pred_block_hash = pred_block.hash().clone();
139                    let global_slot_since_genesis = global_slot.clone();
140                    let result = ledger_ctx.staged_ledger_diff_create(
141                        pred_block,
142                        global_slot,
143                        is_new_epoch,
144                        producer,
145                        delegator,
146                        coinbase_receiver,
147                        completed_snarks,
148                        supercharge_coinbase,
149                        transactions_by_fee,
150                    );
151                    LedgerWriteResponse::StagedLedgerDiffCreate {
152                        pred_block_hash,
153                        global_slot_since_genesis,
154                        result: result.map(Into::into),
155                    }
156                }
157                LedgerWriteRequest::BlockApply {
158                    block,
159                    pred_block,
160                    skip_verification,
161                } => {
162                    let block_hash = block.hash().clone();
163                    let skip_verification = if skip_verification {
164                        Some(SkipVerification::All)
165                    } else {
166                        None
167                    };
168                    let result = ledger_ctx.block_apply(block, pred_block, skip_verification);
169                    LedgerWriteResponse::BlockApply { block_hash, result }
170                }
171                LedgerWriteRequest::Commit {
172                    ledgers_to_keep,
173                    root_snarked_ledger_updates,
174                    needed_protocol_states,
175                    new_root,
176                    new_best_tip,
177                } => {
178                    let best_tip_hash = new_best_tip.hash().clone();
179                    let result = ledger_ctx.commit(
180                        ledgers_to_keep,
181                        root_snarked_ledger_updates,
182                        needed_protocol_states,
183                        &new_root,
184                        &new_best_tip,
185                    );
186                    LedgerWriteResponse::Commit {
187                        best_tip_hash,
188                        result,
189                    }
190                }
191            }),
192            Self::Read(id, request) => LedgerResponse::Read(
193                id,
194                match request {
195                    LedgerReadRequest::DelegatorTable(ledger_hash, producer) => {
196                        let res = ledger_ctx
197                            .producers_with_delegates(&ledger_hash, |pub_key| {
198                                AccountPublicKey::from(pub_key.clone()) == producer
199                            })
200                            .and_then(|list| list.into_iter().next())
201                            .map(|(_, table)| {
202                                table
203                                    .into_iter()
204                                    .map(|(index, pub_key, balance)| (index, (pub_key, balance)))
205                                    .collect()
206                            });
207
208                        LedgerReadResponse::DelegatorTable(res)
209                    }
210                    LedgerReadRequest::GetNumAccounts(ledger_hash) => {
211                        let res = ledger_ctx.get_num_accounts(ledger_hash);
212                        LedgerReadResponse::GetNumAccounts(res)
213                    }
214                    LedgerReadRequest::GetChildHashesAtAddr(ledger_hash, addr) => {
215                        let res = ledger_ctx.get_child_hashes(ledger_hash, addr);
216                        LedgerReadResponse::GetChildHashesAtAddr(res)
217                    }
218                    LedgerReadRequest::GetChildAccountsAtAddr(ledger_hash, addr) => {
219                        let res = ledger_ctx.get_child_accounts(ledger_hash, addr);
220                        LedgerReadResponse::GetChildAccountsAtAddr(res)
221                    }
222                    LedgerReadRequest::GetStagedLedgerAuxAndPendingCoinbases(data) => {
223                        let res = ledger_ctx.staged_ledger_aux_and_pending_coinbase(
224                            &data.ledger_hash,
225                            data.protocol_states,
226                        );
227                        LedgerReadResponse::GetStagedLedgerAuxAndPendingCoinbases(res)
228                    }
229                    LedgerReadRequest::ScanStateSummary(ledger_hash) => {
230                        let res = ledger_ctx.scan_state_summary(&ledger_hash);
231                        LedgerReadResponse::ScanStateSummary(res)
232                    }
233                    LedgerReadRequest::GetAccounts(ledger_hash, account_ids, rpc_id) => {
234                        let res = ledger_ctx.get_accounts(ledger_hash, account_ids);
235                        LedgerReadResponse::GetAccounts(res, rpc_id)
236                    }
237                    LedgerReadRequest::AccountsForRpc(rpc_id, ledger_hash, account_query) => {
238                        let res = match &account_query {
239                            AccountQuery::All => ledger_ctx.get_accounts_for_rpc(ledger_hash, None),
240                            AccountQuery::SinglePublicKey(public_key) => ledger_ctx
241                                .get_accounts_for_rpc(ledger_hash, Some(public_key.clone())),
242                            AccountQuery::PubKeyWithTokenId(public_key, token_id_key_hash) => {
243                                let id = AccountId {
244                                    public_key: public_key.clone().try_into().unwrap(),
245                                    token_id: token_id_key_hash.clone().into(),
246                                };
247                                ledger_ctx.get_accounts(ledger_hash, vec![id])
248                            }
249                            AccountQuery::MultipleIds(ids) => {
250                                ledger_ctx.get_accounts(ledger_hash, ids.clone())
251                            }
252                        };
253
254                        LedgerReadResponse::AccountsForRpc(rpc_id, res, account_query)
255                    }
256                    LedgerReadRequest::GetLedgerStatus(rpc_id, ledger_hash) => {
257                        let res = ledger_ctx.get_num_accounts(ledger_hash).map(
258                            |(num_accounts, ledger_hash)| LedgerStatus {
259                                num_accounts,
260                                best_tip_staged_ledger_hash: ledger_hash,
261                            },
262                        );
263
264                        LedgerReadResponse::GetLedgerStatus(rpc_id, res)
265                    }
266                    LedgerReadRequest::GetAccountDelegators(rpc_id, ledger_hash, account_id) => {
267                        let res = ledger_ctx.get_account_delegators(&ledger_hash, &account_id);
268                        LedgerReadResponse::GetAccountDelegators(rpc_id, res)
269                    }
270                },
271            ),
272            LedgerRequest::AccountsSet {
273                snarked_ledger_hash,
274                parent,
275                accounts,
276            } => LedgerResponse::AccountsSet(ledger_ctx.accounts_set(
277                snarked_ledger_hash,
278                &parent,
279                accounts,
280            )),
281            LedgerRequest::ChildHashesGet {
282                snarked_ledger_hash,
283                parent,
284            } => {
285                let res = ledger_ctx.get_child_hashes(snarked_ledger_hash, parent);
286                LedgerResponse::ChildHashes(res)
287            }
288            LedgerRequest::ComputeSnarkedLedgerHashes {
289                snarked_ledger_hash,
290            } => {
291                ledger_ctx
292                    .compute_snarked_ledger_hashes(&snarked_ledger_hash)
293                    .unwrap();
294                LedgerResponse::Success
295            }
296            LedgerRequest::CopySnarkedLedgerContentsForSync {
297                origin_snarked_ledger_hash,
298                target_snarked_ledger_hash,
299                overwrite,
300            } => {
301                let origin_snarked_ledger_hash = origin_snarked_ledger_hash
302                    .iter()
303                    .find(|hash| ledger_ctx.contains_snarked_ledger(hash))
304                    .unwrap_or_else(|| {
305                        origin_snarked_ledger_hash
306                            .first()
307                            .expect("origin_snarked_ledger_hash cannot be empty")
308                    })
309                    .clone();
310
311                let res = ledger_ctx.copy_snarked_ledger_contents_for_sync(
312                    origin_snarked_ledger_hash,
313                    target_snarked_ledger_hash,
314                    overwrite,
315                );
316                LedgerResponse::SnarkedLedgerContentsCopied(res)
317            }
318            LedgerRequest::GetMask { ledger_hash } => {
319                LedgerResponse::LedgerMask(ledger_ctx.mask(&ledger_hash))
320            }
321            LedgerRequest::GetProducersWithDelegates {
322                ledger_hash,
323                filter,
324            } => {
325                let res = ledger_ctx.producers_with_delegates(&ledger_hash, filter);
326                LedgerResponse::ProducersWithDelegatesMap(res)
327            }
328            LedgerRequest::InsertGenesisLedger { mask } => {
329                ledger_ctx.insert_genesis_ledger(mask);
330                LedgerResponse::Success
331            }
332            LedgerRequest::StagedLedgerReconstructResult {
333                staged_ledger_hash,
334                result,
335            } => {
336                let result = match result {
337                    Err(err) => Err(err),
338                    Ok(ledger) => {
339                        ledger_ctx.staged_ledger_reconstruct_result_store(ledger);
340                        Ok(())
341                    }
342                };
343                LedgerResponse::Write(LedgerWriteResponse::StagedLedgerReconstruct {
344                    staged_ledger_hash,
345                    result,
346                })
347            }
348            LedgerRequest::AccountsGet {
349                ledger_hash,
350                account_ids,
351            } => {
352                let res = ledger_ctx.get_accounts(ledger_hash, account_ids);
353                LedgerResponse::AccountsGet(Ok(res))
354            }
355        }
356    }
357}
358
359struct LedgerRequestWithChan {
360    request: LedgerRequest,
361    responder: Option<std::sync::mpsc::SyncSender<LedgerResponse>>,
362}
363
364pub struct LedgerManager {
365    caller: LedgerCaller,
366    join_handle: thread::JoinHandle<LedgerCtx>,
367}
368
369#[derive(Clone)]
370pub(super) struct LedgerCaller(mpsc::TrackedUnboundedSender<LedgerRequestWithChan>);
371
372impl LedgerManager {
373    pub fn spawn(mut ledger_ctx: LedgerCtx) -> LedgerManager {
374        let (sender, mut receiver) = mpsc::tracked_unbounded_channel();
375        let caller = LedgerCaller(sender);
376        let ledger_caller = caller.clone();
377
378        let ledger_manager_loop = move || {
379            while let Some(msg) = receiver.blocking_recv() {
380                let LedgerRequestWithChan { request, responder } = msg.0;
381                let response = request.handle(&mut ledger_ctx, &ledger_caller, responder.is_some());
382                match (response, responder) {
383                    (LedgerResponse::Write(resp), None) => {
384                        ledger_ctx.send_write_response(resp);
385                    }
386                    (LedgerResponse::Write(resp), Some(responder)) => {
387                        ledger_ctx.send_write_response(resp.clone());
388                        let _ = responder.send(LedgerResponse::Write(resp));
389                    }
390                    (LedgerResponse::Read(id, resp), None) => {
391                        ledger_ctx.send_read_response(id, resp);
392                    }
393                    (LedgerResponse::Read(id, resp), Some(responder)) => {
394                        ledger_ctx.send_read_response(id, resp.clone());
395                        let _ = responder.send(LedgerResponse::Read(id, resp));
396                    }
397                    (resp, Some(responder)) => {
398                        let _ = responder.send(resp);
399                    }
400                    (_, None) => {}
401                }
402            }
403            ledger_ctx
404        };
405        let join_handle = thread::Builder::new()
406            .name("ledger-manager".into())
407            .spawn(ledger_manager_loop)
408            .expect("Failed: ledger manager");
409        LedgerManager {
410            caller,
411            join_handle,
412        }
413    }
414
415    pub fn pending_calls(&self) -> usize {
416        self.caller.0.len()
417    }
418
419    pub(super) fn call(&self, request: LedgerRequest) {
420        self.caller.call(request)
421    }
422
423    pub(super) fn call_sync(
424        &self,
425        request: LedgerRequest,
426    ) -> Result<LedgerResponse, std::sync::mpsc::RecvError> {
427        self.caller.call_sync(request)
428    }
429
430    pub async fn wait_for_stop(self) -> thread::Result<LedgerCtx> {
431        self.join_handle.join()
432    }
433
434    pub fn insert_genesis_ledger(&self, mask: Mask) {
435        self.call(LedgerRequest::InsertGenesisLedger { mask });
436    }
437
438    pub fn get_mask(&self, ledger_hash: &LedgerHash) -> Option<(Mask, bool)> {
439        match self.call_sync(LedgerRequest::GetMask {
440            ledger_hash: ledger_hash.clone(),
441        }) {
442            Ok(LedgerResponse::LedgerMask(mask)) => mask,
443            _ => panic!("get_mask failed"),
444        }
445    }
446
447    pub fn get_accounts(
448        &self,
449        ledger_hash: &LedgerHash,
450        account_ids: Vec<AccountId>,
451    ) -> Result<Vec<Account>, String> {
452        // TODO: this should be asynchronous
453        match self.call_sync(LedgerRequest::AccountsGet {
454            ledger_hash: ledger_hash.clone(),
455            account_ids,
456        }) {
457            Ok(LedgerResponse::AccountsGet(result)) => result,
458            _ => panic!("get_accounts failed"),
459        }
460    }
461
462    #[allow(clippy::type_complexity)]
463    pub fn producers_with_delegates(
464        &self,
465        ledger_hash: &LedgerHash,
466        filter: fn(&CompressedPubKey) -> bool,
467    ) -> Option<BTreeMap<AccountPublicKey, Vec<(ledger::AccountIndex, AccountPublicKey, u64)>>>
468    {
469        match self.call_sync(LedgerRequest::GetProducersWithDelegates {
470            ledger_hash: ledger_hash.clone(),
471            filter,
472        }) {
473            Ok(LedgerResponse::ProducersWithDelegatesMap(map)) => map,
474            _ => panic!("producers_with_delegates failed"),
475        }
476    }
477}
478
479impl LedgerCaller {
480    pub fn call(&self, request: LedgerRequest) {
481        self.0
482            .tracked_send(LedgerRequestWithChan {
483                request,
484                responder: None,
485            })
486            .unwrap();
487    }
488
489    fn call_sync(
490        &self,
491        request: LedgerRequest,
492    ) -> Result<LedgerResponse, std::sync::mpsc::RecvError> {
493        let (responder, receiver) = std::sync::mpsc::sync_channel(0);
494        self.0
495            .tracked_send(LedgerRequestWithChan {
496                request,
497                responder: Some(responder),
498            })
499            .unwrap();
500        receiver.recv()
501    }
502}
503
504fn format_response_error(method: &str, res: LedgerResponse) -> String {
505    format!("LedgerManager::{method}: unexpected response: {res:?}")
506}
507
508impl<T: LedgerService> TransitionFrontierSyncLedgerSnarkedService for T {
509    fn compute_snarked_ledger_hashes(
510        &self,
511        snarked_ledger_hash: &LedgerHash,
512    ) -> Result<(), String> {
513        self.ledger_manager()
514            .call_sync(LedgerRequest::ComputeSnarkedLedgerHashes {
515                snarked_ledger_hash: snarked_ledger_hash.clone(),
516            })
517            .map_err(|_| "compute_snarked_ledger_hashes responder dropped".to_owned())
518            .and_then(|res| {
519                if let LedgerResponse::Success = res {
520                    Ok(())
521                } else {
522                    Err(format_response_error("compute_snarked_ledger_hashes", res))
523                }
524            })
525    }
526
527    fn copy_snarked_ledger_contents_for_sync(
528        &self,
529        origin_snarked_ledger_hash: Vec<LedgerHash>,
530        target_snarked_ledger_hash: LedgerHash,
531        overwrite: bool,
532    ) -> Result<bool, String> {
533        self.ledger_manager()
534            .call_sync(LedgerRequest::CopySnarkedLedgerContentsForSync {
535                origin_snarked_ledger_hash,
536                target_snarked_ledger_hash,
537                overwrite,
538            })
539            .map_err(|_| "copy_snarked_ledger_contents_for_sync responder dropped".to_owned())
540            .and_then(|res| {
541                if let LedgerResponse::SnarkedLedgerContentsCopied(copied) = res {
542                    copied
543                } else {
544                    Err(format_response_error(
545                        "copy_snarked_ledger_contents_for_sync",
546                        res,
547                    ))
548                }
549            })
550    }
551
552    fn child_hashes_get(
553        &self,
554        snarked_ledger_hash: LedgerHash,
555        parent: &LedgerAddress,
556    ) -> Result<(LedgerHash, LedgerHash), String> {
557        self.ledger_manager()
558            .call_sync(LedgerRequest::ChildHashesGet {
559                snarked_ledger_hash,
560                parent: parent.clone(),
561            })
562            .map_err(|_| "child_hashes_get responder dropped".to_owned())
563            .and_then(|res| {
564                if let LedgerResponse::ChildHashes(Some(res)) = res {
565                    Ok(res)
566                } else {
567                    Err(format_response_error("child_hashes_get", res))
568                }
569            })
570    }
571
572    fn accounts_set(
573        &self,
574        snarked_ledger_hash: LedgerHash,
575        parent: &LedgerAddress,
576        accounts: Vec<MinaBaseAccountBinableArgStableV2>,
577    ) -> Result<LedgerHash, String> {
578        self.ledger_manager()
579            .call_sync(LedgerRequest::AccountsSet {
580                snarked_ledger_hash,
581                parent: parent.clone(),
582                accounts,
583            })
584            .map_err(|_| "accounts_set responder dropped".to_owned())
585            .and_then(|res| {
586                if let LedgerResponse::AccountsSet(res) = res {
587                    res
588                } else {
589                    Err(format_response_error("accounts_set", res))
590                }
591            })
592    }
593}