node/ledger/
ledger_service.rs

1use super::{
2    ledger_empty_hash_at_depth,
3    read::{LedgerReadId, LedgerReadRequest, LedgerReadResponse},
4    write::{CommitResult, LedgerWriteRequest, LedgerWriteResponse, LedgersToKeep},
5    LedgerAddress, LedgerEvent, LEDGER_DEPTH,
6};
7use crate::{
8    account::AccountPublicKey,
9    block_producer_effectful::StagedLedgerDiffCreateOutput,
10    ledger::{
11        ledger_manager::{LedgerManager, LedgerRequest},
12        write::{BlockApplyResult, BlockApplyResultArchive},
13    },
14    p2p::channels::rpc::StagedLedgerAuxAndPendingCoinbases,
15    rpc::{
16        RpcScanStateSummaryBlockTransaction, RpcScanStateSummaryScanStateJob,
17        RpcScanStateSummaryScanStateJobKind, RpcSnarkPoolJobSnarkWorkDone,
18    },
19    transition_frontier::{
20        genesis::empty_pending_coinbase_hash,
21        sync::{
22            ledger::staged::StagedLedgerAuxAndPendingCoinbasesValid,
23            TransitionFrontierRootSnarkedLedgerUpdates,
24        },
25    },
26};
27use ledger::{
28    scan_state::{
29        currency::Slot,
30        scan_state::{AvailableJobMessage, JobValueBase, JobValueMerge, JobValueWithIndex, Pass},
31        transaction_logic::{
32            local_state::LocalState,
33            protocol_state::{protocol_state_view, ProtocolStateView},
34            transaction_partially_applied::TransactionPartiallyApplied,
35            valid,
36            zkapp_command::AccessedOrNot,
37            Transaction, TransactionStatus, UserCommand,
38        },
39    },
40    sparse_ledger::SparseLedger,
41    staged_ledger::{
42        diff::Diff,
43        staged_ledger::{SkipVerification, StagedLedger},
44        validate_block::block_body_hash,
45    },
46    verifier::Verifier,
47    Account, AccountId, AccountIndex, BaseLedger, Database, Mask, TokenId, UnregisterBehavior,
48};
49use mina_core::{
50    block::{AppliedBlock, ArcBlockWithHash},
51    bug_condition,
52    constants::constraint_constants,
53    snark::{Snark, SnarkJobId},
54    thread,
55};
56use mina_curves::pasta::Fp;
57use mina_p2p_messages::{
58    bigint::InvalidBigInt,
59    binprot::BinProtRead,
60    list::List,
61    v2::{
62        self, DataHashLibStateHashStableV1, LedgerHash, MinaBaseLedgerHash0StableV1,
63        MinaBasePendingCoinbaseStableV2, MinaBasePendingCoinbaseWitnessStableV2,
64        MinaBaseSokMessageStableV1, MinaBaseStagedLedgerHashStableV1,
65        MinaStateBlockchainStateValueStableV2LedgerProofStatement,
66        MinaStateProtocolStateValueStableV2, MinaTransactionTransactionStableV2, NonZeroCurvePoint,
67        StateHash,
68    },
69};
70use mina_signer::CompressedPubKey;
71use std::{
72    collections::{BTreeMap, BTreeSet},
73    path::Path,
74    sync::Arc,
75};
76
77fn merkle_root(mask: &mut Mask) -> LedgerHash {
78    MinaBaseLedgerHash0StableV1(mask.merkle_root().into()).into()
79}
80
81fn error_to_string(e: InvalidBigInt) -> String {
82    format!("{:?}", e)
83}
84
/// Indexing `StagedLedger` both by their "merkle root hash" and their "staged ledger hash"
///
/// `staged_ledgers` owns the ledgers; `by_merkle_root_hash` is a secondary index that maps
/// a merkle root hash to the staged ledger hashes stored under that root.
#[derive(Default)]
struct StagedLedgersStorage {
    /// 1 merkle root hash can refer to 1 or more `StagedLedger`
    by_merkle_root_hash: BTreeMap<LedgerHash, Vec<Arc<MinaBaseStagedLedgerHashStableV1>>>,
    /// The stored staged ledgers, keyed by their staged ledger hash.
    staged_ledgers: BTreeMap<Arc<MinaBaseStagedLedgerHashStableV1>, StagedLedger>,
}
92
93impl StagedLedgersStorage {
94    /// Slow, it will recompute the full staged ledger hash
95    /// Prefer `Self::insert` when you have the "staged ledger hash" around
96    fn insert_by_recomputing_hash(&mut self, mut staged_ledger: StagedLedger) {
97        let staged_ledger_hash: MinaBaseStagedLedgerHashStableV1 = (&staged_ledger.hash()).into();
98        self.insert(Arc::new(staged_ledger_hash), staged_ledger);
99    }
100
101    fn insert(
102        &mut self,
103        staged_ledger_hash: Arc<MinaBaseStagedLedgerHashStableV1>,
104        staged_ledger: StagedLedger,
105    ) {
106        let merkle_root_hash: LedgerHash = merkle_root(&mut staged_ledger.ledger());
107        self.by_merkle_root_hash
108            .entry(merkle_root_hash.clone())
109            .or_insert_with(|| Vec::with_capacity(1))
110            .extend([staged_ledger_hash.clone()]);
111        self.staged_ledgers
112            .insert(staged_ledger_hash, staged_ledger);
113    }
114
115    fn get_mask(&self, root_hash: &LedgerHash) -> Option<Mask> {
116        let staged_ledger_hashes: &Vec<_> = self.by_merkle_root_hash.get(root_hash)?;
117        // Note: there can be multiple `staged_ledger_hashes`, but they all have the same
118        // mask, so we just take the 1st one
119        self.staged_ledgers
120            .get(staged_ledger_hashes.first()?)
121            .map(|staged_ledger| staged_ledger.ledger())
122    }
123
124    fn get(&self, staged_ledger_hash: &MinaBaseStagedLedgerHashStableV1) -> Option<&StagedLedger> {
125        self.staged_ledgers.get(staged_ledger_hash)
126    }
127
128    fn get_mut(
129        &mut self,
130        staged_ledger_hash: &MinaBaseStagedLedgerHashStableV1,
131    ) -> Option<&mut StagedLedger> {
132        self.staged_ledgers.get_mut(staged_ledger_hash)
133    }
134
135    fn retain<F>(&mut self, fun: F)
136    where
137        F: Fn(&MinaBaseStagedLedgerHashStableV1) -> bool,
138    {
139        self.by_merkle_root_hash.retain(|_, staged_ledger_hashes| {
140            staged_ledger_hashes.retain(|hash| {
141                if fun(hash) {
142                    return true;
143                }
144                self.staged_ledgers.remove(hash);
145                false
146            });
147            !staged_ledger_hashes.is_empty()
148        });
149    }
150
151    fn extend<I>(&mut self, iterator: I)
152    where
153        I: IntoIterator<Item = (Arc<MinaBaseStagedLedgerHashStableV1>, StagedLedger)>,
154    {
155        for (hash, staged_ledger) in iterator.into_iter() {
156            self.insert(hash, staged_ledger);
157        }
158    }
159
160    fn take(&mut self) -> BTreeMap<Arc<MinaBaseStagedLedgerHashStableV1>, StagedLedger> {
161        let Self {
162            by_merkle_root_hash,
163            staged_ledgers,
164        } = self;
165
166        let _ = std::mem::take(by_merkle_root_hash);
167        std::mem::take(staged_ledgers)
168    }
169}
170
/// State of the ledger service: the snarked and staged ledgers it currently holds, the
/// ledgers of an in-progress sync, and the channel used to report results as events.
#[derive(Default)]
pub struct LedgerCtx {
    /// Snarked ledgers, keyed by their ledger hash.
    snarked_ledgers: BTreeMap<LedgerHash, Mask>,
    /// Additional snarked ledgers specified at startup (loaded from disk)
    additional_snarked_ledgers: BTreeMap<LedgerHash, Mask>,
    /// Staged ledgers, indexed by merkle root hash and by staged ledger hash.
    staged_ledgers: StagedLedgersStorage,
    /// Ledgers belonging to a synchronization that is still pending.
    sync: LedgerSyncState,
    /// Returns more data on block application necessary for archive node
    archive_mode: bool,
    /// Destination for `LedgerEvent`s; while `None`, `send_event` drops events.
    event_sender: Option<mina_core::channels::mpsc::UnboundedSender<crate::event_source::Event>>,
}
182
/// Ledgers that belong to a pending synchronization, kept apart from the ones stored
/// directly in `LedgerCtx`.
#[derive(Default)]
struct LedgerSyncState {
    /// Pending-sync snarked ledgers, keyed by their (target) ledger hash.
    snarked_ledgers: BTreeMap<LedgerHash, Mask>,
    /// Staged ledgers of the pending sync.
    staged_ledgers: StagedLedgersStorage,
}
188
189impl LedgerCtx {
190    pub fn new_with_additional_snarked_ledgers<P>(path: P) -> Self
191    where
192        P: AsRef<Path>,
193    {
194        use std::fs;
195
196        let Ok(dir) = fs::read_dir(path) else {
197            return Self::default();
198        };
199
200        let additional_snarked_ledgers = dir
201            .filter_map(|entry| {
202                let entry = entry.ok()?;
203                let hash = entry.file_name().to_str()?.parse().ok()?;
204                let mut file = fs::File::open(entry.path()).ok()?;
205
206                let _ = Option::<LedgerHash>::binprot_read(&mut file).ok()?;
207
208                let accounts = Vec::<Account>::binprot_read(&mut file).ok()?;
209                let mut mask = Mask::new_root(Database::create(35));
210                for account in accounts {
211                    let account_id = account.id();
212                    mask.get_or_create_account(account_id, account).unwrap();
213                }
214                Some((hash, mask))
215            })
216            .collect();
217
218        LedgerCtx {
219            additional_snarked_ledgers,
220            ..Default::default()
221        }
222    }
223
224    pub fn set_archive_mode(&mut self) {
225        self.archive_mode = true;
226    }
227
228    // TODO(tizoc): Only used for the current workaround to make staged ledger
229    // reconstruction async, can be removed when the ledger services are made async
230    pub fn set_event_sender(
231        &mut self,
232        event_sender: mina_core::channels::mpsc::UnboundedSender<crate::event_source::Event>,
233    ) {
234        self.event_sender = Some(event_sender);
235    }
236
237    pub(super) fn send_event(&self, event: LedgerEvent) {
238        if let Some(tx) = self.event_sender.as_ref() {
239            let _ = tx.send(event.into());
240        }
241    }
242
243    pub(super) fn send_write_response(&self, resp: LedgerWriteResponse) {
244        self.send_event(LedgerEvent::Write(resp))
245    }
246
247    pub(super) fn send_read_response(&self, id: LedgerReadId, resp: LedgerReadResponse) {
248        self.send_event(LedgerEvent::Read(id, resp))
249    }
250
251    pub fn insert_genesis_ledger(&mut self, mut mask: Mask) {
252        let merkle_root_hash = merkle_root(&mut mask);
253        let staged_ledger =
254            StagedLedger::create_exn(constraint_constants().clone(), mask.copy()).unwrap();
255        self.snarked_ledgers.insert(merkle_root_hash.clone(), mask);
256        // The genesis ledger is a specific case, some of its hashes are zero
257        let staged_ledger_hash =
258            MinaBaseStagedLedgerHashStableV1::zero(merkle_root_hash, empty_pending_coinbase_hash());
259        self.staged_ledgers
260            .insert(Arc::new(staged_ledger_hash), staged_ledger);
261    }
262
263    pub fn staged_ledger_reconstruct_result_store(&mut self, ledger: StagedLedger) {
264        self.staged_ledgers.insert_by_recomputing_hash(ledger);
265    }
266
267    // TODO(adonagy): Uh-oh, clean this up
268    pub fn get_accounts_for_rpc(
269        &self,
270        ledger_hash: LedgerHash,
271        requested_public_key: Option<AccountPublicKey>,
272    ) -> Vec<Account> {
273        if let Some((mask, _)) = self.mask(&ledger_hash) {
274            let mut accounts = Vec::new();
275            let mut single_account = Vec::new();
276
277            mask.iter(|account| {
278                accounts.push(account.clone());
279                if let Some(public_key) = requested_public_key.as_ref() {
280                    if public_key == &AccountPublicKey::from(account.public_key.clone()) {
281                        single_account.push(account.clone());
282                    }
283                }
284            });
285
286            if requested_public_key.is_some() {
287                single_account
288            } else {
289                accounts
290            }
291        } else {
292            vec![]
293        }
294    }
295
296    // TODO(tizoc): explain when `is_synced` is `true` and when it is `false`. Also use something else than a boolean.
297    /// Returns a tuple of `(mask, is_synced)` for a [Mask] with the specified `hash` if it exists or `None` otherwise.
298    pub fn mask(&self, hash: &LedgerHash) -> Option<(Mask, bool)> {
299        self.snarked_ledgers
300            .get(hash)
301            .cloned()
302            .map(|mask| (mask, true))
303            .or_else(|| Some((self.staged_ledgers.get_mask(hash)?, true)))
304            .or_else(|| self.sync.mask(hash))
305            .or_else(|| {
306                self.additional_snarked_ledgers
307                    .get(hash)
308                    .map(|l| (l.clone(), true))
309            })
310    }
311
312    pub fn contains_snarked_ledger(&self, hash: &LedgerHash) -> bool {
313        self.snarked_ledgers.contains_key(hash)
314    }
315
316    /// Returns the mask for a snarked ledger being synchronized or an error if it is not present
317    pub fn pending_sync_snarked_ledger_mask(&self, hash: &LedgerHash) -> Result<Mask, String> {
318        self.sync.pending_sync_snarked_ledger_mask(hash)
319    }
320
321    /// Copies the contents of an existing snarked ledger into the target
322    /// hash under the pending sync snarked ledgers state.
323    pub fn copy_snarked_ledger_contents_for_sync(
324        &mut self,
325        origin_snarked_ledger_hash: LedgerHash,
326        target_snarked_ledger_hash: LedgerHash,
327        overwrite: bool,
328    ) -> Result<bool, String> {
329        if !overwrite
330            && self
331                .sync
332                .snarked_ledgers
333                .contains_key(&target_snarked_ledger_hash)
334        {
335            return Ok(false);
336        }
337
338        let origin = self
339            .snarked_ledgers
340            .get(&origin_snarked_ledger_hash)
341            .or_else(|| {
342                // If it doesn't exist in completed ledgers, it may be
343                // an in-progress ledger from a previous attempt that we can reuse
344                self.sync.snarked_ledgers.get(&origin_snarked_ledger_hash)
345            })
346            .ok_or(format!(
347                "Tried to copy from non-existing snarked ledger with hash: {}",
348                origin_snarked_ledger_hash
349            ))?;
350
351        let target = origin.copy();
352        self.sync
353            .snarked_ledgers
354            .insert(target_snarked_ledger_hash, target);
355
356        Ok(true)
357    }
358
359    pub fn compute_snarked_ledger_hashes(
360        &mut self,
361        snarked_ledger_hash: &LedgerHash,
362    ) -> Result<(), String> {
363        let origin = self
364            .snarked_ledgers
365            .get_mut(snarked_ledger_hash)
366            .or_else(|| self.sync.snarked_ledgers.get_mut(snarked_ledger_hash))
367            .ok_or(format!(
368                "Cannot hash non-existing snarked ledger: {}",
369                snarked_ledger_hash
370            ))?;
371
372        // Our ledger is lazy when it comes to hashing, but retrieving the
373        // merkle root hash forces all pending hashes to be computed.
374        let _force_hashing = origin.merkle_root();
375
376        Ok(())
377    }
378
379    /// Returns a mutable reference to the [StagedLedger] with the specified `hash` if it exists or `None` otherwise.
380    fn staged_ledger_mut(
381        &mut self,
382        hash: &MinaBaseStagedLedgerHashStableV1,
383    ) -> Option<&mut StagedLedger> {
384        match self.staged_ledgers.get_mut(hash) {
385            Some(v) => Some(v),
386            None => self.sync.staged_ledger_mut(hash),
387        }
388    }
389
390    fn recreate_snarked_ledger(
391        &mut self,
392        root_snarked_ledger_updates: &TransitionFrontierRootSnarkedLedgerUpdates,
393        needed_protocol_states: &BTreeMap<StateHash, MinaStateProtocolStateValueStableV2>,
394        snarked_ledger_hash: &LedgerHash,
395    ) -> Result<(), String> {
396        let Some(update) = root_snarked_ledger_updates.get(snarked_ledger_hash) else {
397            return Ok(());
398        };
399        self.recreate_snarked_ledger(
400            root_snarked_ledger_updates,
401            needed_protocol_states,
402            &update.parent,
403        )?;
404
405        self.push_snarked_ledger(
406            needed_protocol_states,
407            &update.parent,
408            snarked_ledger_hash,
409            &update.staged_ledger_hash,
410        )
411    }
412
413    fn push_snarked_ledger(
414        &mut self,
415        protocol_states: &BTreeMap<StateHash, MinaStateProtocolStateValueStableV2>,
416        old_root_snarked_ledger_hash: &LedgerHash,
417        new_root_snarked_ledger_hash: &LedgerHash,
418        new_root_staged_ledger_hash: &MinaBaseStagedLedgerHashStableV1,
419    ) -> Result<(), String> {
420        mina_core::debug!(mina_core::log::system_time();
421            kind = "LedgerService::push_snarked_ledger",
422            summary = format!("{old_root_snarked_ledger_hash} -> {new_root_snarked_ledger_hash}"));
423        // Steps 4-7 from https://github.com/openmina/mina/blob/bc812dc9b90e05898c0c36ac76ba51ccf6cac137/src/lib/transition_frontier/full_frontier/full_frontier.ml#L354-L392
424        let constraint_constants = &constraint_constants();
425
426        // Step 4: create a new temporary mask `mt` with `s` as it's parent
427        let root_snarked_ledger = self
428            .snarked_ledgers
429            .get(old_root_snarked_ledger_hash)
430            .or_else(|| self.sync.snarked_ledgers.get(old_root_snarked_ledger_hash))
431            .ok_or_else(|| {
432                format!(
433                    "push_snarked_ledger: could not find old root snarked ledger: {}",
434                    old_root_snarked_ledger_hash,
435                )
436            })?;
437        let mut mt = root_snarked_ledger.make_child();
438
439        // Step 5: apply any transactions to `mt` that appear in the transition between `s` and `s'`
440        let apply_first_pass = |global_slot: Slot,
441                                txn_state_view: &ProtocolStateView,
442                                ledger: &mut Mask,
443                                transaction: &Transaction| {
444            ledger::scan_state::transaction_logic::apply_transaction_first_pass(
445                constraint_constants,
446                global_slot,
447                txn_state_view,
448                ledger,
449                transaction,
450            )
451        };
452
453        let apply_second_pass = |ledger: &mut Mask, tx: TransactionPartiallyApplied<Mask>| {
454            ledger::scan_state::transaction_logic::apply_transaction_second_pass(
455                constraint_constants,
456                ledger,
457                tx,
458            )
459        };
460
461        let apply_first_pass_sparse_ledger =
462            |global_slot: Slot,
463             txn_state_view: &ProtocolStateView,
464             sparse_ledger: &mut SparseLedger,
465             transaction: &Transaction| {
466                ledger::scan_state::transaction_logic::apply_transaction_first_pass(
467                    constraint_constants,
468                    global_slot,
469                    txn_state_view,
470                    sparse_ledger,
471                    transaction,
472                )
473            };
474
475        let get_protocol_state = |state_hash: Fp| {
476            let state_hash = StateHash::from_fp(state_hash);
477            if let Some(s) = protocol_states.get(&state_hash) {
478                Ok(s.clone())
479            } else {
480                Err(format!(
481                    "Failed to find protocol state for state hash: {}",
482                    state_hash
483                ))
484            }
485        };
486
487        let scan_state = self
488            .staged_ledger_mut(new_root_staged_ledger_hash)
489            .ok_or_else(|| {
490                format!(
491                    "Failed to find staged ledger with hash: {:#?}",
492                    new_root_staged_ledger_hash
493                )
494            })?
495            .scan_state();
496
497        let Pass::FirstPassLedgerHash(_first_pass_ledger_target) = scan_state
498            .get_snarked_ledger_sync(
499                &mut mt,
500                get_protocol_state,
501                apply_first_pass,
502                apply_second_pass,
503                apply_first_pass_sparse_ledger,
504            )?;
505
506        // Assert that the obtained ledger is the one we expect
507        let expected_hash = new_root_snarked_ledger_hash;
508        let obtained_hash = LedgerHash::from_fp(mt.merkle_root());
509
510        if expected_hash != &obtained_hash {
511            return Err(format!(
512                "Expected to obtain snarked root ledger hash {} but got {}",
513                expected_hash, obtained_hash
514            ));
515        }
516
517        self.sync
518            .snarked_ledgers
519            .insert(new_root_snarked_ledger_hash.clone(), mt);
520
521        Ok(())
522    }
523
524    pub fn get_account_delegators(
525        &self,
526        ledger_hash: &LedgerHash,
527        account_id: &AccountId,
528    ) -> Option<Vec<Account>> {
529        let (mask, _) = self.mask(ledger_hash)?;
530        let mut accounts = Vec::new();
531
532        mask.iter(|account| {
533            if account.delegate == Some(account_id.public_key.clone()) {
534                accounts.push(account.clone());
535            }
536        });
537
538        Some(accounts)
539    }
540
541    #[allow(clippy::type_complexity)]
542    pub fn producers_with_delegates<F: FnMut(&CompressedPubKey) -> bool>(
543        &self,
544        ledger_hash: &LedgerHash,
545        mut filter: F,
546    ) -> Option<BTreeMap<AccountPublicKey, Vec<(ledger::AccountIndex, AccountPublicKey, u64)>>>
547    {
548        let (mask, _) = self.mask(ledger_hash)?;
549        let mut accounts = Vec::new();
550
551        mask.iter(|account| {
552            if filter(account.delegate.as_ref().unwrap_or(&account.public_key)) {
553                accounts.push((
554                    account.id(),
555                    account.delegate.clone(),
556                    account.balance.as_u64(),
557                ))
558            }
559        });
560
561        let producers = accounts.into_iter().fold(
562            BTreeMap::<_, Vec<_>>::new(),
563            |mut producers, (id, delegate, balance)| {
564                let index = mask.index_of_account(id.clone()).unwrap();
565                let pub_key = AccountPublicKey::from(id.public_key);
566                let producer = delegate.map(Into::into).unwrap_or(pub_key.clone());
567                producers
568                    .entry(producer)
569                    .or_default()
570                    .push((index, pub_key, balance));
571                producers
572            },
573        );
574        Some(producers)
575    }
576
577    pub fn child_hashes_get(
578        &mut self,
579        snarked_ledger_hash: LedgerHash,
580        parent: &LedgerAddress,
581    ) -> Result<(LedgerHash, LedgerHash), String> {
582        let mut mask = self.pending_sync_snarked_ledger_mask(&snarked_ledger_hash)?;
583        let left_hash = LedgerHash::from_fp(mask.get_inner_hash_at_addr(parent.child_left())?);
584        let right_hash = LedgerHash::from_fp(mask.get_inner_hash_at_addr(parent.child_right())?);
585
586        Ok((left_hash, right_hash))
587    }
588
589    pub fn accounts_set(
590        &mut self,
591        snarked_ledger_hash: LedgerHash,
592        parent: &LedgerAddress,
593        accounts: Vec<v2::MinaBaseAccountBinableArgStableV2>,
594    ) -> Result<LedgerHash, String> {
595        let mut mask = self.pending_sync_snarked_ledger_mask(&snarked_ledger_hash)?;
596        let accounts: Vec<_> = accounts
597            .into_iter()
598            .map(|account| Ok(Box::new((&account).try_into()?)))
599            .collect::<Result<Vec<_>, InvalidBigInt>>()
600            .map_err(error_to_string)?;
601
602        mask.set_all_accounts_rooted_at(parent.clone(), &accounts)
603            .map_err(|_| "Failed when setting accounts".to_owned())?;
604
605        let computed_hash = LedgerHash::from_fp(mask.get_inner_hash_at_addr(parent.clone())?);
606
607        Ok(computed_hash)
608    }
609
610    pub fn staged_ledger_reconstruct<F>(
611        &mut self,
612        snarked_ledger_hash: LedgerHash,
613        parts: Option<Arc<StagedLedgerAuxAndPendingCoinbasesValid>>,
614        callback: F,
615    ) -> Result<(), InvalidBigInt>
616    where
617        F: 'static + FnOnce(v2::LedgerHash, Result<StagedLedger, String>) + Send,
618    {
619        let snarked_ledger = self
620            .sync
621            .snarked_ledger_mut(snarked_ledger_hash.clone())?
622            .copy();
623
624        thread::Builder::new()
625            .name("staged-ledger-reconstruct".into())
626            .spawn(move || {
627                match staged_ledger_reconstruct(snarked_ledger, snarked_ledger_hash, parts) {
628                    Ok((staged_ledger_hash, result)) => {
629                        callback(staged_ledger_hash, result);
630                    }
631                    Err(e) => callback(v2::LedgerHash::zero(), Err(format!("{:?}", e))),
632                }
633            })
634            .expect("Failed: staged ledger reconstruct thread");
635
636        Ok(())
637    }
638
639    pub fn staged_ledger_reconstruct_sync(
640        &mut self,
641        snarked_ledger_hash: LedgerHash,
642        parts: Option<Arc<StagedLedgerAuxAndPendingCoinbasesValid>>,
643    ) -> Result<(v2::LedgerHash, Result<(), String>), InvalidBigInt> {
644        let snarked_ledger = self
645            .sync
646            .snarked_ledger_mut(snarked_ledger_hash.clone())?
647            .copy();
648        let (staged_ledger_hash, result) =
649            staged_ledger_reconstruct(snarked_ledger, snarked_ledger_hash, parts)?;
650        let result = match result {
651            Err(err) => Err(err),
652            Ok(staged_ledger) => {
653                self.staged_ledger_reconstruct_result_store(staged_ledger);
654                Ok(())
655            }
656        };
657
658        Ok((staged_ledger_hash, result))
659    }
660
661    pub fn block_apply(
662        &mut self,
663        block: ArcBlockWithHash,
664        pred_block: AppliedBlock,
665        skip_verification: Option<SkipVerification>,
666    ) -> Result<BlockApplyResult, String> {
667        mina_core::info!(mina_core::log::system_time();
668            kind = "LedgerService::block_apply",
669            summary = format!("{}, {} <- {}", block.height(), block.hash(), block.pred_hash()),
670            pred_staged_ledger_hash = pred_block.merkle_root_hash().to_string(),
671            staged_ledger_hash = block.merkle_root_hash().to_string(),
672        );
673        let mut staged_ledger = self
674            .staged_ledger_mut(pred_block.staged_ledger_hashes())
675            .ok_or_else(|| {
676                format!(
677                    "parent staged ledger missing: {:#?}",
678                    pred_block.staged_ledger_hashes()
679                )
680            })?
681            .clone();
682
683        let global_slot = block.global_slot_since_genesis();
684        let prev_protocol_state = &pred_block.header().protocol_state;
685        let prev_state_view = protocol_state_view(prev_protocol_state).map_err(error_to_string)?;
686
687        let consensus_state = &block.header().protocol_state.body.consensus_state;
688        let coinbase_receiver: CompressedPubKey = (&consensus_state.coinbase_receiver)
689            .try_into()
690            .map_err(error_to_string)?;
691        let supercharge_coinbase = consensus_state.supercharge_coinbase;
692
693        let diff: Diff = (&block.body().staged_ledger_diff)
694            .try_into()
695            .map_err(error_to_string)?;
696
697        let prev_protocol_state: ledger::proofs::block::ProtocolState =
698            prev_protocol_state.try_into()?;
699
700        let result = staged_ledger
701            .apply(
702                skip_verification,
703                constraint_constants(),
704                Slot::from_u32(global_slot),
705                diff,
706                (),
707                &Verifier,
708                &prev_state_view,
709                prev_protocol_state.hashes(),
710                coinbase_receiver.clone(),
711                supercharge_coinbase,
712            )
713            .map_err(|err| format!("{err:?}"))?;
714        let just_emitted_a_proof = result.ledger_proof.is_some();
715        let ledger_hashes = MinaBaseStagedLedgerHashStableV1::from(&result.hash_after_applying);
716
717        // TODO(binier): return error if not matching.
718        let expected_ledger_hashes = block.staged_ledger_hashes();
719        if &ledger_hashes != expected_ledger_hashes {
720            let staged_ledger = self
721                .staged_ledger_mut(pred_block.staged_ledger_hashes())
722                .unwrap(); // We already know the ledger exists, see the same call a few lines above
723
724            match dump_application_to_file(staged_ledger, block.clone(), pred_block) {
725                Ok(filename) => mina_core::info!(
726                    mina_core::log::system_time();
727                    kind = "LedgerService::dump - Failed application",
728                    summary = format!("StagedLedger and block saved to: {filename:?}")
729                ),
730                Err(e) => mina_core::error!(
731                    mina_core::log::system_time();
732                    kind = "LedgerService::dump - Failed application",
733                    summary = format!("Failed to save block application to file: {e:?}")
734                ),
735            }
736
737            panic!("staged ledger hash mismatch. found: {ledger_hashes:#?}, expected: {expected_ledger_hashes:#?}");
738        }
739
740        let archive_data = if self.archive_mode {
741            let senders = block
742                .body()
743                .transactions()
744                .filter_map(|tx| UserCommand::try_from(tx).ok().map(|cmd| cmd.fee_payer()))
745                .collect::<BTreeSet<_>>()
746                .into_iter();
747
748            let coinbase_receiver_id = AccountId::new(coinbase_receiver, TokenId::default());
749
750            // <https://github.com/MinaProtocol/mina/blob/85149735ca3a76d026e8cf36b8ff22941a048e31/src/app/archive/lib/diff.ml#L78>
751            let (accessed, not_accessed): (BTreeSet<_>, BTreeSet<_>) = block
752                .body()
753                .tranasctions_with_status()
754                .flat_map(|(tx, status)| {
755                    let status: TransactionStatus = status.into();
756                    UserCommand::try_from(tx)
757                        .ok()
758                        .map(|cmd| cmd.account_access_statuses(&status))
759                        .into_iter()
760                        .flatten()
761                })
762                .partition(|(_, status)| *status == AccessedOrNot::Accessed);
763
764            let mut account_ids_accessed: BTreeSet<_> =
765                accessed.into_iter().map(|(id, _)| id).collect();
766            let mut account_ids_not_accessed: BTreeSet<_> =
767                not_accessed.into_iter().map(|(id, _)| id).collect();
768
769            // Coinbase receiver is included only when the block has a coinbase transaction
770            // Note: If for whatever reason the network has set the coinbase amount to zero,
771            // to mimic the behavior of the ocaml node, we still include the coinbase receiver
772            // in the accessed accounts as a coinbase transaction is created regardless of the coinbase amount.
773            // <https://github.com/MinaProtocol/mina/blob/b595a2bf00ae138d745737da628bd94bb2bd91e2/src/lib/staged_ledger/pre_diff_info.ml#L139>
774            let has_coinbase = block.body().has_coinbase();
775
776            if has_coinbase {
777                account_ids_accessed.insert(coinbase_receiver_id);
778            } else {
779                account_ids_not_accessed.insert(coinbase_receiver_id);
780            }
781
782            // Include the coinbase fee transfer accounts
783            let fee_transfer_accounts =
784                block.body().coinbase_fee_transfers_iter().filter_map(|cb| {
785                    let receiver: CompressedPubKey = cb.receiver_pk.inner().try_into().ok()?;
786                    let account_id = AccountId::new(receiver, TokenId::default());
787                    Some(account_id)
788                });
789            account_ids_accessed.extend(fee_transfer_accounts);
790
791            // TODO(adonagy): Create a struct instead of tuple
792            let accounts_accessed: Vec<(AccountIndex, Account)> = account_ids_accessed
793                .iter()
794                .filter_map(|id| {
795                    staged_ledger
796                        .ledger()
797                        .index_of_account(id.clone())
798                        .and_then(|index| {
799                            staged_ledger
800                                .ledger()
801                                .get_at_index(index)
802                                .map(|account| (index, *account))
803                        })
804                })
805                .collect();
806
807            let account_creation_fee = constraint_constants().account_creation_fee;
808
809            // TODO(adonagy): Create a struct instead of tuple
810            let accounts_created: Vec<(AccountId, u64)> = staged_ledger
811                .latest_block_accounts_created(pred_block.hash().to_field()?)
812                .iter()
813                .map(|id| (id.clone(), account_creation_fee))
814                .collect();
815
816            // A token is used regardless of txn status
817            // <https://github.com/MinaProtocol/mina/blob/85149735ca3a76d026e8cf36b8ff22941a048e31/src/app/archive/lib/diff.ml#L114>
818            let all_account_ids: BTreeSet<_> = account_ids_accessed
819                .union(&account_ids_not_accessed)
820                .collect();
821            let tokens_used: BTreeSet<(TokenId, Option<AccountId>)> = if has_coinbase {
822                all_account_ids
823                    .iter()
824                    .map(|id| {
825                        let token_id = id.token_id.clone();
826                        let token_owner = staged_ledger.ledger().token_owner(token_id.clone());
827                        (token_id, token_owner)
828                    })
829                    .collect()
830            } else {
831                BTreeSet::new()
832            };
833
834            let sender_receipt_chains_from_parent_ledger = senders
835                .filter_map(|sender| {
836                    if let Some(location) = staged_ledger.ledger().location_of_account(&sender) {
837                        staged_ledger.ledger().get(location).map(|account| {
838                            (
839                                sender,
840                                v2::ReceiptChainHash::from(account.receipt_chain_hash),
841                            )
842                        })
843                    } else {
844                        None
845                    }
846                })
847                .collect();
848            Some(BlockApplyResultArchive {
849                accounts_accessed,
850                accounts_created,
851                tokens_used,
852                sender_receipt_chains_from_parent_ledger,
853            })
854        } else {
855            None
856        };
857
858        self.sync
859            .staged_ledgers
860            .insert(Arc::new(ledger_hashes), staged_ledger);
861
862        // staged_ledger.ledger().get_at_index(index)
863
864        Ok(BlockApplyResult {
865            block,
866            just_emitted_a_proof,
867            archive_data,
868        })
869    }
870
871    pub fn commit(
872        &mut self,
873        ledgers_to_keep: LedgersToKeep,
874        root_snarked_ledger_updates: TransitionFrontierRootSnarkedLedgerUpdates,
875        needed_protocol_states: BTreeMap<StateHash, MinaStateProtocolStateValueStableV2>,
876        new_root: &ArcBlockWithHash,
877        new_best_tip: &ArcBlockWithHash,
878    ) -> CommitResult {
879        mina_core::debug!(mina_core::log::system_time();
880            kind = "LedgerService::commit",
881            summary = format!("commit {}, {}", new_best_tip.height(), new_best_tip.hash()),
882            new_root = format!("{}, {}", new_root.height(), new_root.hash()),
883            new_root_staking_epoch_ledger = new_root.staking_epoch_ledger_hash().to_string(),
884            new_root_next_epoch_ledger = new_root.next_epoch_ledger_hash().to_string(),
885            new_root_snarked_ledger = new_root.snarked_ledger_hash().to_string(),
886        );
887        self.recreate_snarked_ledger(
888            &root_snarked_ledger_updates,
889            &needed_protocol_states,
890            new_root.snarked_ledger_hash(),
891        )
892        .unwrap();
893
894        self.snarked_ledgers.retain(|hash, _| {
895            let keep = ledgers_to_keep.contains(hash);
896            if !keep {
897                mina_core::debug!(mina_core::log::system_time();
898                    kind = "LedgerService::commit - snarked_ledgers.drop",
899                    summary = format!("drop snarked ledger {hash}"));
900            }
901            keep
902        });
903        self.snarked_ledgers.extend(
904            std::mem::take(&mut self.sync.snarked_ledgers)
905                .into_iter()
906                .filter(|(hash, _)| {
907                    let keep = ledgers_to_keep.contains(hash);
908                    if !keep {
909                        mina_core::debug!(mina_core::log::system_time();
910                            kind = "LedgerService::commit - snarked_ledgers.drop",
911                            summary = format!("drop snarked ledger {hash}"));
912                    }
913                    keep
914                }),
915        );
916
917        self.staged_ledgers
918            .retain(|hash| ledgers_to_keep.contains(hash));
919        self.staged_ledgers.extend(
920            self.sync
921                .staged_ledgers
922                .take()
923                .into_iter()
924                .filter(|(hash, _)| ledgers_to_keep.contains(&**hash)),
925        );
926
927        for ledger_hash in [
928            new_best_tip.staking_epoch_ledger_hash(),
929            new_root.snarked_ledger_hash(),
930            new_root.merkle_root_hash(),
931        ] {
932            if let Some((mut mask, is_synced)) = self.mask(ledger_hash) {
933                if !is_synced {
934                    panic!("ledger mask expected to be synced: {ledger_hash}");
935                }
936                let calculated = merkle_root(&mut mask);
937                assert_eq!(ledger_hash, &calculated, "ledger mask hash mismatch");
938            } else {
939                panic!("ledger mask is missing: {ledger_hash}");
940            }
941        }
942
943        for (ledger_hash, snarked_ledger) in self.snarked_ledgers.iter_mut() {
944            while let Some((parent_hash, parent)) = snarked_ledger
945                .get_parent()
946                .map(|mut parent| (merkle_root(&mut parent), parent))
947                .filter(|(parent_hash, _)| !ledgers_to_keep.contains(parent_hash))
948            {
949                mina_core::debug!(mina_core::log::system_time();
950                    kind = "LedgerService::commit - mask.commit_and_reparent",
951                    summary = format!("{ledger_hash} -> {parent_hash}"));
952                snarked_ledger.commit();
953                snarked_ledger.unregister_mask(UnregisterBehavior::Check);
954                *snarked_ledger = parent;
955            }
956        }
957
958        // TODO(tizoc): should this fail silently?
959        let Some(new_root_ledger) = self.staged_ledgers.get_mut(new_root.staged_ledger_hashes())
960        else {
961            return Default::default();
962        };
963
964        // Make staged ledger mask new root.
965        new_root_ledger.commit_and_reparent_to_root();
966
967        let needed_protocol_states = self
968            .staged_ledger_mut(new_root.staged_ledger_hashes())
969            .map(|l| {
970                l.scan_state()
971                    .required_state_hashes()
972                    .into_iter()
973                    .map(|fp| DataHashLibStateHashStableV1(fp.into()).into())
974                    .collect()
975            })
976            .unwrap_or_default();
977
978        let available_jobs = Arc::new(
979            self.staged_ledger_mut(new_best_tip.staged_ledger_hashes())
980                .map(|l| {
981                    l.scan_state()
982                        .all_job_pairs_iter()
983                        .map(|job| job.map(|single| AvailableJobMessage::from(single)))
984                        .collect()
985                })
986                .unwrap_or_default(),
987        );
988
989        // self.check_alive_masks();
990
991        CommitResult {
992            alive_masks: ::ledger::mask::alive_len(),
993            available_jobs,
994            needed_protocol_states,
995        }
996    }
997
998    #[allow(dead_code)]
999    fn check_alive_masks(&mut self) {
1000        let mut alive: BTreeSet<_> = ::ledger::mask::alive_collect();
1001        let staged_ledgers = self
1002            .staged_ledgers
1003            .staged_ledgers
1004            .iter()
1005            .map(|(hash, ledger)| (&hash.non_snark.ledger_hash, ledger.ledger_ref()));
1006
1007        let alive_ledgers = self
1008            .snarked_ledgers
1009            .iter()
1010            .chain(staged_ledgers)
1011            .map(|(hash, mask)| {
1012                let uuid = mask.get_uuid();
1013                if !alive.remove(&uuid) {
1014                    bug_condition!("mask not found among alive masks! uuid: {uuid}, hash: {hash}");
1015                }
1016                (uuid, hash)
1017            })
1018            .collect::<Vec<_>>();
1019        mina_core::debug!(redux::Timestamp::global_now(); "alive_ledgers_after_commit: {alive_ledgers:#?}");
1020
1021        if !alive.is_empty() {
1022            bug_condition!(
1023                "masks alive which are no longer part of the ledger service: {alive:#?}"
1024            );
1025        }
1026    }
1027
1028    pub fn get_num_accounts(
1029        &mut self,
1030        ledger_hash: v2::LedgerHash,
1031    ) -> Option<(u64, v2::LedgerHash)> {
1032        let (mask, _) = self
1033            .mask(&ledger_hash)
1034            .filter(|(_, is_synced)| *is_synced)?;
1035        // fix(binier): incorrect ledger hash, must be a hash of a populated subtree.
1036
1037        let num_accounts = mask.num_accounts() as u64;
1038        let first_node_addr = ledger::Address::first(
1039            LEDGER_DEPTH.saturating_sub(super::tree_height_for_num_accounts(num_accounts)),
1040        );
1041        let hash = LedgerHash::from_fp(mask.get_hash(first_node_addr)?);
1042        Some((num_accounts, hash))
1043    }
1044
1045    pub fn get_child_hashes(
1046        &mut self,
1047        ledger_hash: v2::LedgerHash,
1048        addr: LedgerAddress,
1049    ) -> Option<(v2::LedgerHash, v2::LedgerHash)> {
1050        let (mask, is_synced) = self.mask(&ledger_hash)?;
1051        let get_hash = |addr: LedgerAddress| {
1052            let depth = addr.length();
1053            mask.get_hash(addr)
1054                .map(|fp| MinaBaseLedgerHash0StableV1(fp.into()).into())
1055                .or_else(|| {
1056                    if is_synced {
1057                        Some(ledger_empty_hash_at_depth(depth))
1058                    } else {
1059                        None
1060                    }
1061                })
1062        };
1063        let (left, right) = (addr.child_left(), addr.child_right());
1064        Some((get_hash(left)?, get_hash(right)?))
1065    }
1066
1067    pub fn get_child_accounts(
1068        &mut self,
1069        ledger_hash: v2::LedgerHash,
1070        addr: LedgerAddress,
1071    ) -> Option<Vec<v2::MinaBaseAccountBinableArgStableV2>> {
1072        let (mask, _) = self
1073            .mask(&ledger_hash)
1074            .filter(|(_, is_synced)| *is_synced)?;
1075        // TODO(binier): SEC maybe we need to check addr depth?
1076        let accounts = mask
1077            .get_all_accounts_rooted_at(addr)?
1078            .into_iter()
1079            .map(|(_, account)| (&*account).into())
1080            .collect();
1081        Some(accounts)
1082    }
1083
1084    pub fn get_accounts(
1085        &mut self,
1086        ledger_hash: v2::LedgerHash,
1087        ids: Vec<AccountId>,
1088    ) -> Vec<Account> {
1089        let Some((mask, _)) = self.mask(&ledger_hash) else {
1090            mina_core::warn!(
1091                mina_core::log::system_time();
1092                kind = "LedgerService::get_accounts",
1093                summary = format!("Ledger not found: {ledger_hash:?}")
1094            );
1095            return Vec::new();
1096        };
1097        let addrs = mask
1098            .location_of_account_batch(&ids)
1099            .into_iter()
1100            .filter_map(|(_id, addr)| addr)
1101            .collect::<Vec<_>>();
1102
1103        mask.get_batch(&addrs)
1104            .into_iter()
1105            .filter_map(|(_, account)| account.map(|account| *account))
1106            .collect::<Vec<_>>()
1107    }
1108
1109    pub fn staged_ledger_aux_and_pending_coinbase(
1110        &mut self,
1111        ledger_hash: &MinaBaseStagedLedgerHashStableV1,
1112        protocol_states: BTreeMap<StateHash, MinaStateProtocolStateValueStableV2>,
1113    ) -> Option<Arc<StagedLedgerAuxAndPendingCoinbases>> {
1114        let ledger = self.staged_ledger_mut(ledger_hash)?;
1115        let needed_blocks = ledger
1116            .scan_state()
1117            .required_state_hashes()
1118            .into_iter()
1119            .map(|fp| DataHashLibStateHashStableV1(fp.into()))
1120            .map(|hash| protocol_states.get(&hash.into()).ok_or(()).cloned())
1121            .collect::<Result<_, _>>()
1122            .ok()?;
1123        // Required so that we can perform the conversion bellow, which
1124        // will not work if the hash is not available already.
1125        ledger.pending_coinbase_collection_merkle_root();
1126        Some(
1127            StagedLedgerAuxAndPendingCoinbases {
1128                scan_state: (ledger.scan_state()).into(),
1129                staged_ledger_hash: ledger_hash.non_snark.ledger_hash.clone(),
1130                pending_coinbase: (ledger.pending_coinbase_collection()).into(),
1131                needed_blocks,
1132            }
1133            .into(),
1134        )
1135    }
1136
1137    #[allow(clippy::too_many_arguments)]
1138    pub fn staged_ledger_diff_create(
1139        &mut self,
1140        pred_block: AppliedBlock,
1141        global_slot_since_genesis: v2::MinaNumbersGlobalSlotSinceGenesisMStableV1,
1142        is_new_epoch: bool,
1143        producer: NonZeroCurvePoint,
1144        delegator: NonZeroCurvePoint,
1145        coinbase_receiver: NonZeroCurvePoint,
1146        completed_snarks: BTreeMap<SnarkJobId, Snark>,
1147        supercharge_coinbase: bool,
1148        transactions_by_fee: Vec<valid::UserCommand>,
1149    ) -> Result<StagedLedgerDiffCreateOutput, String> {
1150        let mut staged_ledger = self
1151            .staged_ledger_mut(pred_block.staged_ledger_hashes())
1152            .ok_or_else(|| {
1153                format!(
1154                    "parent staged ledger missing: {}",
1155                    pred_block.merkle_root_hash()
1156                )
1157            })?
1158            .clone();
1159
1160        // calculate merkle root hash, otherwise `MinaBasePendingCoinbaseStableV2::from` fails.
1161        staged_ledger.hash();
1162        let pending_coinbase_witness =
1163            MinaBasePendingCoinbaseStableV2::from(staged_ledger.pending_coinbase_collection());
1164
1165        let protocol_state_view =
1166            protocol_state_view(&pred_block.header().protocol_state).map_err(error_to_string)?;
1167
1168        // TODO(binier): include `invalid_txns` in output.
1169        let (pre_diff, _invalid_txns) = staged_ledger
1170            .create_diff(
1171                constraint_constants(),
1172                (&global_slot_since_genesis).into(),
1173                Some(true),
1174                (&coinbase_receiver).try_into().map_err(error_to_string)?,
1175                (),
1176                &protocol_state_view,
1177                transactions_by_fee,
1178                |stmt| {
1179                    let job_id = SnarkJobId::from(stmt);
1180                    match completed_snarks.get(&job_id) {
1181                        Some(snark) => snark.try_into().ok(),
1182                        None => None,
1183                    }
1184                },
1185                supercharge_coinbase,
1186            )
1187            .map_err(|err| format!("{err:?}"))?;
1188
1189        // TODO(binier): maybe here, check if block reward is above threshold.
1190        // <https://github.com/minaprotocol/mina/blob/b3d418a8c0ae4370738886c2b26f0ec7bdb49303/src/lib/block_producer/block_producer.ml#L222>
1191
1192        let pred_body_hash = pred_block
1193            .header()
1194            .protocol_state
1195            .body
1196            .try_hash()
1197            .map_err(error_to_string)?;
1198        let diff = (&pre_diff).into();
1199
1200        let res = staged_ledger
1201            .apply_diff_unchecked(
1202                constraint_constants(),
1203                (&global_slot_since_genesis).into(),
1204                pre_diff,
1205                (),
1206                &protocol_state_view,
1207                (
1208                    pred_block.hash().0.to_field().map_err(error_to_string)?,
1209                    pred_body_hash.0.to_field().map_err(error_to_string)?,
1210                ),
1211                (&coinbase_receiver).try_into().map_err(error_to_string)?,
1212                supercharge_coinbase,
1213            )
1214            .map_err(|err| format!("{err:?}"))?;
1215
1216        let diff_hash = block_body_hash(&diff).map_err(|err| format!("{err:?}"))?;
1217        let staking_ledger_hash = if is_new_epoch {
1218            pred_block.next_epoch_ledger_hash()
1219        } else {
1220            pred_block.staking_epoch_ledger_hash()
1221        };
1222
1223        Ok(StagedLedgerDiffCreateOutput {
1224            diff,
1225            diff_hash,
1226            staged_ledger_hash: (&res.hash_after_applying).into(),
1227            emitted_ledger_proof: res
1228                .ledger_proof
1229                .map(|(proof, ..)| (&proof).into())
1230                .map(Arc::new),
1231            pending_coinbase_update: (&res.pending_coinbase_update.1).into(),
1232            pending_coinbase_witness: MinaBasePendingCoinbaseWitnessStableV2 {
1233                pending_coinbases: pending_coinbase_witness,
1234                is_new_stack: res.pending_coinbase_update.0,
1235            },
1236            stake_proof_sparse_ledger: self
1237                .stake_proof_sparse_ledger(staking_ledger_hash, &producer, &delegator)
1238                .map_err(error_to_string)?,
1239        })
1240    }
1241
1242    pub fn stake_proof_sparse_ledger(
1243        &mut self,
1244        staking_ledger: &LedgerHash,
1245        producer: &NonZeroCurvePoint,
1246        delegator: &NonZeroCurvePoint,
1247    ) -> Result<v2::MinaBaseSparseLedgerBaseStableV2, InvalidBigInt> {
1248        let mask = self.snarked_ledgers.get(staking_ledger).unwrap();
1249        let producer_id = ledger::AccountId::new(producer.try_into()?, ledger::TokenId::default());
1250        let delegator_id =
1251            ledger::AccountId::new(delegator.try_into()?, ledger::TokenId::default());
1252        let sparse_ledger = ledger::sparse_ledger::SparseLedger::of_ledger_subset_exn(
1253            mask.clone(),
1254            &[producer_id, delegator_id],
1255        );
1256        Ok((&sparse_ledger).into())
1257    }
1258
1259    pub fn scan_state_summary(
1260        &self,
1261        staged_ledger_hash: &MinaBaseStagedLedgerHashStableV1,
1262    ) -> Result<Vec<Vec<RpcScanStateSummaryScanStateJob>>, String> {
1263        use ledger::scan_state::scan_state::JobValue;
1264
1265        let ledger = self.staged_ledgers.get(staged_ledger_hash);
1266        let Some(ledger) = ledger else {
1267            return Ok(Vec::new());
1268        };
1269        ledger
1270            .scan_state()
1271            .view()
1272            .map(|jobs| {
1273                let jobs = jobs.collect::<Vec<JobValueWithIndex<'_>>>();
1274                let mut iter = jobs.iter().peekable();
1275                let mut res = Vec::with_capacity(jobs.len());
1276
1277                loop {
1278                    let Some(job) = iter.next() else { break };
1279
1280                    let (stmt, seq_no, job_kind, is_done) = match &job.job {
1281                        JobValue::Leaf(JobValueBase::Empty)
1282                        | JobValue::Node(JobValueMerge::Empty)
1283                        | JobValue::Node(JobValueMerge::Part(_)) => {
1284                            res.push(RpcScanStateSummaryScanStateJob::Empty);
1285                            continue;
1286                        }
1287                        JobValue::Leaf(JobValueBase::Full(job)) => {
1288                            let stmt = &job.job.statement;
1289                            let tx = job.job.transaction_with_info.transaction();
1290                            let status = (&tx.status).into();
1291                            let tx = MinaTransactionTransactionStableV2::from(&tx.data);
1292                            let kind = RpcScanStateSummaryScanStateJobKind::Base(
1293                                RpcScanStateSummaryBlockTransaction {
1294                                    hash: tx.hash().ok(),
1295                                    kind: (&tx).into(),
1296                                    status,
1297                                },
1298                            );
1299                            let seq_no = job.seq_no.as_u64();
1300                            (stmt.clone(), seq_no, kind, job.state.is_done())
1301                        }
1302                        JobValue::Node(JobValueMerge::Full(job)) => {
1303                            let stmt = job
1304                                .left
1305                                .proof
1306                                .statement()
1307                                .merge(&job.right.proof.statement())
1308                                .unwrap();
1309                            let kind = RpcScanStateSummaryScanStateJobKind::Merge;
1310                            let seq_no = job.seq_no.as_u64();
1311                            (stmt, seq_no, kind, job.state.is_done())
1312                        }
1313                    };
1314                    let stmt: MinaStateBlockchainStateValueStableV2LedgerProofStatement =
1315                        (&stmt).into();
1316                    let job_id: SnarkJobId = (&stmt.source, &stmt.target).into();
1317
1318                    let bundle =
1319                        job.bundle_sibling()
1320                            .and_then(|(sibling_index, is_sibling_left)| {
1321                                let sibling_job = jobs.get(sibling_index)?;
1322                                let sibling_stmt: MinaStateBlockchainStateValueStableV2LedgerProofStatement = match &sibling_job.job {
1323                                    JobValue::Leaf(JobValueBase::Full(job)) => {
1324                                        (&job.job.statement).into()
1325                                    }
1326                                    JobValue::Node(JobValueMerge::Full(job)) => (&job
1327                                        .left
1328                                        .proof
1329                                        .statement()
1330                                        .merge(&job.right.proof.statement())
1331                                        .unwrap()).into(),
1332                                    _ => return None,
1333                                };
1334                                let bundle_job_id: SnarkJobId = match is_sibling_left {
1335                                    false => (&stmt.source, &sibling_stmt.target).into(),
1336                                    true => (&sibling_stmt.source, &stmt.target).into(),
1337                                };
1338                                Some((bundle_job_id, is_sibling_left))
1339                            });
1340
1341                    let bundle_job_id = bundle
1342                        .as_ref()
1343                        .map_or_else(|| job_id.clone(), |(id, _)| id.clone());
1344
1345                    if is_done {
1346                        let is_left =
1347                            bundle.map_or_else(|| true, |(_, is_sibling_left)| !is_sibling_left);
1348                        let parent = job.parent().ok_or_else(|| format!("job(depth: {}, index: {}) has no parent", job.depth(), job.index()))?;
1349                        let sok_message: MinaBaseSokMessageStableV1 = {
1350                                let job = jobs.get(parent).ok_or_else(|| format!("job(depth: {}, index: {}) parent not found", job.depth(), job.index()))?;
1351                                match &job.job {
1352                                    JobValue::Node(JobValueMerge::Part(job)) if is_left => {
1353                                        (&job.sok_message).into()
1354                                    }
1355                                    JobValue::Node(JobValueMerge::Full(job)) => {
1356                                        if is_left {
1357                                            (&job.left.sok_message).into()
1358                                        } else {
1359                                            (&job.right.sok_message).into()
1360                                        }
1361                                    }
1362                                    _state => {
1363                                        // Parent of a `Done` job can't be in this state.
1364                                        // But we are bug-compatible with the OCaml node here, in which sometimes for
1365                                        // some reason there is an empty row in the scan state trees, so Empty
1366                                        // is used instead.
1367                                        res.push(RpcScanStateSummaryScanStateJob::Empty);
1368                                        continue;
1369                                    }
1370                                }
1371                        };
1372                        res.push(RpcScanStateSummaryScanStateJob::Done {
1373                            job_id,
1374                            bundle_job_id,
1375                            job: Box::new(job_kind),
1376                            seq_no,
1377                            snark: Box::new(RpcSnarkPoolJobSnarkWorkDone {
1378                                snarker: sok_message.prover,
1379                                fee: sok_message.fee,
1380                            }),
1381                        });
1382                    } else {
1383                        res.push(RpcScanStateSummaryScanStateJob::Todo {
1384                            job_id,
1385                            bundle_job_id,
1386                            job: job_kind,
1387                            seq_no,
1388                        });
1389                    }
1390                }
1391                Ok(res)
1392            })
1393            .collect()
1394    }
1395}
1396
1397impl LedgerSyncState {
1398    fn mask(&self, hash: &LedgerHash) -> Option<(Mask, bool)> {
1399        self.snarked_ledgers
1400            .get(hash)
1401            .cloned()
1402            .map(|mask| (mask, false))
1403            .or_else(|| Some((self.staged_ledgers.get_mask(hash)?, true)))
1404    }
1405
1406    fn pending_sync_snarked_ledger_mask(&self, hash: &LedgerHash) -> Result<Mask, String> {
1407        self.snarked_ledgers
1408            .get(hash)
1409            .cloned()
1410            .ok_or_else(|| format!("Missing sync snarked ledger {}", hash))
1411    }
1412
1413    /// Returns a [Mask] instance for the snarked ledger with `hash`. If it doesn't
1414    /// exist a new instance is created.
1415    fn snarked_ledger_mut(&mut self, hash: LedgerHash) -> Result<&mut Mask, InvalidBigInt> {
1416        let hash_fp = hash.to_field()?;
1417        Ok(self.snarked_ledgers.entry(hash.clone()).or_insert_with(|| {
1418            let mut ledger = Mask::create(LEDGER_DEPTH);
1419            ledger.set_cached_hash_unchecked(&LedgerAddress::root(), hash_fp);
1420            ledger
1421        }))
1422    }
1423
1424    fn staged_ledger_mut(
1425        &mut self,
1426        hash: &MinaBaseStagedLedgerHashStableV1,
1427    ) -> Option<&mut StagedLedger> {
1428        self.staged_ledgers.get_mut(hash)
1429    }
1430}
1431
1432fn staged_ledger_reconstruct(
1433    snarked_ledger: Mask,
1434    snarked_ledger_hash: LedgerHash,
1435    parts: Option<Arc<StagedLedgerAuxAndPendingCoinbasesValid>>,
1436) -> Result<(v2::LedgerHash, Result<StagedLedger, String>), InvalidBigInt> {
1437    let staged_ledger_hash = parts
1438        .as_ref()
1439        .map(|p| p.staged_ledger_hash.clone())
1440        .unwrap_or_else(|| snarked_ledger_hash.clone());
1441
1442    let ledger = snarked_ledger.make_child();
1443
1444    let mut result = if let Some(parts) = &parts {
1445        let states = parts
1446            .needed_blocks
1447            .iter()
1448            .map(|state| Ok((state.try_hash()?.to_field()?, state.clone())))
1449            .collect::<Result<BTreeMap<Fp, _>, _>>()?;
1450
1451        StagedLedger::of_scan_state_pending_coinbases_and_snarked_ledger(
1452            (),
1453            constraint_constants(),
1454            Verifier,
1455            (&parts.scan_state).try_into()?,
1456            ledger,
1457            LocalState::empty(),
1458            parts.staged_ledger_hash.to_field()?,
1459            (&parts.pending_coinbase).try_into()?,
1460            |key| states.get(&key).cloned().unwrap(),
1461        )
1462    } else {
1463        StagedLedger::create_exn(constraint_constants().clone(), ledger)
1464    };
1465
1466    match result.as_mut() {
1467        Ok(staged_ledger) => {
1468            staged_ledger.commit_and_reparent_to_root();
1469        }
1470        Err(_) => {
1471            if let Err(e) = dump_reconstruct_to_file(&snarked_ledger, &parts) {
1472                mina_core::error!(
1473                    mina_core::log::system_time();
1474                    kind = "LedgerService::dump - Failed reconstruct",
1475                    summary = format!("Failed to save reconstruction to file: {e:?}")
1476                );
1477            }
1478        }
1479    }
1480
1481    Ok((staged_ledger_hash, result))
1482}
1483
1484pub trait LedgerService: redux::Service {
1485    fn ledger_manager(&self) -> &LedgerManager;
1486    fn force_sync_calls(&self) -> bool {
1487        false
1488    }
1489
1490    fn write_init(&mut self, request: LedgerWriteRequest) {
1491        let request = LedgerRequest::Write(request);
1492        if self.force_sync_calls() {
1493            let _ = self.ledger_manager().call_sync(request);
1494        } else {
1495            self.ledger_manager().call(request);
1496        }
1497    }
1498
1499    fn read_init(&mut self, id: LedgerReadId, request: LedgerReadRequest) {
1500        let request = LedgerRequest::Read(id, request);
1501        if self.force_sync_calls() {
1502            let _ = self.ledger_manager().call_sync(request);
1503        } else {
1504            self.ledger_manager().call(request);
1505        }
1506    }
1507}
1508
1509/// Save reconstruction to file, when it fails.
1510/// So we can easily reproduce the application both in Rust and OCaml, to compare them.
1511fn dump_reconstruct_to_file(
1512    snarked_ledger: &Mask,
1513    parts: &Option<Arc<StagedLedgerAuxAndPendingCoinbasesValid>>,
1514) -> std::io::Result<()> {
1515    use mina_p2p_messages::binprot::{
1516        self,
1517        macros::{BinProtRead, BinProtWrite},
1518    };
1519
1520    #[derive(BinProtRead, BinProtWrite)]
1521    struct ReconstructContext {
1522        accounts: Vec<v2::MinaBaseAccountBinableArgStableV2>,
1523        scan_state: v2::TransactionSnarkScanStateStableV2,
1524        pending_coinbase: v2::MinaBasePendingCoinbaseStableV2,
1525        staged_ledger_hash: LedgerHash,
1526        states: List<v2::MinaStateProtocolStateValueStableV2>,
1527    }
1528
1529    let Some(parts) = parts else {
1530        return Err(std::io::ErrorKind::Other.into());
1531    };
1532
1533    let StagedLedgerAuxAndPendingCoinbasesValid {
1534        scan_state,
1535        staged_ledger_hash,
1536        pending_coinbase,
1537        needed_blocks,
1538    } = &**parts;
1539
1540    let reconstruct_context = ReconstructContext {
1541        accounts: snarked_ledger
1542            .to_list()
1543            .iter()
1544            .map(v2::MinaBaseAccountBinableArgStableV2::from)
1545            .collect(),
1546        scan_state: scan_state.clone(),
1547        pending_coinbase: pending_coinbase.clone(),
1548        staged_ledger_hash: staged_ledger_hash.clone(),
1549        states: needed_blocks.clone(),
1550    };
1551
1552    let debug_dir = mina_core::get_debug_dir();
1553    let filename = debug_dir
1554        .join("failed_reconstruct_ctx.binprot")
1555        .to_string_lossy()
1556        .to_string();
1557    std::fs::create_dir_all(&debug_dir)?;
1558
1559    use mina_p2p_messages::binprot::BinProtWrite;
1560    let mut file = std::fs::File::create(&filename)?;
1561    reconstruct_context.binprot_write(&mut file)?;
1562    file.sync_all()?;
1563
1564    mina_core::info!(
1565        mina_core::log::system_time();
1566        kind = "LedgerService::dump - Failed reconstruct",
1567        summary = format!("Reconstruction saved to: {filename:?}")
1568    );
1569
1570    Ok(())
1571}
1572
1573/// Save staged ledger and block to file, when the application fail.
1574/// So we can easily reproduce the application both in Rust and OCaml, to compare them.
1575/// - <https://github.com/o1-labs/mina-rust/blob/8e68037aafddd43842a54c8439baeafee4c6e1eb/ledger/src/staged_ledger/staged_ledger.rs#L5959>
1576/// - TODO: Find OCaml link, I remember having the same test in OCaml but I can't find where
1577fn dump_application_to_file(
1578    staged_ledger: &StagedLedger,
1579    block: ArcBlockWithHash,
1580    pred_block: AppliedBlock,
1581) -> std::io::Result<String> {
1582    use mina_p2p_messages::binprot::{
1583        self,
1584        macros::{BinProtRead, BinProtWrite},
1585    };
1586
1587    #[derive(BinProtRead, BinProtWrite)]
1588    struct ApplyContext {
1589        accounts: Vec<v2::MinaBaseAccountBinableArgStableV2>,
1590        scan_state: v2::TransactionSnarkScanStateStableV2,
1591        pending_coinbase: v2::MinaBasePendingCoinbaseStableV2,
1592        pred_block: v2::MinaBlockBlockStableV2,
1593        blocks: Vec<v2::MinaBlockBlockStableV2>,
1594    }
1595
1596    let cs = &block.block.header.protocol_state.body.consensus_state;
1597    let block_height = cs.blockchain_length.as_u32();
1598
1599    let apply_context = ApplyContext {
1600        accounts: staged_ledger
1601            .ledger()
1602            .to_list()
1603            .iter()
1604            .map(v2::MinaBaseAccountBinableArgStableV2::from)
1605            .collect::<Vec<_>>(),
1606        scan_state: staged_ledger.scan_state().into(),
1607        pending_coinbase: staged_ledger.pending_coinbase_collection().into(),
1608        pred_block: (**pred_block.block()).clone(),
1609        blocks: vec![(*block.block).clone()],
1610    };
1611
1612    let debug_dir = mina_core::get_debug_dir();
1613    let filename = debug_dir
1614        .join(format!("failed_application_ctx_{}.binprot", block_height))
1615        .to_string_lossy()
1616        .to_string();
1617    std::fs::create_dir_all(&debug_dir)?;
1618
1619    let mut file = std::fs::File::create(&filename)?;
1620
1621    use mina_p2p_messages::binprot::BinProtWrite;
1622    apply_context.binprot_write(&mut file)?;
1623    file.sync_all()?;
1624
1625    Ok(filename)
1626}
1627
#[cfg(test)]
mod tests {
    use mina_p2p_messages::v2::MinaBaseLedgerHash0StableV1;

    use crate::ledger::hash_node_at_depth;

    use super::*;

    /// Hashes known left/right child hashes at a given address and checks the
    /// result against the expected parent hash.
    #[test]
    fn test_ledger_hash() {
        // (address, expected parent hash, left child hash, right child hash)
        let cases = [(
            LedgerAddress::root(),
            "jx5YAT36bv62M8mPcREYYfZWXaKqqMzDCP8wmc21uf4CfDKAHCr",
            "jxo5pSyt16XGwA9UeuAdiFDzrwFH3smbNTJF7fxq98w1y9Jem2m",
            "jwq3nCDr8XejL8HKDxR5qVhFJbKoUTGZgtLBZCp3MrqLTnqmjdP",
        )];

        for (address, expected_hash, left, right) in cases {
            let left_child: LedgerHash = left.parse().unwrap();
            let right_child: LedgerHash = right.parse().unwrap();

            let node = hash_node_at_depth(
                address.length(),
                left_child.0.to_field().unwrap(),
                right_child.0.to_field().unwrap(),
            );
            let actual: LedgerHash = MinaBaseLedgerHash0StableV1(node.into()).into();

            assert_eq!(actual.to_string(), expected_hash);
        }
    }
}