node/ledger/
ledger_service.rs

1use super::{
2    ledger_empty_hash_at_depth,
3    read::{LedgerReadId, LedgerReadRequest, LedgerReadResponse},
4    write::{CommitResult, LedgerWriteRequest, LedgerWriteResponse, LedgersToKeep},
5    LedgerAddress, LedgerEvent, LEDGER_DEPTH,
6};
7use crate::{
8    account::AccountPublicKey,
9    block_producer_effectful::StagedLedgerDiffCreateOutput,
10    ledger::{
11        ledger_manager::{LedgerManager, LedgerRequest},
12        write::{BlockApplyResult, BlockApplyResultArchive},
13    },
14    p2p::channels::rpc::StagedLedgerAuxAndPendingCoinbases,
15    rpc::{
16        RpcScanStateSummaryBlockTransaction, RpcScanStateSummaryScanStateJob,
17        RpcScanStateSummaryScanStateJobKind, RpcSnarkPoolJobSnarkWorkDone,
18    },
19    transition_frontier::{
20        genesis::empty_pending_coinbase_hash,
21        sync::{
22            ledger::staged::StagedLedgerAuxAndPendingCoinbasesValid,
23            TransitionFrontierRootSnarkedLedgerUpdates,
24        },
25    },
26};
27use ark_ff::fields::arithmetic::InvalidBigInt;
28use ledger::{
29    scan_state::{
30        currency::Slot,
31        scan_state::{AvailableJobMessage, JobValueBase, JobValueMerge, JobValueWithIndex, Pass},
32        transaction_logic::{
33            local_state::LocalState,
34            protocol_state::{protocol_state_view, ProtocolStateView},
35            transaction_partially_applied::TransactionPartiallyApplied,
36            valid,
37            zkapp_command::AccessedOrNot,
38            Transaction, TransactionStatus, UserCommand,
39        },
40    },
41    sparse_ledger::SparseLedger,
42    staged_ledger::{
43        diff::Diff,
44        staged_ledger::{SkipVerification, StagedLedger},
45        validate_block::block_body_hash,
46    },
47    verifier::Verifier,
48    Account, AccountId, AccountIndex, BaseLedger, Database, Mask, TokenId, UnregisterBehavior,
49};
50use mina_curves::pasta::Fp;
51use mina_p2p_messages::{
52    binprot::BinProtRead,
53    list::List,
54    v2::{
55        self, DataHashLibStateHashStableV1, LedgerHash, MinaBaseLedgerHash0StableV1,
56        MinaBasePendingCoinbaseStableV2, MinaBasePendingCoinbaseWitnessStableV2,
57        MinaBaseSokMessageStableV1, MinaBaseStagedLedgerHashStableV1,
58        MinaStateBlockchainStateValueStableV2LedgerProofStatement,
59        MinaStateProtocolStateValueStableV2, MinaTransactionTransactionStableV2, NonZeroCurvePoint,
60        StateHash,
61    },
62};
63use mina_signer::CompressedPubKey;
64use openmina_core::{
65    block::{AppliedBlock, ArcBlockWithHash},
66    bug_condition,
67    constants::constraint_constants,
68    snark::{Snark, SnarkJobId},
69    thread,
70};
71use std::{
72    collections::{BTreeMap, BTreeSet},
73    path::Path,
74    sync::Arc,
75};
76
77fn merkle_root(mask: &mut Mask) -> LedgerHash {
78    MinaBaseLedgerHash0StableV1(mask.merkle_root().into()).into()
79}
80
81fn error_to_string(e: InvalidBigInt) -> String {
82    format!("{:?}", e)
83}
84
85/// Indexing `StagedLedger` both by their "merkle root hash" and their "staged ledger hash"
86#[derive(Default)]
87struct StagedLedgersStorage {
88    /// 1 merkle root hash can refer to 1 or more `StagedLedger`
89    by_merkle_root_hash: BTreeMap<LedgerHash, Vec<Arc<MinaBaseStagedLedgerHashStableV1>>>,
90    staged_ledgers: BTreeMap<Arc<MinaBaseStagedLedgerHashStableV1>, StagedLedger>,
91}
92
93impl StagedLedgersStorage {
94    /// Slow, it will recompute the full staged ledger hash
95    /// Prefer `Self::insert` when you have the "staged ledger hash" around
96    fn insert_by_recomputing_hash(&mut self, mut staged_ledger: StagedLedger) {
97        let staged_ledger_hash: MinaBaseStagedLedgerHashStableV1 = (&staged_ledger.hash()).into();
98        self.insert(Arc::new(staged_ledger_hash), staged_ledger);
99    }
100
101    fn insert(
102        &mut self,
103        staged_ledger_hash: Arc<MinaBaseStagedLedgerHashStableV1>,
104        staged_ledger: StagedLedger,
105    ) {
106        let merkle_root_hash: LedgerHash = merkle_root(&mut staged_ledger.ledger());
107        self.by_merkle_root_hash
108            .entry(merkle_root_hash.clone())
109            .or_insert_with(|| Vec::with_capacity(1))
110            .extend([staged_ledger_hash.clone()]);
111        self.staged_ledgers
112            .insert(staged_ledger_hash, staged_ledger);
113    }
114
115    fn get_mask(&self, root_hash: &LedgerHash) -> Option<Mask> {
116        let staged_ledger_hashes: &Vec<_> = self.by_merkle_root_hash.get(root_hash)?;
117        // Note: there can be multiple `staged_ledger_hashes`, but they all have the same
118        // mask, so we just take the 1st one
119        self.staged_ledgers
120            .get(staged_ledger_hashes.first()?)
121            .map(|staged_ledger| staged_ledger.ledger())
122    }
123
124    fn get(&self, staged_ledger_hash: &MinaBaseStagedLedgerHashStableV1) -> Option<&StagedLedger> {
125        self.staged_ledgers.get(staged_ledger_hash)
126    }
127
128    fn get_mut(
129        &mut self,
130        staged_ledger_hash: &MinaBaseStagedLedgerHashStableV1,
131    ) -> Option<&mut StagedLedger> {
132        self.staged_ledgers.get_mut(staged_ledger_hash)
133    }
134
135    fn retain<F>(&mut self, fun: F)
136    where
137        F: Fn(&MinaBaseStagedLedgerHashStableV1) -> bool,
138    {
139        self.by_merkle_root_hash.retain(|_, staged_ledger_hashes| {
140            staged_ledger_hashes.retain(|hash| {
141                if fun(hash) {
142                    return true;
143                }
144                self.staged_ledgers.remove(hash);
145                false
146            });
147            !staged_ledger_hashes.is_empty()
148        });
149    }
150
151    fn extend<I>(&mut self, iterator: I)
152    where
153        I: IntoIterator<Item = (Arc<MinaBaseStagedLedgerHashStableV1>, StagedLedger)>,
154    {
155        for (hash, staged_ledger) in iterator.into_iter() {
156            self.insert(hash, staged_ledger);
157        }
158    }
159
160    fn take(&mut self) -> BTreeMap<Arc<MinaBaseStagedLedgerHashStableV1>, StagedLedger> {
161        let Self {
162            by_merkle_root_hash,
163            staged_ledgers,
164        } = self;
165
166        let _ = std::mem::take(by_merkle_root_hash);
167        std::mem::take(staged_ledgers)
168    }
169}
170
171#[derive(Default)]
172pub struct LedgerCtx {
173    snarked_ledgers: BTreeMap<LedgerHash, Mask>,
174    /// Additional snarked ledgers specified at startup (loaded from disk)
175    additional_snarked_ledgers: BTreeMap<LedgerHash, Mask>,
176    staged_ledgers: StagedLedgersStorage,
177    sync: LedgerSyncState,
178    /// Returns more data on block application necessary for archive node
179    archive_mode: bool,
180    event_sender:
181        Option<openmina_core::channels::mpsc::UnboundedSender<crate::event_source::Event>>,
182}
183
184#[derive(Default)]
185struct LedgerSyncState {
186    snarked_ledgers: BTreeMap<LedgerHash, Mask>,
187    staged_ledgers: StagedLedgersStorage,
188}
189
190impl LedgerCtx {
191    pub fn new_with_additional_snarked_ledgers<P>(path: P) -> Self
192    where
193        P: AsRef<Path>,
194    {
195        use std::fs;
196
197        let Ok(dir) = fs::read_dir(path) else {
198            return Self::default();
199        };
200
201        let additional_snarked_ledgers = dir
202            .filter_map(|entry| {
203                let entry = entry.ok()?;
204                let hash = entry.file_name().to_str()?.parse().ok()?;
205                let mut file = fs::File::open(entry.path()).ok()?;
206
207                let _ = Option::<LedgerHash>::binprot_read(&mut file).ok()?;
208
209                let accounts = Vec::<Account>::binprot_read(&mut file).ok()?;
210                let mut mask = Mask::new_root(Database::create(35));
211                for account in accounts {
212                    let account_id = account.id();
213                    mask.get_or_create_account(account_id, account).unwrap();
214                }
215                Some((hash, mask))
216            })
217            .collect();
218
219        LedgerCtx {
220            additional_snarked_ledgers,
221            ..Default::default()
222        }
223    }
224
225    pub fn set_archive_mode(&mut self) {
226        self.archive_mode = true;
227    }
228
229    // TODO(tizoc): Only used for the current workaround to make staged ledger
230    // reconstruction async, can be removed when the ledger services are made async
231    pub fn set_event_sender(
232        &mut self,
233        event_sender: openmina_core::channels::mpsc::UnboundedSender<crate::event_source::Event>,
234    ) {
235        self.event_sender = Some(event_sender);
236    }
237
238    pub(super) fn send_event(&self, event: LedgerEvent) {
239        if let Some(tx) = self.event_sender.as_ref() {
240            let _ = tx.send(event.into());
241        }
242    }
243
244    pub(super) fn send_write_response(&self, resp: LedgerWriteResponse) {
245        self.send_event(LedgerEvent::Write(resp))
246    }
247
248    pub(super) fn send_read_response(&self, id: LedgerReadId, resp: LedgerReadResponse) {
249        self.send_event(LedgerEvent::Read(id, resp))
250    }
251
252    pub fn insert_genesis_ledger(&mut self, mut mask: Mask) {
253        let merkle_root_hash = merkle_root(&mut mask);
254        let staged_ledger =
255            StagedLedger::create_exn(constraint_constants().clone(), mask.copy()).unwrap();
256        self.snarked_ledgers.insert(merkle_root_hash.clone(), mask);
257        // The genesis ledger is a specific case, some of its hashes are zero
258        let staged_ledger_hash =
259            MinaBaseStagedLedgerHashStableV1::zero(merkle_root_hash, empty_pending_coinbase_hash());
260        self.staged_ledgers
261            .insert(Arc::new(staged_ledger_hash), staged_ledger);
262    }
263
264    pub fn staged_ledger_reconstruct_result_store(&mut self, ledger: StagedLedger) {
265        self.staged_ledgers.insert_by_recomputing_hash(ledger);
266    }
267
268    // TODO(adonagy): Uh-oh, clean this up
269    pub fn get_accounts_for_rpc(
270        &self,
271        ledger_hash: LedgerHash,
272        requested_public_key: Option<AccountPublicKey>,
273    ) -> Vec<Account> {
274        if let Some((mask, _)) = self.mask(&ledger_hash) {
275            let mut accounts = Vec::new();
276            let mut single_account = Vec::new();
277
278            mask.iter(|account| {
279                accounts.push(account.clone());
280                if let Some(public_key) = requested_public_key.as_ref() {
281                    if public_key == &AccountPublicKey::from(account.public_key.clone()) {
282                        single_account.push(account.clone());
283                    }
284                }
285            });
286
287            if requested_public_key.is_some() {
288                single_account
289            } else {
290                accounts
291            }
292        } else {
293            vec![]
294        }
295    }
296
297    // TODO(tizoc): explain when `is_synced` is `true` and when it is `false`. Also use something else than a boolean.
298    /// Returns a tuple of `(mask, is_synced)` for a [Mask] with the specified `hash` if it exists or `None` otherwise.
299    pub fn mask(&self, hash: &LedgerHash) -> Option<(Mask, bool)> {
300        self.snarked_ledgers
301            .get(hash)
302            .cloned()
303            .map(|mask| (mask, true))
304            .or_else(|| Some((self.staged_ledgers.get_mask(hash)?, true)))
305            .or_else(|| self.sync.mask(hash))
306            .or_else(|| {
307                self.additional_snarked_ledgers
308                    .get(hash)
309                    .map(|l| (l.clone(), true))
310            })
311    }
312
313    pub fn contains_snarked_ledger(&self, hash: &LedgerHash) -> bool {
314        self.snarked_ledgers.contains_key(hash)
315    }
316
317    /// Returns the mask for a snarked ledger being synchronized or an error if it is not present
318    pub fn pending_sync_snarked_ledger_mask(&self, hash: &LedgerHash) -> Result<Mask, String> {
319        self.sync.pending_sync_snarked_ledger_mask(hash)
320    }
321
322    /// Copies the contents of an existing snarked ledger into the target
323    /// hash under the pending sync snarked ledgers state.
324    pub fn copy_snarked_ledger_contents_for_sync(
325        &mut self,
326        origin_snarked_ledger_hash: LedgerHash,
327        target_snarked_ledger_hash: LedgerHash,
328        overwrite: bool,
329    ) -> Result<bool, String> {
330        if !overwrite
331            && self
332                .sync
333                .snarked_ledgers
334                .contains_key(&target_snarked_ledger_hash)
335        {
336            return Ok(false);
337        }
338
339        let origin = self
340            .snarked_ledgers
341            .get(&origin_snarked_ledger_hash)
342            .or_else(|| {
343                // If it doesn't exist in completed ledgers, it may be
344                // an in-progress ledger from a previous attempt that we can reuse
345                self.sync.snarked_ledgers.get(&origin_snarked_ledger_hash)
346            })
347            .ok_or(format!(
348                "Tried to copy from non-existing snarked ledger with hash: {}",
349                origin_snarked_ledger_hash
350            ))?;
351
352        let target = origin.copy();
353        self.sync
354            .snarked_ledgers
355            .insert(target_snarked_ledger_hash, target);
356
357        Ok(true)
358    }
359
360    pub fn compute_snarked_ledger_hashes(
361        &mut self,
362        snarked_ledger_hash: &LedgerHash,
363    ) -> Result<(), String> {
364        let origin = self
365            .snarked_ledgers
366            .get_mut(snarked_ledger_hash)
367            .or_else(|| self.sync.snarked_ledgers.get_mut(snarked_ledger_hash))
368            .ok_or(format!(
369                "Cannot hash non-existing snarked ledger: {}",
370                snarked_ledger_hash
371            ))?;
372
373        // Our ledger is lazy when it comes to hashing, but retrieving the
374        // merkle root hash forces all pending hashes to be computed.
375        let _force_hashing = origin.merkle_root();
376
377        Ok(())
378    }
379
380    /// Returns a mutable reference to the [StagedLedger] with the specified `hash` if it exists or `None` otherwise.
381    fn staged_ledger_mut(
382        &mut self,
383        hash: &MinaBaseStagedLedgerHashStableV1,
384    ) -> Option<&mut StagedLedger> {
385        match self.staged_ledgers.get_mut(hash) {
386            Some(v) => Some(v),
387            None => self.sync.staged_ledger_mut(hash),
388        }
389    }
390
391    fn recreate_snarked_ledger(
392        &mut self,
393        root_snarked_ledger_updates: &TransitionFrontierRootSnarkedLedgerUpdates,
394        needed_protocol_states: &BTreeMap<StateHash, MinaStateProtocolStateValueStableV2>,
395        snarked_ledger_hash: &LedgerHash,
396    ) -> Result<(), String> {
397        let Some(update) = root_snarked_ledger_updates.get(snarked_ledger_hash) else {
398            return Ok(());
399        };
400        self.recreate_snarked_ledger(
401            root_snarked_ledger_updates,
402            needed_protocol_states,
403            &update.parent,
404        )?;
405
406        self.push_snarked_ledger(
407            needed_protocol_states,
408            &update.parent,
409            snarked_ledger_hash,
410            &update.staged_ledger_hash,
411        )
412    }
413
414    fn push_snarked_ledger(
415        &mut self,
416        protocol_states: &BTreeMap<StateHash, MinaStateProtocolStateValueStableV2>,
417        old_root_snarked_ledger_hash: &LedgerHash,
418        new_root_snarked_ledger_hash: &LedgerHash,
419        new_root_staged_ledger_hash: &MinaBaseStagedLedgerHashStableV1,
420    ) -> Result<(), String> {
421        openmina_core::debug!(openmina_core::log::system_time();
422            kind = "LedgerService::push_snarked_ledger",
423            summary = format!("{old_root_snarked_ledger_hash} -> {new_root_snarked_ledger_hash}"));
424        // Steps 4-7 from https://github.com/openmina/mina/blob/bc812dc9b90e05898c0c36ac76ba51ccf6cac137/src/lib/transition_frontier/full_frontier/full_frontier.ml#L354-L392
425        let constraint_constants = &constraint_constants();
426
427        // Step 4: create a new temporary mask `mt` with `s` as it's parent
428        let root_snarked_ledger = self
429            .snarked_ledgers
430            .get(old_root_snarked_ledger_hash)
431            .or_else(|| self.sync.snarked_ledgers.get(old_root_snarked_ledger_hash))
432            .ok_or_else(|| {
433                format!(
434                    "push_snarked_ledger: could not find old root snarked ledger: {}",
435                    old_root_snarked_ledger_hash,
436                )
437            })?;
438        let mut mt = root_snarked_ledger.make_child();
439
440        // Step 5: apply any transactions to `mt` that appear in the transition between `s` and `s'`
441        let apply_first_pass = |global_slot: Slot,
442                                txn_state_view: &ProtocolStateView,
443                                ledger: &mut Mask,
444                                transaction: &Transaction| {
445            ledger::scan_state::transaction_logic::apply_transaction_first_pass(
446                constraint_constants,
447                global_slot,
448                txn_state_view,
449                ledger,
450                transaction,
451            )
452        };
453
454        let apply_second_pass = |ledger: &mut Mask, tx: TransactionPartiallyApplied<Mask>| {
455            ledger::scan_state::transaction_logic::apply_transaction_second_pass(
456                constraint_constants,
457                ledger,
458                tx,
459            )
460        };
461
462        let apply_first_pass_sparse_ledger =
463            |global_slot: Slot,
464             txn_state_view: &ProtocolStateView,
465             sparse_ledger: &mut SparseLedger,
466             transaction: &Transaction| {
467                ledger::scan_state::transaction_logic::apply_transaction_first_pass(
468                    constraint_constants,
469                    global_slot,
470                    txn_state_view,
471                    sparse_ledger,
472                    transaction,
473                )
474            };
475
476        let get_protocol_state = |state_hash: Fp| {
477            let state_hash = StateHash::from_fp(state_hash);
478            if let Some(s) = protocol_states.get(&state_hash) {
479                Ok(s.clone())
480            } else {
481                Err(format!(
482                    "Failed to find protocol state for state hash: {}",
483                    state_hash
484                ))
485            }
486        };
487
488        let scan_state = self
489            .staged_ledger_mut(new_root_staged_ledger_hash)
490            .ok_or_else(|| {
491                format!(
492                    "Failed to find staged ledger with hash: {:#?}",
493                    new_root_staged_ledger_hash
494                )
495            })?
496            .scan_state();
497
498        let Pass::FirstPassLedgerHash(_first_pass_ledger_target) = scan_state
499            .get_snarked_ledger_sync(
500                &mut mt,
501                get_protocol_state,
502                apply_first_pass,
503                apply_second_pass,
504                apply_first_pass_sparse_ledger,
505            )?;
506
507        // Assert that the obtained ledger is the one we expect
508        let expected_hash = new_root_snarked_ledger_hash;
509        let obtained_hash = LedgerHash::from_fp(mt.merkle_root());
510
511        if expected_hash != &obtained_hash {
512            return Err(format!(
513                "Expected to obtain snarked root ledger hash {} but got {}",
514                expected_hash, obtained_hash
515            ));
516        }
517
518        self.sync
519            .snarked_ledgers
520            .insert(new_root_snarked_ledger_hash.clone(), mt);
521
522        Ok(())
523    }
524
525    pub fn get_account_delegators(
526        &self,
527        ledger_hash: &LedgerHash,
528        account_id: &AccountId,
529    ) -> Option<Vec<Account>> {
530        let (mask, _) = self.mask(ledger_hash)?;
531        let mut accounts = Vec::new();
532
533        mask.iter(|account| {
534            if account.delegate == Some(account_id.public_key.clone()) {
535                accounts.push(account.clone());
536            }
537        });
538
539        Some(accounts)
540    }
541
542    #[allow(clippy::type_complexity)]
543    pub fn producers_with_delegates<F: FnMut(&CompressedPubKey) -> bool>(
544        &self,
545        ledger_hash: &LedgerHash,
546        mut filter: F,
547    ) -> Option<BTreeMap<AccountPublicKey, Vec<(ledger::AccountIndex, AccountPublicKey, u64)>>>
548    {
549        let (mask, _) = self.mask(ledger_hash)?;
550        let mut accounts = Vec::new();
551
552        mask.iter(|account| {
553            if filter(account.delegate.as_ref().unwrap_or(&account.public_key)) {
554                accounts.push((
555                    account.id(),
556                    account.delegate.clone(),
557                    account.balance.as_u64(),
558                ))
559            }
560        });
561
562        let producers = accounts.into_iter().fold(
563            BTreeMap::<_, Vec<_>>::new(),
564            |mut producers, (id, delegate, balance)| {
565                let index = mask.index_of_account(id.clone()).unwrap();
566                let pub_key = AccountPublicKey::from(id.public_key);
567                let producer = delegate.map(Into::into).unwrap_or(pub_key.clone());
568                producers
569                    .entry(producer)
570                    .or_default()
571                    .push((index, pub_key, balance));
572                producers
573            },
574        );
575        Some(producers)
576    }
577
578    pub fn child_hashes_get(
579        &mut self,
580        snarked_ledger_hash: LedgerHash,
581        parent: &LedgerAddress,
582    ) -> Result<(LedgerHash, LedgerHash), String> {
583        let mut mask = self.pending_sync_snarked_ledger_mask(&snarked_ledger_hash)?;
584        let left_hash = LedgerHash::from_fp(mask.get_inner_hash_at_addr(parent.child_left())?);
585        let right_hash = LedgerHash::from_fp(mask.get_inner_hash_at_addr(parent.child_right())?);
586
587        Ok((left_hash, right_hash))
588    }
589
590    pub fn accounts_set(
591        &mut self,
592        snarked_ledger_hash: LedgerHash,
593        parent: &LedgerAddress,
594        accounts: Vec<v2::MinaBaseAccountBinableArgStableV2>,
595    ) -> Result<LedgerHash, String> {
596        let mut mask = self.pending_sync_snarked_ledger_mask(&snarked_ledger_hash)?;
597        let accounts: Vec<_> = accounts
598            .into_iter()
599            .map(|account| Ok(Box::new((&account).try_into()?)))
600            .collect::<Result<Vec<_>, InvalidBigInt>>()
601            .map_err(error_to_string)?;
602
603        mask.set_all_accounts_rooted_at(parent.clone(), &accounts)
604            .map_err(|_| "Failed when setting accounts".to_owned())?;
605
606        let computed_hash = LedgerHash::from_fp(mask.get_inner_hash_at_addr(parent.clone())?);
607
608        Ok(computed_hash)
609    }
610
611    pub fn staged_ledger_reconstruct<F>(
612        &mut self,
613        snarked_ledger_hash: LedgerHash,
614        parts: Option<Arc<StagedLedgerAuxAndPendingCoinbasesValid>>,
615        callback: F,
616    ) -> Result<(), InvalidBigInt>
617    where
618        F: 'static + FnOnce(v2::LedgerHash, Result<StagedLedger, String>) + Send,
619    {
620        let snarked_ledger = self
621            .sync
622            .snarked_ledger_mut(snarked_ledger_hash.clone())?
623            .copy();
624
625        thread::Builder::new()
626            .name("staged-ledger-reconstruct".into())
627            .spawn(move || {
628                match staged_ledger_reconstruct(snarked_ledger, snarked_ledger_hash, parts) {
629                    Ok((staged_ledger_hash, result)) => {
630                        callback(staged_ledger_hash, result);
631                    }
632                    Err(e) => callback(v2::LedgerHash::zero(), Err(format!("{:?}", e))),
633                }
634            })
635            .expect("Failed: staged ledger reconstruct thread");
636
637        Ok(())
638    }
639
640    pub fn staged_ledger_reconstruct_sync(
641        &mut self,
642        snarked_ledger_hash: LedgerHash,
643        parts: Option<Arc<StagedLedgerAuxAndPendingCoinbasesValid>>,
644    ) -> Result<(v2::LedgerHash, Result<(), String>), InvalidBigInt> {
645        let snarked_ledger = self
646            .sync
647            .snarked_ledger_mut(snarked_ledger_hash.clone())?
648            .copy();
649        let (staged_ledger_hash, result) =
650            staged_ledger_reconstruct(snarked_ledger, snarked_ledger_hash, parts)?;
651        let result = match result {
652            Err(err) => Err(err),
653            Ok(staged_ledger) => {
654                self.staged_ledger_reconstruct_result_store(staged_ledger);
655                Ok(())
656            }
657        };
658
659        Ok((staged_ledger_hash, result))
660    }
661
662    pub fn block_apply(
663        &mut self,
664        block: ArcBlockWithHash,
665        pred_block: AppliedBlock,
666        skip_verification: Option<SkipVerification>,
667    ) -> Result<BlockApplyResult, String> {
668        openmina_core::info!(openmina_core::log::system_time();
669            kind = "LedgerService::block_apply",
670            summary = format!("{}, {} <- {}", block.height(), block.hash(), block.pred_hash()),
671            pred_staged_ledger_hash = pred_block.merkle_root_hash().to_string(),
672            staged_ledger_hash = block.merkle_root_hash().to_string(),
673        );
674        let mut staged_ledger = self
675            .staged_ledger_mut(pred_block.staged_ledger_hashes())
676            .ok_or_else(|| {
677                format!(
678                    "parent staged ledger missing: {:#?}",
679                    pred_block.staged_ledger_hashes()
680                )
681            })?
682            .clone();
683
684        let global_slot = block.global_slot_since_genesis();
685        let prev_protocol_state = &pred_block.header().protocol_state;
686        let prev_state_view = protocol_state_view(prev_protocol_state).map_err(error_to_string)?;
687
688        let consensus_state = &block.header().protocol_state.body.consensus_state;
689        let coinbase_receiver: CompressedPubKey = (&consensus_state.coinbase_receiver)
690            .try_into()
691            .map_err(error_to_string)?;
692        let supercharge_coinbase = consensus_state.supercharge_coinbase;
693
694        let diff: Diff = (&block.body().staged_ledger_diff)
695            .try_into()
696            .map_err(error_to_string)?;
697
698        let prev_protocol_state: ledger::proofs::block::ProtocolState =
699            prev_protocol_state.try_into()?;
700
701        let result = staged_ledger
702            .apply(
703                skip_verification,
704                constraint_constants(),
705                Slot::from_u32(global_slot),
706                diff,
707                (),
708                &Verifier,
709                &prev_state_view,
710                prev_protocol_state.hashes(),
711                coinbase_receiver.clone(),
712                supercharge_coinbase,
713            )
714            .map_err(|err| format!("{err:?}"))?;
715        let just_emitted_a_proof = result.ledger_proof.is_some();
716        let ledger_hashes = MinaBaseStagedLedgerHashStableV1::from(&result.hash_after_applying);
717
718        // TODO(binier): return error if not matching.
719        let expected_ledger_hashes = block.staged_ledger_hashes();
720        if &ledger_hashes != expected_ledger_hashes {
721            let staged_ledger = self
722                .staged_ledger_mut(pred_block.staged_ledger_hashes())
723                .unwrap(); // We already know the ledger exists, see the same call a few lines above
724
725            match dump_application_to_file(staged_ledger, block.clone(), pred_block) {
726                Ok(filename) => openmina_core::info!(
727                    openmina_core::log::system_time();
728                    kind = "LedgerService::dump - Failed application",
729                    summary = format!("StagedLedger and block saved to: {filename:?}")
730                ),
731                Err(e) => openmina_core::error!(
732                    openmina_core::log::system_time();
733                    kind = "LedgerService::dump - Failed application",
734                    summary = format!("Failed to save block application to file: {e:?}")
735                ),
736            }
737
738            panic!("staged ledger hash mismatch. found: {ledger_hashes:#?}, expected: {expected_ledger_hashes:#?}");
739        }
740
741        let archive_data = if self.archive_mode {
742            let senders = block
743                .body()
744                .transactions()
745                .filter_map(|tx| UserCommand::try_from(tx).ok().map(|cmd| cmd.fee_payer()))
746                .collect::<BTreeSet<_>>()
747                .into_iter();
748
749            let coinbase_receiver_id = AccountId::new(coinbase_receiver, TokenId::default());
750
751            // <https://github.com/MinaProtocol/mina/blob/85149735ca3a76d026e8cf36b8ff22941a048e31/src/app/archive/lib/diff.ml#L78>
752            let (accessed, not_accessed): (BTreeSet<_>, BTreeSet<_>) = block
753                .body()
754                .tranasctions_with_status()
755                .flat_map(|(tx, status)| {
756                    let status: TransactionStatus = status.into();
757                    UserCommand::try_from(tx)
758                        .ok()
759                        .map(|cmd| cmd.account_access_statuses(&status))
760                        .into_iter()
761                        .flatten()
762                })
763                .partition(|(_, status)| *status == AccessedOrNot::Accessed);
764
765            let mut account_ids_accessed: BTreeSet<_> =
766                accessed.into_iter().map(|(id, _)| id).collect();
767            let mut account_ids_not_accessed: BTreeSet<_> =
768                not_accessed.into_iter().map(|(id, _)| id).collect();
769
770            // Coinbase receiver is included only when the block has a coinbase transaction
771            // Note: If for whatever reason the network has set the coinbase amount to zero,
772            // to mimic the behavior of the ocaml node, we still include the coinbase receiver
773            // in the accessed accounts as a coinbase transaction is created regardless of the coinbase amount.
774            // <https://github.com/MinaProtocol/mina/blob/b595a2bf00ae138d745737da628bd94bb2bd91e2/src/lib/staged_ledger/pre_diff_info.ml#L139>
775            let has_coinbase = block.body().has_coinbase();
776
777            if has_coinbase {
778                account_ids_accessed.insert(coinbase_receiver_id);
779            } else {
780                account_ids_not_accessed.insert(coinbase_receiver_id);
781            }
782
783            // Include the coinbase fee transfer accounts
784            let fee_transfer_accounts =
785                block.body().coinbase_fee_transfers_iter().filter_map(|cb| {
786                    let receiver: CompressedPubKey = cb.receiver_pk.inner().try_into().ok()?;
787                    let account_id = AccountId::new(receiver, TokenId::default());
788                    Some(account_id)
789                });
790            account_ids_accessed.extend(fee_transfer_accounts);
791
792            // TODO(adonagy): Create a struct instead of tuple
793            let accounts_accessed: Vec<(AccountIndex, Account)> = account_ids_accessed
794                .iter()
795                .filter_map(|id| {
796                    staged_ledger
797                        .ledger()
798                        .index_of_account(id.clone())
799                        .and_then(|index| {
800                            staged_ledger
801                                .ledger()
802                                .get_at_index(index)
803                                .map(|account| (index, *account))
804                        })
805                })
806                .collect();
807
808            let account_creation_fee = constraint_constants().account_creation_fee;
809
810            // TODO(adonagy): Create a struct instead of tuple
811            let accounts_created: Vec<(AccountId, u64)> = staged_ledger
812                .latest_block_accounts_created(pred_block.hash().to_field()?)
813                .iter()
814                .map(|id| (id.clone(), account_creation_fee))
815                .collect();
816
817            // A token is used regardless of txn status
818            // <https://github.com/MinaProtocol/mina/blob/85149735ca3a76d026e8cf36b8ff22941a048e31/src/app/archive/lib/diff.ml#L114>
819            let all_account_ids: BTreeSet<_> = account_ids_accessed
820                .union(&account_ids_not_accessed)
821                .collect();
822            let tokens_used: BTreeSet<(TokenId, Option<AccountId>)> = if has_coinbase {
823                all_account_ids
824                    .iter()
825                    .map(|id| {
826                        let token_id = id.token_id.clone();
827                        let token_owner = staged_ledger.ledger().token_owner(token_id.clone());
828                        (token_id, token_owner)
829                    })
830                    .collect()
831            } else {
832                BTreeSet::new()
833            };
834
835            let sender_receipt_chains_from_parent_ledger = senders
836                .filter_map(|sender| {
837                    if let Some(location) = staged_ledger.ledger().location_of_account(&sender) {
838                        staged_ledger.ledger().get(location).map(|account| {
839                            (
840                                sender,
841                                v2::ReceiptChainHash::from(account.receipt_chain_hash),
842                            )
843                        })
844                    } else {
845                        None
846                    }
847                })
848                .collect();
849            Some(BlockApplyResultArchive {
850                accounts_accessed,
851                accounts_created,
852                tokens_used,
853                sender_receipt_chains_from_parent_ledger,
854            })
855        } else {
856            None
857        };
858
859        self.sync
860            .staged_ledgers
861            .insert(Arc::new(ledger_hashes), staged_ledger);
862
863        // staged_ledger.ledger().get_at_index(index)
864
865        Ok(BlockApplyResult {
866            block,
867            just_emitted_a_proof,
868            archive_data,
869        })
870    }
871
872    pub fn commit(
873        &mut self,
874        ledgers_to_keep: LedgersToKeep,
875        root_snarked_ledger_updates: TransitionFrontierRootSnarkedLedgerUpdates,
876        needed_protocol_states: BTreeMap<StateHash, MinaStateProtocolStateValueStableV2>,
877        new_root: &ArcBlockWithHash,
878        new_best_tip: &ArcBlockWithHash,
879    ) -> CommitResult {
880        openmina_core::debug!(openmina_core::log::system_time();
881            kind = "LedgerService::commit",
882            summary = format!("commit {}, {}", new_best_tip.height(), new_best_tip.hash()),
883            new_root = format!("{}, {}", new_root.height(), new_root.hash()),
884            new_root_staking_epoch_ledger = new_root.staking_epoch_ledger_hash().to_string(),
885            new_root_next_epoch_ledger = new_root.next_epoch_ledger_hash().to_string(),
886            new_root_snarked_ledger = new_root.snarked_ledger_hash().to_string(),
887        );
888        self.recreate_snarked_ledger(
889            &root_snarked_ledger_updates,
890            &needed_protocol_states,
891            new_root.snarked_ledger_hash(),
892        )
893        .unwrap();
894
895        self.snarked_ledgers.retain(|hash, _| {
896            let keep = ledgers_to_keep.contains(hash);
897            if !keep {
898                openmina_core::debug!(openmina_core::log::system_time();
899                    kind = "LedgerService::commit - snarked_ledgers.drop",
900                    summary = format!("drop snarked ledger {hash}"));
901            }
902            keep
903        });
904        self.snarked_ledgers.extend(
905            std::mem::take(&mut self.sync.snarked_ledgers)
906                .into_iter()
907                .filter(|(hash, _)| {
908                    let keep = ledgers_to_keep.contains(hash);
909                    if !keep {
910                        openmina_core::debug!(openmina_core::log::system_time();
911                            kind = "LedgerService::commit - snarked_ledgers.drop",
912                            summary = format!("drop snarked ledger {hash}"));
913                    }
914                    keep
915                }),
916        );
917
918        self.staged_ledgers
919            .retain(|hash| ledgers_to_keep.contains(hash));
920        self.staged_ledgers.extend(
921            self.sync
922                .staged_ledgers
923                .take()
924                .into_iter()
925                .filter(|(hash, _)| ledgers_to_keep.contains(&**hash)),
926        );
927
928        for ledger_hash in [
929            new_best_tip.staking_epoch_ledger_hash(),
930            new_root.snarked_ledger_hash(),
931            new_root.merkle_root_hash(),
932        ] {
933            if let Some((mut mask, is_synced)) = self.mask(ledger_hash) {
934                if !is_synced {
935                    panic!("ledger mask expected to be synced: {ledger_hash}");
936                }
937                let calculated = merkle_root(&mut mask);
938                assert_eq!(ledger_hash, &calculated, "ledger mask hash mismatch");
939            } else {
940                panic!("ledger mask is missing: {ledger_hash}");
941            }
942        }
943
944        for (ledger_hash, snarked_ledger) in self.snarked_ledgers.iter_mut() {
945            while let Some((parent_hash, parent)) = snarked_ledger
946                .get_parent()
947                .map(|mut parent| (merkle_root(&mut parent), parent))
948                .filter(|(parent_hash, _)| !ledgers_to_keep.contains(parent_hash))
949            {
950                openmina_core::debug!(openmina_core::log::system_time();
951                    kind = "LedgerService::commit - mask.commit_and_reparent",
952                    summary = format!("{ledger_hash} -> {parent_hash}"));
953                snarked_ledger.commit();
954                snarked_ledger.unregister_mask(UnregisterBehavior::Check);
955                *snarked_ledger = parent;
956            }
957        }
958
959        // TODO(tizoc): should this fail silently?
960        let Some(new_root_ledger) = self.staged_ledgers.get_mut(new_root.staged_ledger_hashes())
961        else {
962            return Default::default();
963        };
964
965        // Make staged ledger mask new root.
966        new_root_ledger.commit_and_reparent_to_root();
967
968        let needed_protocol_states = self
969            .staged_ledger_mut(new_root.staged_ledger_hashes())
970            .map(|l| {
971                l.scan_state()
972                    .required_state_hashes()
973                    .into_iter()
974                    .map(|fp| DataHashLibStateHashStableV1(fp.into()).into())
975                    .collect()
976            })
977            .unwrap_or_default();
978
979        let available_jobs = Arc::new(
980            self.staged_ledger_mut(new_best_tip.staged_ledger_hashes())
981                .map(|l| {
982                    l.scan_state()
983                        .all_job_pairs_iter()
984                        .map(|job| job.map(|single| AvailableJobMessage::from(single)))
985                        .collect()
986                })
987                .unwrap_or_default(),
988        );
989
990        // self.check_alive_masks();
991
992        CommitResult {
993            alive_masks: ::ledger::mask::alive_len(),
994            available_jobs,
995            needed_protocol_states,
996        }
997    }
998
999    #[allow(dead_code)]
1000    fn check_alive_masks(&mut self) {
1001        let mut alive: BTreeSet<_> = ::ledger::mask::alive_collect();
1002        let staged_ledgers = self
1003            .staged_ledgers
1004            .staged_ledgers
1005            .iter()
1006            .map(|(hash, ledger)| (&hash.non_snark.ledger_hash, ledger.ledger_ref()));
1007
1008        let alive_ledgers = self
1009            .snarked_ledgers
1010            .iter()
1011            .chain(staged_ledgers)
1012            .map(|(hash, mask)| {
1013                let uuid = mask.get_uuid();
1014                if !alive.remove(&uuid) {
1015                    bug_condition!("mask not found among alive masks! uuid: {uuid}, hash: {hash}");
1016                }
1017                (uuid, hash)
1018            })
1019            .collect::<Vec<_>>();
1020        openmina_core::debug!(redux::Timestamp::global_now(); "alive_ledgers_after_commit: {alive_ledgers:#?}");
1021
1022        if !alive.is_empty() {
1023            bug_condition!(
1024                "masks alive which are no longer part of the ledger service: {alive:#?}"
1025            );
1026        }
1027    }
1028
1029    pub fn get_num_accounts(
1030        &mut self,
1031        ledger_hash: v2::LedgerHash,
1032    ) -> Option<(u64, v2::LedgerHash)> {
1033        let (mask, _) = self
1034            .mask(&ledger_hash)
1035            .filter(|(_, is_synced)| *is_synced)?;
1036        // fix(binier): incorrect ledger hash, must be a hash of a populated subtree.
1037
1038        let num_accounts = mask.num_accounts() as u64;
1039        let first_node_addr = ledger::Address::first(
1040            LEDGER_DEPTH.saturating_sub(super::tree_height_for_num_accounts(num_accounts)),
1041        );
1042        let hash = LedgerHash::from_fp(mask.get_hash(first_node_addr)?);
1043        Some((num_accounts, hash))
1044    }
1045
1046    pub fn get_child_hashes(
1047        &mut self,
1048        ledger_hash: v2::LedgerHash,
1049        addr: LedgerAddress,
1050    ) -> Option<(v2::LedgerHash, v2::LedgerHash)> {
1051        let (mask, is_synced) = self.mask(&ledger_hash)?;
1052        let get_hash = |addr: LedgerAddress| {
1053            let depth = addr.length();
1054            mask.get_hash(addr)
1055                .map(|fp| MinaBaseLedgerHash0StableV1(fp.into()).into())
1056                .or_else(|| {
1057                    if is_synced {
1058                        Some(ledger_empty_hash_at_depth(depth))
1059                    } else {
1060                        None
1061                    }
1062                })
1063        };
1064        let (left, right) = (addr.child_left(), addr.child_right());
1065        Some((get_hash(left)?, get_hash(right)?))
1066    }
1067
1068    pub fn get_child_accounts(
1069        &mut self,
1070        ledger_hash: v2::LedgerHash,
1071        addr: LedgerAddress,
1072    ) -> Option<Vec<v2::MinaBaseAccountBinableArgStableV2>> {
1073        let (mask, _) = self
1074            .mask(&ledger_hash)
1075            .filter(|(_, is_synced)| *is_synced)?;
1076        // TODO(binier): SEC maybe we need to check addr depth?
1077        let accounts = mask
1078            .get_all_accounts_rooted_at(addr)?
1079            .into_iter()
1080            .map(|(_, account)| (&*account).into())
1081            .collect();
1082        Some(accounts)
1083    }
1084
1085    pub fn get_accounts(
1086        &mut self,
1087        ledger_hash: v2::LedgerHash,
1088        ids: Vec<AccountId>,
1089    ) -> Vec<Account> {
1090        let Some((mask, _)) = self.mask(&ledger_hash) else {
1091            openmina_core::warn!(
1092                openmina_core::log::system_time();
1093                kind = "LedgerService::get_accounts",
1094                summary = format!("Ledger not found: {ledger_hash:?}")
1095            );
1096            return Vec::new();
1097        };
1098        let addrs = mask
1099            .location_of_account_batch(&ids)
1100            .into_iter()
1101            .filter_map(|(_id, addr)| addr)
1102            .collect::<Vec<_>>();
1103
1104        mask.get_batch(&addrs)
1105            .into_iter()
1106            .filter_map(|(_, account)| account.map(|account| *account))
1107            .collect::<Vec<_>>()
1108    }
1109
1110    pub fn staged_ledger_aux_and_pending_coinbase(
1111        &mut self,
1112        ledger_hash: &MinaBaseStagedLedgerHashStableV1,
1113        protocol_states: BTreeMap<StateHash, MinaStateProtocolStateValueStableV2>,
1114    ) -> Option<Arc<StagedLedgerAuxAndPendingCoinbases>> {
1115        let ledger = self.staged_ledger_mut(ledger_hash)?;
1116        let needed_blocks = ledger
1117            .scan_state()
1118            .required_state_hashes()
1119            .into_iter()
1120            .map(|fp| DataHashLibStateHashStableV1(fp.into()))
1121            .map(|hash| protocol_states.get(&hash.into()).ok_or(()).cloned())
1122            .collect::<Result<_, _>>()
1123            .ok()?;
1124        // Required so that we can perform the conversion bellow, which
1125        // will not work if the hash is not available already.
1126        ledger.pending_coinbase_collection_merkle_root();
1127        Some(
1128            StagedLedgerAuxAndPendingCoinbases {
1129                scan_state: (ledger.scan_state()).into(),
1130                staged_ledger_hash: ledger_hash.non_snark.ledger_hash.clone(),
1131                pending_coinbase: (ledger.pending_coinbase_collection()).into(),
1132                needed_blocks,
1133            }
1134            .into(),
1135        )
1136    }
1137
1138    #[allow(clippy::too_many_arguments)]
1139    pub fn staged_ledger_diff_create(
1140        &mut self,
1141        pred_block: AppliedBlock,
1142        global_slot_since_genesis: v2::MinaNumbersGlobalSlotSinceGenesisMStableV1,
1143        is_new_epoch: bool,
1144        producer: NonZeroCurvePoint,
1145        delegator: NonZeroCurvePoint,
1146        coinbase_receiver: NonZeroCurvePoint,
1147        completed_snarks: BTreeMap<SnarkJobId, Snark>,
1148        supercharge_coinbase: bool,
1149        transactions_by_fee: Vec<valid::UserCommand>,
1150    ) -> Result<StagedLedgerDiffCreateOutput, String> {
1151        let mut staged_ledger = self
1152            .staged_ledger_mut(pred_block.staged_ledger_hashes())
1153            .ok_or_else(|| {
1154                format!(
1155                    "parent staged ledger missing: {}",
1156                    pred_block.merkle_root_hash()
1157                )
1158            })?
1159            .clone();
1160
1161        // calculate merkle root hash, otherwise `MinaBasePendingCoinbaseStableV2::from` fails.
1162        staged_ledger.hash();
1163        let pending_coinbase_witness =
1164            MinaBasePendingCoinbaseStableV2::from(staged_ledger.pending_coinbase_collection());
1165
1166        let protocol_state_view =
1167            protocol_state_view(&pred_block.header().protocol_state).map_err(error_to_string)?;
1168
1169        // TODO(binier): include `invalid_txns` in output.
1170        let (pre_diff, _invalid_txns) = staged_ledger
1171            .create_diff(
1172                constraint_constants(),
1173                (&global_slot_since_genesis).into(),
1174                Some(true),
1175                (&coinbase_receiver).try_into().map_err(error_to_string)?,
1176                (),
1177                &protocol_state_view,
1178                transactions_by_fee,
1179                |stmt| {
1180                    let job_id = SnarkJobId::from(stmt);
1181                    match completed_snarks.get(&job_id) {
1182                        Some(snark) => snark.try_into().ok(),
1183                        None => None,
1184                    }
1185                },
1186                supercharge_coinbase,
1187            )
1188            .map_err(|err| format!("{err:?}"))?;
1189
1190        // TODO(binier): maybe here, check if block reward is above threshold.
1191        // <https://github.com/minaprotocol/mina/blob/b3d418a8c0ae4370738886c2b26f0ec7bdb49303/src/lib/block_producer/block_producer.ml#L222>
1192
1193        let pred_body_hash = pred_block
1194            .header()
1195            .protocol_state
1196            .body
1197            .try_hash()
1198            .map_err(error_to_string)?;
1199        let diff = (&pre_diff).into();
1200
1201        let res = staged_ledger
1202            .apply_diff_unchecked(
1203                constraint_constants(),
1204                (&global_slot_since_genesis).into(),
1205                pre_diff,
1206                (),
1207                &protocol_state_view,
1208                (
1209                    pred_block.hash().0.to_field().map_err(error_to_string)?,
1210                    pred_body_hash.0.to_field().map_err(error_to_string)?,
1211                ),
1212                (&coinbase_receiver).try_into().map_err(error_to_string)?,
1213                supercharge_coinbase,
1214            )
1215            .map_err(|err| format!("{err:?}"))?;
1216
1217        let diff_hash = block_body_hash(&diff).map_err(|err| format!("{err:?}"))?;
1218        let staking_ledger_hash = if is_new_epoch {
1219            pred_block.next_epoch_ledger_hash()
1220        } else {
1221            pred_block.staking_epoch_ledger_hash()
1222        };
1223
1224        Ok(StagedLedgerDiffCreateOutput {
1225            diff,
1226            diff_hash,
1227            staged_ledger_hash: (&res.hash_after_applying).into(),
1228            emitted_ledger_proof: res
1229                .ledger_proof
1230                .map(|(proof, ..)| (&proof).into())
1231                .map(Arc::new),
1232            pending_coinbase_update: (&res.pending_coinbase_update.1).into(),
1233            pending_coinbase_witness: MinaBasePendingCoinbaseWitnessStableV2 {
1234                pending_coinbases: pending_coinbase_witness,
1235                is_new_stack: res.pending_coinbase_update.0,
1236            },
1237            stake_proof_sparse_ledger: self
1238                .stake_proof_sparse_ledger(staking_ledger_hash, &producer, &delegator)
1239                .map_err(error_to_string)?,
1240        })
1241    }
1242
1243    pub fn stake_proof_sparse_ledger(
1244        &mut self,
1245        staking_ledger: &LedgerHash,
1246        producer: &NonZeroCurvePoint,
1247        delegator: &NonZeroCurvePoint,
1248    ) -> Result<v2::MinaBaseSparseLedgerBaseStableV2, InvalidBigInt> {
1249        let mask = self.snarked_ledgers.get(staking_ledger).unwrap();
1250        let producer_id = ledger::AccountId::new(producer.try_into()?, ledger::TokenId::default());
1251        let delegator_id =
1252            ledger::AccountId::new(delegator.try_into()?, ledger::TokenId::default());
1253        let sparse_ledger = ledger::sparse_ledger::SparseLedger::of_ledger_subset_exn(
1254            mask.clone(),
1255            &[producer_id, delegator_id],
1256        );
1257        Ok((&sparse_ledger).into())
1258    }
1259
1260    pub fn scan_state_summary(
1261        &self,
1262        staged_ledger_hash: &MinaBaseStagedLedgerHashStableV1,
1263    ) -> Result<Vec<Vec<RpcScanStateSummaryScanStateJob>>, String> {
1264        use ledger::scan_state::scan_state::JobValue;
1265
1266        let ledger = self.staged_ledgers.get(staged_ledger_hash);
1267        let Some(ledger) = ledger else {
1268            return Ok(Vec::new());
1269        };
1270        ledger
1271            .scan_state()
1272            .view()
1273            .map(|jobs| {
1274                let jobs = jobs.collect::<Vec<JobValueWithIndex<'_>>>();
1275                let mut iter = jobs.iter().peekable();
1276                let mut res = Vec::with_capacity(jobs.len());
1277
1278                loop {
1279                    let Some(job) = iter.next() else { break };
1280
1281                    let (stmt, seq_no, job_kind, is_done) = match &job.job {
1282                        JobValue::Leaf(JobValueBase::Empty)
1283                        | JobValue::Node(JobValueMerge::Empty)
1284                        | JobValue::Node(JobValueMerge::Part(_)) => {
1285                            res.push(RpcScanStateSummaryScanStateJob::Empty);
1286                            continue;
1287                        }
1288                        JobValue::Leaf(JobValueBase::Full(job)) => {
1289                            let stmt = &job.job.statement;
1290                            let tx = job.job.transaction_with_info.transaction();
1291                            let status = (&tx.status).into();
1292                            let tx = MinaTransactionTransactionStableV2::from(&tx.data);
1293                            let kind = RpcScanStateSummaryScanStateJobKind::Base(
1294                                RpcScanStateSummaryBlockTransaction {
1295                                    hash: tx.hash().ok(),
1296                                    kind: (&tx).into(),
1297                                    status,
1298                                },
1299                            );
1300                            let seq_no = job.seq_no.as_u64();
1301                            (stmt.clone(), seq_no, kind, job.state.is_done())
1302                        }
1303                        JobValue::Node(JobValueMerge::Full(job)) => {
1304                            let stmt = job
1305                                .left
1306                                .proof
1307                                .statement()
1308                                .merge(&job.right.proof.statement())
1309                                .unwrap();
1310                            let kind = RpcScanStateSummaryScanStateJobKind::Merge;
1311                            let seq_no = job.seq_no.as_u64();
1312                            (stmt, seq_no, kind, job.state.is_done())
1313                        }
1314                    };
1315                    let stmt: MinaStateBlockchainStateValueStableV2LedgerProofStatement =
1316                        (&stmt).into();
1317                    let job_id: SnarkJobId = (&stmt.source, &stmt.target).into();
1318
1319                    let bundle =
1320                        job.bundle_sibling()
1321                            .and_then(|(sibling_index, is_sibling_left)| {
1322                                let sibling_job = jobs.get(sibling_index)?;
1323                                let sibling_stmt: MinaStateBlockchainStateValueStableV2LedgerProofStatement = match &sibling_job.job {
1324                                    JobValue::Leaf(JobValueBase::Full(job)) => {
1325                                        (&job.job.statement).into()
1326                                    }
1327                                    JobValue::Node(JobValueMerge::Full(job)) => (&job
1328                                        .left
1329                                        .proof
1330                                        .statement()
1331                                        .merge(&job.right.proof.statement())
1332                                        .unwrap()).into(),
1333                                    _ => return None,
1334                                };
1335                                let bundle_job_id: SnarkJobId = match is_sibling_left {
1336                                    false => (&stmt.source, &sibling_stmt.target).into(),
1337                                    true => (&sibling_stmt.source, &stmt.target).into(),
1338                                };
1339                                Some((bundle_job_id, is_sibling_left))
1340                            });
1341
1342                    let bundle_job_id = bundle
1343                        .as_ref()
1344                        .map_or_else(|| job_id.clone(), |(id, _)| id.clone());
1345
1346                    if is_done {
1347                        let is_left =
1348                            bundle.map_or_else(|| true, |(_, is_sibling_left)| !is_sibling_left);
1349                        let parent = job.parent().ok_or_else(|| format!("job(depth: {}, index: {}) has no parent", job.depth(), job.index()))?;
1350                        let sok_message: MinaBaseSokMessageStableV1 = {
1351                                let job = jobs.get(parent).ok_or_else(|| format!("job(depth: {}, index: {}) parent not found", job.depth(), job.index()))?;
1352                                match &job.job {
1353                                    JobValue::Node(JobValueMerge::Part(job)) if is_left => {
1354                                        (&job.sok_message).into()
1355                                    }
1356                                    JobValue::Node(JobValueMerge::Full(job)) => {
1357                                        if is_left {
1358                                            (&job.left.sok_message).into()
1359                                        } else {
1360                                            (&job.right.sok_message).into()
1361                                        }
1362                                    }
1363                                    _state => {
1364                                        // Parent of a `Done` job can't be in this state.
1365                                        // But we are bug-compatible with the OCaml node here, in which sometimes for
1366                                        // some reason there is an empty row in the scan state trees, so Empty
1367                                        // is used instead.
1368                                        res.push(RpcScanStateSummaryScanStateJob::Empty);
1369                                        continue;
1370                                    }
1371                                }
1372                        };
1373                        res.push(RpcScanStateSummaryScanStateJob::Done {
1374                            job_id,
1375                            bundle_job_id,
1376                            job: Box::new(job_kind),
1377                            seq_no,
1378                            snark: Box::new(RpcSnarkPoolJobSnarkWorkDone {
1379                                snarker: sok_message.prover,
1380                                fee: sok_message.fee,
1381                            }),
1382                        });
1383                    } else {
1384                        res.push(RpcScanStateSummaryScanStateJob::Todo {
1385                            job_id,
1386                            bundle_job_id,
1387                            job: job_kind,
1388                            seq_no,
1389                        });
1390                    }
1391                }
1392                Ok(res)
1393            })
1394            .collect()
1395    }
1396}
1397
1398impl LedgerSyncState {
1399    fn mask(&self, hash: &LedgerHash) -> Option<(Mask, bool)> {
1400        self.snarked_ledgers
1401            .get(hash)
1402            .cloned()
1403            .map(|mask| (mask, false))
1404            .or_else(|| Some((self.staged_ledgers.get_mask(hash)?, true)))
1405    }
1406
1407    fn pending_sync_snarked_ledger_mask(&self, hash: &LedgerHash) -> Result<Mask, String> {
1408        self.snarked_ledgers
1409            .get(hash)
1410            .cloned()
1411            .ok_or_else(|| format!("Missing sync snarked ledger {}", hash))
1412    }
1413
1414    /// Returns a [Mask] instance for the snarked ledger with `hash`. If it doesn't
1415    /// exist a new instance is created.
1416    fn snarked_ledger_mut(&mut self, hash: LedgerHash) -> Result<&mut Mask, InvalidBigInt> {
1417        let hash_fp = hash.to_field()?;
1418        Ok(self.snarked_ledgers.entry(hash.clone()).or_insert_with(|| {
1419            let mut ledger = Mask::create(LEDGER_DEPTH);
1420            ledger.set_cached_hash_unchecked(&LedgerAddress::root(), hash_fp);
1421            ledger
1422        }))
1423    }
1424
1425    fn staged_ledger_mut(
1426        &mut self,
1427        hash: &MinaBaseStagedLedgerHashStableV1,
1428    ) -> Option<&mut StagedLedger> {
1429        self.staged_ledgers.get_mut(hash)
1430    }
1431}
1432
1433fn staged_ledger_reconstruct(
1434    snarked_ledger: Mask,
1435    snarked_ledger_hash: LedgerHash,
1436    parts: Option<Arc<StagedLedgerAuxAndPendingCoinbasesValid>>,
1437) -> Result<(v2::LedgerHash, Result<StagedLedger, String>), InvalidBigInt> {
1438    let staged_ledger_hash = parts
1439        .as_ref()
1440        .map(|p| p.staged_ledger_hash.clone())
1441        .unwrap_or_else(|| snarked_ledger_hash.clone());
1442
1443    let ledger = snarked_ledger.make_child();
1444
1445    let mut result = if let Some(parts) = &parts {
1446        let states = parts
1447            .needed_blocks
1448            .iter()
1449            .map(|state| Ok((state.try_hash()?.to_field()?, state.clone())))
1450            .collect::<Result<BTreeMap<Fp, _>, _>>()?;
1451
1452        StagedLedger::of_scan_state_pending_coinbases_and_snarked_ledger(
1453            (),
1454            constraint_constants(),
1455            Verifier,
1456            (&parts.scan_state).try_into()?,
1457            ledger,
1458            LocalState::empty(),
1459            parts.staged_ledger_hash.to_field()?,
1460            (&parts.pending_coinbase).try_into()?,
1461            |key| states.get(&key).cloned().unwrap(),
1462        )
1463    } else {
1464        StagedLedger::create_exn(constraint_constants().clone(), ledger)
1465    };
1466
1467    match result.as_mut() {
1468        Ok(staged_ledger) => {
1469            staged_ledger.commit_and_reparent_to_root();
1470        }
1471        Err(_) => {
1472            if let Err(e) = dump_reconstruct_to_file(&snarked_ledger, &parts) {
1473                openmina_core::error!(
1474                    openmina_core::log::system_time();
1475                    kind = "LedgerService::dump - Failed reconstruct",
1476                    summary = format!("Failed to save reconstruction to file: {e:?}")
1477                );
1478            }
1479        }
1480    }
1481
1482    Ok((staged_ledger_hash, result))
1483}
1484
1485pub trait LedgerService: redux::Service {
1486    fn ledger_manager(&self) -> &LedgerManager;
1487    fn force_sync_calls(&self) -> bool {
1488        false
1489    }
1490
1491    fn write_init(&mut self, request: LedgerWriteRequest) {
1492        let request = LedgerRequest::Write(request);
1493        if self.force_sync_calls() {
1494            let _ = self.ledger_manager().call_sync(request);
1495        } else {
1496            self.ledger_manager().call(request);
1497        }
1498    }
1499
1500    fn read_init(&mut self, id: LedgerReadId, request: LedgerReadRequest) {
1501        let request = LedgerRequest::Read(id, request);
1502        if self.force_sync_calls() {
1503            let _ = self.ledger_manager().call_sync(request);
1504        } else {
1505            self.ledger_manager().call(request);
1506        }
1507    }
1508}
1509
1510/// Save reconstruction to file, when it fails.
1511/// So we can easily reproduce the application both in Rust and OCaml, to compare them.
1512fn dump_reconstruct_to_file(
1513    snarked_ledger: &Mask,
1514    parts: &Option<Arc<StagedLedgerAuxAndPendingCoinbasesValid>>,
1515) -> std::io::Result<()> {
1516    use mina_p2p_messages::binprot::{
1517        self,
1518        macros::{BinProtRead, BinProtWrite},
1519    };
1520
1521    #[derive(BinProtRead, BinProtWrite)]
1522    struct ReconstructContext {
1523        accounts: Vec<v2::MinaBaseAccountBinableArgStableV2>,
1524        scan_state: v2::TransactionSnarkScanStateStableV2,
1525        pending_coinbase: v2::MinaBasePendingCoinbaseStableV2,
1526        staged_ledger_hash: LedgerHash,
1527        states: List<v2::MinaStateProtocolStateValueStableV2>,
1528    }
1529
1530    let Some(parts) = parts else {
1531        return Err(std::io::ErrorKind::Other.into());
1532    };
1533
1534    let StagedLedgerAuxAndPendingCoinbasesValid {
1535        scan_state,
1536        staged_ledger_hash,
1537        pending_coinbase,
1538        needed_blocks,
1539    } = &**parts;
1540
1541    let reconstruct_context = ReconstructContext {
1542        accounts: snarked_ledger
1543            .to_list()
1544            .iter()
1545            .map(v2::MinaBaseAccountBinableArgStableV2::from)
1546            .collect(),
1547        scan_state: scan_state.clone(),
1548        pending_coinbase: pending_coinbase.clone(),
1549        staged_ledger_hash: staged_ledger_hash.clone(),
1550        states: needed_blocks.clone(),
1551    };
1552
1553    let debug_dir = openmina_core::get_debug_dir();
1554    let filename = debug_dir
1555        .join("failed_reconstruct_ctx.binprot")
1556        .to_string_lossy()
1557        .to_string();
1558    std::fs::create_dir_all(&debug_dir)?;
1559
1560    use mina_p2p_messages::binprot::BinProtWrite;
1561    let mut file = std::fs::File::create(&filename)?;
1562    reconstruct_context.binprot_write(&mut file)?;
1563    file.sync_all()?;
1564
1565    openmina_core::info!(
1566        openmina_core::log::system_time();
1567        kind = "LedgerService::dump - Failed reconstruct",
1568        summary = format!("Reconstruction saved to: {filename:?}")
1569    );
1570
1571    Ok(())
1572}
1573
1574/// Save staged ledger and block to file, when the application fail.
1575/// So we can easily reproduce the application both in Rust and OCaml, to compare them.
1576/// - <https://github.com/openmina/openmina/blob/8e68037aafddd43842a54c8439baeafee4c6e1eb/ledger/src/staged_ledger/staged_ledger.rs#L5959>
1577/// - TODO: Find OCaml link, I remember having the same test in OCaml but I can't find where
1578fn dump_application_to_file(
1579    staged_ledger: &StagedLedger,
1580    block: ArcBlockWithHash,
1581    pred_block: AppliedBlock,
1582) -> std::io::Result<String> {
1583    use mina_p2p_messages::binprot::{
1584        self,
1585        macros::{BinProtRead, BinProtWrite},
1586    };
1587
1588    #[derive(BinProtRead, BinProtWrite)]
1589    struct ApplyContext {
1590        accounts: Vec<v2::MinaBaseAccountBinableArgStableV2>,
1591        scan_state: v2::TransactionSnarkScanStateStableV2,
1592        pending_coinbase: v2::MinaBasePendingCoinbaseStableV2,
1593        pred_block: v2::MinaBlockBlockStableV2,
1594        blocks: Vec<v2::MinaBlockBlockStableV2>,
1595    }
1596
1597    let cs = &block.block.header.protocol_state.body.consensus_state;
1598    let block_height = cs.blockchain_length.as_u32();
1599
1600    let apply_context = ApplyContext {
1601        accounts: staged_ledger
1602            .ledger()
1603            .to_list()
1604            .iter()
1605            .map(v2::MinaBaseAccountBinableArgStableV2::from)
1606            .collect::<Vec<_>>(),
1607        scan_state: staged_ledger.scan_state().into(),
1608        pending_coinbase: staged_ledger.pending_coinbase_collection().into(),
1609        pred_block: (**pred_block.block()).clone(),
1610        blocks: vec![(*block.block).clone()],
1611    };
1612
1613    let debug_dir = openmina_core::get_debug_dir();
1614    let filename = debug_dir
1615        .join(format!("failed_application_ctx_{}.binprot", block_height))
1616        .to_string_lossy()
1617        .to_string();
1618    std::fs::create_dir_all(&debug_dir)?;
1619
1620    let mut file = std::fs::File::create(&filename)?;
1621
1622    use mina_p2p_messages::binprot::BinProtWrite;
1623    apply_context.binprot_write(&mut file)?;
1624    file.sync_all()?;
1625
1626    Ok(filename)
1627}
1628
1629#[cfg(test)]
1630mod tests {
1631    use mina_p2p_messages::v2::MinaBaseLedgerHash0StableV1;
1632
1633    use crate::ledger::hash_node_at_depth;
1634
1635    use super::*;
1636
1637    #[test]
1638    fn test_ledger_hash() {
1639        IntoIterator::into_iter([(
1640            LedgerAddress::root(),
1641            "jx5YAT36bv62M8mPcREYYfZWXaKqqMzDCP8wmc21uf4CfDKAHCr",
1642            "jxo5pSyt16XGwA9UeuAdiFDzrwFH3smbNTJF7fxq98w1y9Jem2m",
1643            "jwq3nCDr8XejL8HKDxR5qVhFJbKoUTGZgtLBZCp3MrqLTnqmjdP",
1644        )])
1645        .map(|(addr, expected_hash, left, right)| {
1646            let left: LedgerHash = left.parse().unwrap();
1647            let right: LedgerHash = right.parse().unwrap();
1648            (addr, expected_hash, left, right)
1649        })
1650        .for_each(|(address, expected_hash, left, right)| {
1651            let hash = hash_node_at_depth(
1652                address.length(),
1653                left.0.to_field().unwrap(),
1654                right.0.to_field().unwrap(),
1655            );
1656            let hash: LedgerHash = MinaBaseLedgerHash0StableV1(hash.into()).into();
1657            assert_eq!(hash.to_string(), expected_hash);
1658        });
1659    }
1660}