1use super::{
2 ledger_empty_hash_at_depth,
3 read::{LedgerReadId, LedgerReadRequest, LedgerReadResponse},
4 write::{CommitResult, LedgerWriteRequest, LedgerWriteResponse, LedgersToKeep},
5 LedgerAddress, LedgerEvent, LEDGER_DEPTH,
6};
7use crate::{
8 account::AccountPublicKey,
9 block_producer_effectful::StagedLedgerDiffCreateOutput,
10 ledger::{
11 ledger_manager::{LedgerManager, LedgerRequest},
12 write::{BlockApplyResult, BlockApplyResultArchive},
13 },
14 p2p::channels::rpc::StagedLedgerAuxAndPendingCoinbases,
15 rpc::{
16 RpcScanStateSummaryBlockTransaction, RpcScanStateSummaryScanStateJob,
17 RpcScanStateSummaryScanStateJobKind, RpcSnarkPoolJobSnarkWorkDone,
18 },
19 transition_frontier::{
20 genesis::empty_pending_coinbase_hash,
21 sync::{
22 ledger::staged::StagedLedgerAuxAndPendingCoinbasesValid,
23 TransitionFrontierRootSnarkedLedgerUpdates,
24 },
25 },
26};
27use ark_ff::fields::arithmetic::InvalidBigInt;
28use ledger::{
29 scan_state::{
30 currency::Slot,
31 scan_state::{AvailableJobMessage, JobValueBase, JobValueMerge, JobValueWithIndex, Pass},
32 transaction_logic::{
33 local_state::LocalState,
34 protocol_state::{protocol_state_view, ProtocolStateView},
35 transaction_partially_applied::TransactionPartiallyApplied,
36 valid,
37 zkapp_command::AccessedOrNot,
38 Transaction, TransactionStatus, UserCommand,
39 },
40 },
41 sparse_ledger::SparseLedger,
42 staged_ledger::{
43 diff::Diff,
44 staged_ledger::{SkipVerification, StagedLedger},
45 validate_block::block_body_hash,
46 },
47 verifier::Verifier,
48 Account, AccountId, AccountIndex, BaseLedger, Database, Mask, TokenId, UnregisterBehavior,
49};
50use mina_curves::pasta::Fp;
51use mina_p2p_messages::{
52 binprot::BinProtRead,
53 list::List,
54 v2::{
55 self, DataHashLibStateHashStableV1, LedgerHash, MinaBaseLedgerHash0StableV1,
56 MinaBasePendingCoinbaseStableV2, MinaBasePendingCoinbaseWitnessStableV2,
57 MinaBaseSokMessageStableV1, MinaBaseStagedLedgerHashStableV1,
58 MinaStateBlockchainStateValueStableV2LedgerProofStatement,
59 MinaStateProtocolStateValueStableV2, MinaTransactionTransactionStableV2, NonZeroCurvePoint,
60 StateHash,
61 },
62};
63use mina_signer::CompressedPubKey;
64use openmina_core::{
65 block::{AppliedBlock, ArcBlockWithHash},
66 bug_condition,
67 constants::constraint_constants,
68 snark::{Snark, SnarkJobId},
69 thread,
70};
71use std::{
72 collections::{BTreeMap, BTreeSet},
73 path::Path,
74 sync::Arc,
75};
76
77fn merkle_root(mask: &mut Mask) -> LedgerHash {
78 MinaBaseLedgerHash0StableV1(mask.merkle_root().into()).into()
79}
80
81fn error_to_string(e: InvalidBigInt) -> String {
82 format!("{:?}", e)
83}
84
/// Stores [`StagedLedger`]s keyed by their staged ledger hash, together with a secondary
/// index from the merkle root hash of their ledger to the staged ledger hashes sharing it.
#[derive(Default)]
struct StagedLedgersStorage {
    /// Secondary index: one merkle root hash can map to one or more staged ledger hashes.
    by_merkle_root_hash: BTreeMap<LedgerHash, Vec<Arc<MinaBaseStagedLedgerHashStableV1>>>,
    /// Primary storage, keyed by staged ledger hash.
    staged_ledgers: BTreeMap<Arc<MinaBaseStagedLedgerHashStableV1>, StagedLedger>,
}
92
93impl StagedLedgersStorage {
94 fn insert_by_recomputing_hash(&mut self, mut staged_ledger: StagedLedger) {
97 let staged_ledger_hash: MinaBaseStagedLedgerHashStableV1 = (&staged_ledger.hash()).into();
98 self.insert(Arc::new(staged_ledger_hash), staged_ledger);
99 }
100
101 fn insert(
102 &mut self,
103 staged_ledger_hash: Arc<MinaBaseStagedLedgerHashStableV1>,
104 staged_ledger: StagedLedger,
105 ) {
106 let merkle_root_hash: LedgerHash = merkle_root(&mut staged_ledger.ledger());
107 self.by_merkle_root_hash
108 .entry(merkle_root_hash.clone())
109 .or_insert_with(|| Vec::with_capacity(1))
110 .extend([staged_ledger_hash.clone()]);
111 self.staged_ledgers
112 .insert(staged_ledger_hash, staged_ledger);
113 }
114
115 fn get_mask(&self, root_hash: &LedgerHash) -> Option<Mask> {
116 let staged_ledger_hashes: &Vec<_> = self.by_merkle_root_hash.get(root_hash)?;
117 self.staged_ledgers
120 .get(staged_ledger_hashes.first()?)
121 .map(|staged_ledger| staged_ledger.ledger())
122 }
123
124 fn get(&self, staged_ledger_hash: &MinaBaseStagedLedgerHashStableV1) -> Option<&StagedLedger> {
125 self.staged_ledgers.get(staged_ledger_hash)
126 }
127
128 fn get_mut(
129 &mut self,
130 staged_ledger_hash: &MinaBaseStagedLedgerHashStableV1,
131 ) -> Option<&mut StagedLedger> {
132 self.staged_ledgers.get_mut(staged_ledger_hash)
133 }
134
135 fn retain<F>(&mut self, fun: F)
136 where
137 F: Fn(&MinaBaseStagedLedgerHashStableV1) -> bool,
138 {
139 self.by_merkle_root_hash.retain(|_, staged_ledger_hashes| {
140 staged_ledger_hashes.retain(|hash| {
141 if fun(hash) {
142 return true;
143 }
144 self.staged_ledgers.remove(hash);
145 false
146 });
147 !staged_ledger_hashes.is_empty()
148 });
149 }
150
151 fn extend<I>(&mut self, iterator: I)
152 where
153 I: IntoIterator<Item = (Arc<MinaBaseStagedLedgerHashStableV1>, StagedLedger)>,
154 {
155 for (hash, staged_ledger) in iterator.into_iter() {
156 self.insert(hash, staged_ledger);
157 }
158 }
159
160 fn take(&mut self) -> BTreeMap<Arc<MinaBaseStagedLedgerHashStableV1>, StagedLedger> {
161 let Self {
162 by_merkle_root_hash,
163 staged_ledgers,
164 } = self;
165
166 let _ = std::mem::take(by_merkle_root_hash);
167 std::mem::take(staged_ledgers)
168 }
169}
170
/// State held by the ledger service: committed snarked and staged ledgers, ledgers
/// produced since the last commit, and the channel used to report results as events.
#[derive(Default)]
pub struct LedgerCtx {
    /// Snarked ledger masks keyed by ledger hash.
    snarked_ledgers: BTreeMap<LedgerHash, Mask>,
    /// Extra snarked ledgers loaded from disk by `new_with_additional_snarked_ledgers`;
    /// `mask` consults them last.
    additional_snarked_ledgers: BTreeMap<LedgerHash, Mask>,
    /// Staged ledgers retained by `commit`.
    staged_ledgers: StagedLedgersStorage,
    /// Ledgers created since the last `commit`; `commit` moves the ones to keep into the
    /// fields above.
    sync: LedgerSyncState,
    /// When `true`, `block_apply` additionally collects archive data.
    archive_mode: bool,
    /// Destination for ledger events; `send_event` does nothing while this is `None`.
    event_sender:
        Option<openmina_core::channels::mpsc::UnboundedSender<crate::event_source::Event>>,
}
183
/// Ledgers that have been produced but not yet committed into [`LedgerCtx`].
#[derive(Default)]
struct LedgerSyncState {
    /// Snarked ledgers written by `copy_snarked_ledger_contents_for_sync` and
    /// `push_snarked_ledger`; drained by `commit`.
    snarked_ledgers: BTreeMap<LedgerHash, Mask>,
    /// Staged ledgers written by `block_apply`; drained by `commit`.
    staged_ledgers: StagedLedgersStorage,
}
189
190impl LedgerCtx {
191 pub fn new_with_additional_snarked_ledgers<P>(path: P) -> Self
192 where
193 P: AsRef<Path>,
194 {
195 use std::fs;
196
197 let Ok(dir) = fs::read_dir(path) else {
198 return Self::default();
199 };
200
201 let additional_snarked_ledgers = dir
202 .filter_map(|entry| {
203 let entry = entry.ok()?;
204 let hash = entry.file_name().to_str()?.parse().ok()?;
205 let mut file = fs::File::open(entry.path()).ok()?;
206
207 let _ = Option::<LedgerHash>::binprot_read(&mut file).ok()?;
208
209 let accounts = Vec::<Account>::binprot_read(&mut file).ok()?;
210 let mut mask = Mask::new_root(Database::create(35));
211 for account in accounts {
212 let account_id = account.id();
213 mask.get_or_create_account(account_id, account).unwrap();
214 }
215 Some((hash, mask))
216 })
217 .collect();
218
219 LedgerCtx {
220 additional_snarked_ledgers,
221 ..Default::default()
222 }
223 }
224
/// Turns archive mode on; from then on `block_apply` also collects archive data.
/// There is no counterpart in this block to turn it off again.
pub fn set_archive_mode(&mut self) {
    self.archive_mode = true;
}
228
/// Installs the channel on which `send_event` delivers events, replacing any
/// previously configured sender.
pub fn set_event_sender(
    &mut self,
    event_sender: openmina_core::channels::mpsc::UnboundedSender<crate::event_source::Event>,
) {
    self.event_sender = Some(event_sender);
}
237
238 pub(super) fn send_event(&self, event: LedgerEvent) {
239 if let Some(tx) = self.event_sender.as_ref() {
240 let _ = tx.send(event.into());
241 }
242 }
243
244 pub(super) fn send_write_response(&self, resp: LedgerWriteResponse) {
245 self.send_event(LedgerEvent::Write(resp))
246 }
247
248 pub(super) fn send_read_response(&self, id: LedgerReadId, resp: LedgerReadResponse) {
249 self.send_event(LedgerEvent::Read(id, resp))
250 }
251
252 pub fn insert_genesis_ledger(&mut self, mut mask: Mask) {
253 let merkle_root_hash = merkle_root(&mut mask);
254 let staged_ledger =
255 StagedLedger::create_exn(constraint_constants().clone(), mask.copy()).unwrap();
256 self.snarked_ledgers.insert(merkle_root_hash.clone(), mask);
257 let staged_ledger_hash =
259 MinaBaseStagedLedgerHashStableV1::zero(merkle_root_hash, empty_pending_coinbase_hash());
260 self.staged_ledgers
261 .insert(Arc::new(staged_ledger_hash), staged_ledger);
262 }
263
/// Stores a reconstructed staged ledger in the committed staged ledger storage.
/// The storage key is derived by recomputing the staged ledger's hash.
pub fn staged_ledger_reconstruct_result_store(&mut self, ledger: StagedLedger) {
    self.staged_ledgers.insert_by_recomputing_hash(ledger);
}
267
268 pub fn get_accounts_for_rpc(
270 &self,
271 ledger_hash: LedgerHash,
272 requested_public_key: Option<AccountPublicKey>,
273 ) -> Vec<Account> {
274 if let Some((mask, _)) = self.mask(&ledger_hash) {
275 let mut accounts = Vec::new();
276 let mut single_account = Vec::new();
277
278 mask.iter(|account| {
279 accounts.push(account.clone());
280 if let Some(public_key) = requested_public_key.as_ref() {
281 if public_key == &AccountPublicKey::from(account.public_key.clone()) {
282 single_account.push(account.clone());
283 }
284 }
285 });
286
287 if requested_public_key.is_some() {
288 single_account
289 } else {
290 accounts
291 }
292 } else {
293 vec![]
294 }
295 }
296
297 pub fn mask(&self, hash: &LedgerHash) -> Option<(Mask, bool)> {
300 self.snarked_ledgers
301 .get(hash)
302 .cloned()
303 .map(|mask| (mask, true))
304 .or_else(|| Some((self.staged_ledgers.get_mask(hash)?, true)))
305 .or_else(|| self.sync.mask(hash))
306 .or_else(|| {
307 self.additional_snarked_ledgers
308 .get(hash)
309 .map(|l| (l.clone(), true))
310 })
311 }
312
/// Returns `true` if a committed snarked ledger with the given `hash` exists.
/// Ledgers held only in the sync state or among the additional snarked ledgers are
/// not considered.
pub fn contains_snarked_ledger(&self, hash: &LedgerHash) -> bool {
    self.snarked_ledgers.contains_key(hash)
}
316
/// Returns the mask of the snarked ledger `hash` from the sync state, or the error
/// reported by the sync state's own lookup.
pub fn pending_sync_snarked_ledger_mask(&self, hash: &LedgerHash) -> Result<Mask, String> {
    self.sync.pending_sync_snarked_ledger_mask(hash)
}
321
322 pub fn copy_snarked_ledger_contents_for_sync(
325 &mut self,
326 origin_snarked_ledger_hash: LedgerHash,
327 target_snarked_ledger_hash: LedgerHash,
328 overwrite: bool,
329 ) -> Result<bool, String> {
330 if !overwrite
331 && self
332 .sync
333 .snarked_ledgers
334 .contains_key(&target_snarked_ledger_hash)
335 {
336 return Ok(false);
337 }
338
339 let origin = self
340 .snarked_ledgers
341 .get(&origin_snarked_ledger_hash)
342 .or_else(|| {
343 self.sync.snarked_ledgers.get(&origin_snarked_ledger_hash)
346 })
347 .ok_or(format!(
348 "Tried to copy from non-existing snarked ledger with hash: {}",
349 origin_snarked_ledger_hash
350 ))?;
351
352 let target = origin.copy();
353 self.sync
354 .snarked_ledgers
355 .insert(target_snarked_ledger_hash, target);
356
357 Ok(true)
358 }
359
360 pub fn compute_snarked_ledger_hashes(
361 &mut self,
362 snarked_ledger_hash: &LedgerHash,
363 ) -> Result<(), String> {
364 let origin = self
365 .snarked_ledgers
366 .get_mut(snarked_ledger_hash)
367 .or_else(|| self.sync.snarked_ledgers.get_mut(snarked_ledger_hash))
368 .ok_or(format!(
369 "Cannot hash non-existing snarked ledger: {}",
370 snarked_ledger_hash
371 ))?;
372
373 let _force_hashing = origin.merkle_root();
376
377 Ok(())
378 }
379
380 fn staged_ledger_mut(
382 &mut self,
383 hash: &MinaBaseStagedLedgerHashStableV1,
384 ) -> Option<&mut StagedLedger> {
385 match self.staged_ledgers.get_mut(hash) {
386 Some(v) => Some(v),
387 None => self.sync.staged_ledger_mut(hash),
388 }
389 }
390
391 fn recreate_snarked_ledger(
392 &mut self,
393 root_snarked_ledger_updates: &TransitionFrontierRootSnarkedLedgerUpdates,
394 needed_protocol_states: &BTreeMap<StateHash, MinaStateProtocolStateValueStableV2>,
395 snarked_ledger_hash: &LedgerHash,
396 ) -> Result<(), String> {
397 let Some(update) = root_snarked_ledger_updates.get(snarked_ledger_hash) else {
398 return Ok(());
399 };
400 self.recreate_snarked_ledger(
401 root_snarked_ledger_updates,
402 needed_protocol_states,
403 &update.parent,
404 )?;
405
406 self.push_snarked_ledger(
407 needed_protocol_states,
408 &update.parent,
409 snarked_ledger_hash,
410 &update.staged_ledger_hash,
411 )
412 }
413
/// Produces the snarked ledger `new_root_snarked_ledger_hash` from the snarked ledger
/// `old_root_snarked_ledger_hash` and stores it in the sync state.
///
/// A child mask of the old root ledger is handed to the `get_snarked_ledger_sync` call on
/// the scan state of the staged ledger `new_root_staged_ledger_hash`, together with
/// callbacks for applying transactions and for resolving protocol states from
/// `protocol_states`. The resulting merkle root must equal the expected new hash.
///
/// # Errors
///
/// Returns an error if the old root ledger or the staged ledger cannot be found, if
/// `get_snarked_ledger_sync` fails, or if the obtained merkle root differs from
/// `new_root_snarked_ledger_hash`.
fn push_snarked_ledger(
    &mut self,
    protocol_states: &BTreeMap<StateHash, MinaStateProtocolStateValueStableV2>,
    old_root_snarked_ledger_hash: &LedgerHash,
    new_root_snarked_ledger_hash: &LedgerHash,
    new_root_staged_ledger_hash: &MinaBaseStagedLedgerHashStableV1,
) -> Result<(), String> {
    openmina_core::debug!(openmina_core::log::system_time();
        kind = "LedgerService::push_snarked_ledger",
        summary = format!("{old_root_snarked_ledger_hash} -> {new_root_snarked_ledger_hash}"));
    let constraint_constants = &constraint_constants();

    // The old root may be a committed ledger or one still held in the sync state.
    let root_snarked_ledger = self
        .snarked_ledgers
        .get(old_root_snarked_ledger_hash)
        .or_else(|| self.sync.snarked_ledgers.get(old_root_snarked_ledger_hash))
        .ok_or_else(|| {
            format!(
                "push_snarked_ledger: could not find old root snarked ledger: {}",
                old_root_snarked_ledger_hash,
            )
        })?;
    // All changes go into a child mask; the old root ledger itself is not modified here.
    let mut mt = root_snarked_ledger.make_child();

    // Callback: first pass of transaction application on a `Mask`.
    let apply_first_pass = |global_slot: Slot,
                            txn_state_view: &ProtocolStateView,
                            ledger: &mut Mask,
                            transaction: &Transaction| {
        ledger::scan_state::transaction_logic::apply_transaction_first_pass(
            constraint_constants,
            global_slot,
            txn_state_view,
            ledger,
            transaction,
        )
    };

    // Callback: second pass of transaction application on a `Mask`.
    let apply_second_pass = |ledger: &mut Mask, tx: TransactionPartiallyApplied<Mask>| {
        ledger::scan_state::transaction_logic::apply_transaction_second_pass(
            constraint_constants,
            ledger,
            tx,
        )
    };

    // Callback: first pass of transaction application on a `SparseLedger`.
    let apply_first_pass_sparse_ledger =
        |global_slot: Slot,
         txn_state_view: &ProtocolStateView,
         sparse_ledger: &mut SparseLedger,
         transaction: &Transaction| {
            ledger::scan_state::transaction_logic::apply_transaction_first_pass(
                constraint_constants,
                global_slot,
                txn_state_view,
                sparse_ledger,
                transaction,
            )
        };

    // Callback: resolves a protocol state by hash from the states supplied by the caller.
    let get_protocol_state = |state_hash: Fp| {
        let state_hash = StateHash::from_fp(state_hash);
        if let Some(s) = protocol_states.get(&state_hash) {
            Ok(s.clone())
        } else {
            Err(format!(
                "Failed to find protocol state for state hash: {}",
                state_hash
            ))
        }
    };

    let scan_state = self
        .staged_ledger_mut(new_root_staged_ledger_hash)
        .ok_or_else(|| {
            format!(
                "Failed to find staged ledger with hash: {:#?}",
                new_root_staged_ledger_hash
            )
        })?
        .scan_state();

    // The returned first-pass ledger hash is not used; only the effect on `mt` matters.
    let Pass::FirstPassLedgerHash(_first_pass_ledger_target) = scan_state
        .get_snarked_ledger_sync(
            &mut mt,
            get_protocol_state,
            apply_first_pass,
            apply_second_pass,
            apply_first_pass_sparse_ledger,
        )?;

    let expected_hash = new_root_snarked_ledger_hash;
    let obtained_hash = LedgerHash::from_fp(mt.merkle_root());

    if expected_hash != &obtained_hash {
        return Err(format!(
            "Expected to obtain snarked root ledger hash {} but got {}",
            expected_hash, obtained_hash
        ));
    }

    // Kept in the sync state until `commit` moves it to the committed ledgers.
    self.sync
        .snarked_ledgers
        .insert(new_root_snarked_ledger_hash.clone(), mt);

    Ok(())
}
524
525 pub fn get_account_delegators(
526 &self,
527 ledger_hash: &LedgerHash,
528 account_id: &AccountId,
529 ) -> Option<Vec<Account>> {
530 let (mask, _) = self.mask(ledger_hash)?;
531 let mut accounts = Vec::new();
532
533 mask.iter(|account| {
534 if account.delegate == Some(account_id.public_key.clone()) {
535 accounts.push(account.clone());
536 }
537 });
538
539 Some(accounts)
540 }
541
542 #[allow(clippy::type_complexity)]
543 pub fn producers_with_delegates<F: FnMut(&CompressedPubKey) -> bool>(
544 &self,
545 ledger_hash: &LedgerHash,
546 mut filter: F,
547 ) -> Option<BTreeMap<AccountPublicKey, Vec<(ledger::AccountIndex, AccountPublicKey, u64)>>>
548 {
549 let (mask, _) = self.mask(ledger_hash)?;
550 let mut accounts = Vec::new();
551
552 mask.iter(|account| {
553 if filter(account.delegate.as_ref().unwrap_or(&account.public_key)) {
554 accounts.push((
555 account.id(),
556 account.delegate.clone(),
557 account.balance.as_u64(),
558 ))
559 }
560 });
561
562 let producers = accounts.into_iter().fold(
563 BTreeMap::<_, Vec<_>>::new(),
564 |mut producers, (id, delegate, balance)| {
565 let index = mask.index_of_account(id.clone()).unwrap();
566 let pub_key = AccountPublicKey::from(id.public_key);
567 let producer = delegate.map(Into::into).unwrap_or(pub_key.clone());
568 producers
569 .entry(producer)
570 .or_default()
571 .push((index, pub_key, balance));
572 producers
573 },
574 );
575 Some(producers)
576 }
577
578 pub fn child_hashes_get(
579 &mut self,
580 snarked_ledger_hash: LedgerHash,
581 parent: &LedgerAddress,
582 ) -> Result<(LedgerHash, LedgerHash), String> {
583 let mut mask = self.pending_sync_snarked_ledger_mask(&snarked_ledger_hash)?;
584 let left_hash = LedgerHash::from_fp(mask.get_inner_hash_at_addr(parent.child_left())?);
585 let right_hash = LedgerHash::from_fp(mask.get_inner_hash_at_addr(parent.child_right())?);
586
587 Ok((left_hash, right_hash))
588 }
589
590 pub fn accounts_set(
591 &mut self,
592 snarked_ledger_hash: LedgerHash,
593 parent: &LedgerAddress,
594 accounts: Vec<v2::MinaBaseAccountBinableArgStableV2>,
595 ) -> Result<LedgerHash, String> {
596 let mut mask = self.pending_sync_snarked_ledger_mask(&snarked_ledger_hash)?;
597 let accounts: Vec<_> = accounts
598 .into_iter()
599 .map(|account| Ok(Box::new((&account).try_into()?)))
600 .collect::<Result<Vec<_>, InvalidBigInt>>()
601 .map_err(error_to_string)?;
602
603 mask.set_all_accounts_rooted_at(parent.clone(), &accounts)
604 .map_err(|_| "Failed when setting accounts".to_owned())?;
605
606 let computed_hash = LedgerHash::from_fp(mask.get_inner_hash_at_addr(parent.clone())?);
607
608 Ok(computed_hash)
609 }
610
611 pub fn staged_ledger_reconstruct<F>(
612 &mut self,
613 snarked_ledger_hash: LedgerHash,
614 parts: Option<Arc<StagedLedgerAuxAndPendingCoinbasesValid>>,
615 callback: F,
616 ) -> Result<(), InvalidBigInt>
617 where
618 F: 'static + FnOnce(v2::LedgerHash, Result<StagedLedger, String>) + Send,
619 {
620 let snarked_ledger = self
621 .sync
622 .snarked_ledger_mut(snarked_ledger_hash.clone())?
623 .copy();
624
625 thread::Builder::new()
626 .name("staged-ledger-reconstruct".into())
627 .spawn(move || {
628 match staged_ledger_reconstruct(snarked_ledger, snarked_ledger_hash, parts) {
629 Ok((staged_ledger_hash, result)) => {
630 callback(staged_ledger_hash, result);
631 }
632 Err(e) => callback(v2::LedgerHash::zero(), Err(format!("{:?}", e))),
633 }
634 })
635 .expect("Failed: staged ledger reconstruct thread");
636
637 Ok(())
638 }
639
640 pub fn staged_ledger_reconstruct_sync(
641 &mut self,
642 snarked_ledger_hash: LedgerHash,
643 parts: Option<Arc<StagedLedgerAuxAndPendingCoinbasesValid>>,
644 ) -> Result<(v2::LedgerHash, Result<(), String>), InvalidBigInt> {
645 let snarked_ledger = self
646 .sync
647 .snarked_ledger_mut(snarked_ledger_hash.clone())?
648 .copy();
649 let (staged_ledger_hash, result) =
650 staged_ledger_reconstruct(snarked_ledger, snarked_ledger_hash, parts)?;
651 let result = match result {
652 Err(err) => Err(err),
653 Ok(staged_ledger) => {
654 self.staged_ledger_reconstruct_result_store(staged_ledger);
655 Ok(())
656 }
657 };
658
659 Ok((staged_ledger_hash, result))
660 }
661
/// Applies `block` on top of the staged ledger of `pred_block`.
///
/// The parent staged ledger is cloned, the block's staged ledger diff is applied to the
/// clone, and the resulting staged ledger hash is compared with the one stated in the
/// block. The new staged ledger is stored in the sync state under the computed hash. In
/// archive mode, additional per-block data is collected and returned in `archive_data`.
///
/// # Errors
///
/// Returns an error if the parent staged ledger is missing, if any of the conversions
/// from the block data fails, or if applying the diff fails.
///
/// # Panics
///
/// Panics if the computed staged ledger hash differs from the block's. Before
/// panicking, the parent staged ledger and the block are handed to
/// `dump_application_to_file` and the outcome of that call is logged.
pub fn block_apply(
    &mut self,
    block: ArcBlockWithHash,
    pred_block: AppliedBlock,
    skip_verification: Option<SkipVerification>,
) -> Result<BlockApplyResult, String> {
    openmina_core::info!(openmina_core::log::system_time();
        kind = "LedgerService::block_apply",
        summary = format!("{}, {} <- {}", block.height(), block.hash(), block.pred_hash()),
        pred_staged_ledger_hash = pred_block.merkle_root_hash().to_string(),
        staged_ledger_hash = block.merkle_root_hash().to_string(),
    );
    // Work on a clone so the parent staged ledger stays available under its own hash.
    let mut staged_ledger = self
        .staged_ledger_mut(pred_block.staged_ledger_hashes())
        .ok_or_else(|| {
            format!(
                "parent staged ledger missing: {:#?}",
                pred_block.staged_ledger_hashes()
            )
        })?
        .clone();

    let global_slot = block.global_slot_since_genesis();
    let prev_protocol_state = &pred_block.header().protocol_state;
    let prev_state_view = protocol_state_view(prev_protocol_state).map_err(error_to_string)?;

    let consensus_state = &block.header().protocol_state.body.consensus_state;
    let coinbase_receiver: CompressedPubKey = (&consensus_state.coinbase_receiver)
        .try_into()
        .map_err(error_to_string)?;
    let supercharge_coinbase = consensus_state.supercharge_coinbase;

    let diff: Diff = (&block.body().staged_ledger_diff)
        .try_into()
        .map_err(error_to_string)?;

    let prev_protocol_state: ledger::proofs::block::ProtocolState =
        prev_protocol_state.try_into()?;

    let result = staged_ledger
        .apply(
            skip_verification,
            constraint_constants(),
            Slot::from_u32(global_slot),
            diff,
            (),
            &Verifier,
            &prev_state_view,
            prev_protocol_state.hashes(),
            coinbase_receiver.clone(),
            supercharge_coinbase,
        )
        .map_err(|err| format!("{err:?}"))?;
    let just_emitted_a_proof = result.ledger_proof.is_some();
    let ledger_hashes = MinaBaseStagedLedgerHashStableV1::from(&result.hash_after_applying);

    let expected_ledger_hashes = block.staged_ledger_hashes();
    if &ledger_hashes != expected_ledger_hashes {
        // The parent was found at the top of this function, so the lookup cannot fail.
        let staged_ledger = self
            .staged_ledger_mut(pred_block.staged_ledger_hashes())
            .unwrap();
        match dump_application_to_file(staged_ledger, block.clone(), pred_block) {
            Ok(filename) => openmina_core::info!(
                openmina_core::log::system_time();
                kind = "LedgerService::dump - Failed application",
                summary = format!("StagedLedger and block saved to: {filename:?}")
            ),
            Err(e) => openmina_core::error!(
                openmina_core::log::system_time();
                kind = "LedgerService::dump - Failed application",
                summary = format!("Failed to save block application to file: {e:?}")
            ),
        }

        panic!("staged ledger hash mismatch. found: {ledger_hashes:#?}, expected: {expected_ledger_hashes:#?}");
    }

    let archive_data = if self.archive_mode {
        // Fee payers of the block's user commands, deduplicated.
        let senders = block
            .body()
            .transactions()
            .filter_map(|tx| UserCommand::try_from(tx).ok().map(|cmd| cmd.fee_payer()))
            .collect::<BTreeSet<_>>()
            .into_iter();

        let coinbase_receiver_id = AccountId::new(coinbase_receiver, TokenId::default());

        // Split the account ids touched by user commands by whether they were accessed.
        let (accessed, not_accessed): (BTreeSet<_>, BTreeSet<_>) = block
            .body()
            .tranasctions_with_status()
            .flat_map(|(tx, status)| {
                let status: TransactionStatus = status.into();
                UserCommand::try_from(tx)
                    .ok()
                    .map(|cmd| cmd.account_access_statuses(&status))
                    .into_iter()
                    .flatten()
            })
            .partition(|(_, status)| *status == AccessedOrNot::Accessed);

        let mut account_ids_accessed: BTreeSet<_> =
            accessed.into_iter().map(|(id, _)| id).collect();
        let mut account_ids_not_accessed: BTreeSet<_> =
            not_accessed.into_iter().map(|(id, _)| id).collect();

        let has_coinbase = block.body().has_coinbase();

        // The coinbase receiver counts as accessed only when the block has a coinbase.
        if has_coinbase {
            account_ids_accessed.insert(coinbase_receiver_id);
        } else {
            account_ids_not_accessed.insert(coinbase_receiver_id);
        }

        // Receivers of coinbase fee transfers always count as accessed.
        let fee_transfer_accounts =
            block.body().coinbase_fee_transfers_iter().filter_map(|cb| {
                let receiver: CompressedPubKey = cb.receiver_pk.inner().try_into().ok()?;
                let account_id = AccountId::new(receiver, TokenId::default());
                Some(account_id)
            });
        account_ids_accessed.extend(fee_transfer_accounts);

        // Accessed accounts as found in the staged ledger after applying the block; ids
        // without an index or account in the ledger are left out.
        let accounts_accessed: Vec<(AccountIndex, Account)> = account_ids_accessed
            .iter()
            .filter_map(|id| {
                staged_ledger
                    .ledger()
                    .index_of_account(id.clone())
                    .and_then(|index| {
                        staged_ledger
                            .ledger()
                            .get_at_index(index)
                            .map(|account| (index, *account))
                    })
            })
            .collect();

        let account_creation_fee = constraint_constants().account_creation_fee;

        let accounts_created: Vec<(AccountId, u64)> = staged_ledger
            .latest_block_accounts_created(pred_block.hash().to_field()?)
            .iter()
            .map(|id| (id.clone(), account_creation_fee))
            .collect();

        let all_account_ids: BTreeSet<_> = account_ids_accessed
            .union(&account_ids_not_accessed)
            .collect();
        // Tokens (with their owners) are reported only for blocks that have a coinbase.
        let tokens_used: BTreeSet<(TokenId, Option<AccountId>)> = if has_coinbase {
            all_account_ids
                .iter()
                .map(|id| {
                    let token_id = id.token_id.clone();
                    let token_owner = staged_ledger.ledger().token_owner(token_id.clone());
                    (token_id, token_owner)
                })
                .collect()
        } else {
            BTreeSet::new()
        };

        // Receipt chain hashes of the senders, read from `staged_ledger`; senders without
        // a location or account are left out.
        let sender_receipt_chains_from_parent_ledger = senders
            .filter_map(|sender| {
                if let Some(location) = staged_ledger.ledger().location_of_account(&sender) {
                    staged_ledger.ledger().get(location).map(|account| {
                        (
                            sender,
                            v2::ReceiptChainHash::from(account.receipt_chain_hash),
                        )
                    })
                } else {
                    None
                }
            })
            .collect();
        Some(BlockApplyResultArchive {
            accounts_accessed,
            accounts_created,
            tokens_used,
            sender_receipt_chains_from_parent_ledger,
        })
    } else {
        None
    };

    // Kept in the sync state until `commit` decides which staged ledgers to retain.
    self.sync
        .staged_ledgers
        .insert(Arc::new(ledger_hashes), staged_ledger);

    Ok(BlockApplyResult {
        block,
        just_emitted_a_proof,
        archive_data,
    })
}
871
/// Commits the ledgers for a new root and best tip.
///
/// Steps, in order:
/// 1. Recreate the snarked ledger of `new_root` from `root_snarked_ledger_updates`.
/// 2. Drop committed snarked and staged ledgers not listed in `ledgers_to_keep`, and move
///    the listed ones from the sync state into the committed storage (the sync state's
///    ledgers are taken out entirely).
/// 3. Check that the staking epoch ledger of the best tip and the snarked and staged
///    ledgers of the root are present, synced, and hash to the expected value.
/// 4. For every committed snarked ledger, commit it into its parent and replace it with
///    that parent for as long as the parent is not itself listed in `ledgers_to_keep`.
/// 5. Make the root's staged ledger mask the new root and gather the result data.
///
/// Returns a default [`CommitResult`] if the root's staged ledger is not among the
/// committed staged ledgers after step 4.
///
/// # Panics
///
/// Panics if recreating the snarked ledger fails, or if any ledger checked in step 3 is
/// missing, not synced, or has a mismatching hash.
pub fn commit(
    &mut self,
    ledgers_to_keep: LedgersToKeep,
    root_snarked_ledger_updates: TransitionFrontierRootSnarkedLedgerUpdates,
    needed_protocol_states: BTreeMap<StateHash, MinaStateProtocolStateValueStableV2>,
    new_root: &ArcBlockWithHash,
    new_best_tip: &ArcBlockWithHash,
) -> CommitResult {
    openmina_core::debug!(openmina_core::log::system_time();
        kind = "LedgerService::commit",
        summary = format!("commit {}, {}", new_best_tip.height(), new_best_tip.hash()),
        new_root = format!("{}, {}", new_root.height(), new_root.hash()),
        new_root_staking_epoch_ledger = new_root.staking_epoch_ledger_hash().to_string(),
        new_root_next_epoch_ledger = new_root.next_epoch_ledger_hash().to_string(),
        new_root_snarked_ledger = new_root.snarked_ledger_hash().to_string(),
    );
    self.recreate_snarked_ledger(
        &root_snarked_ledger_updates,
        &needed_protocol_states,
        new_root.snarked_ledger_hash(),
    )
    .unwrap();

    // Drop committed snarked ledgers that are no longer needed.
    self.snarked_ledgers.retain(|hash, _| {
        let keep = ledgers_to_keep.contains(hash);
        if !keep {
            openmina_core::debug!(openmina_core::log::system_time();
                kind = "LedgerService::commit - snarked_ledgers.drop",
                summary = format!("drop snarked ledger {hash}"));
        }
        keep
    });
    // Move the needed snarked ledgers out of the sync state; the rest are dropped.
    self.snarked_ledgers.extend(
        std::mem::take(&mut self.sync.snarked_ledgers)
            .into_iter()
            .filter(|(hash, _)| {
                let keep = ledgers_to_keep.contains(hash);
                if !keep {
                    openmina_core::debug!(openmina_core::log::system_time();
                        kind = "LedgerService::commit - snarked_ledgers.drop",
                        summary = format!("drop snarked ledger {hash}"));
                }
                keep
            }),
    );

    // Same for staged ledgers.
    self.staged_ledgers
        .retain(|hash| ledgers_to_keep.contains(hash));
    self.staged_ledgers.extend(
        self.sync
            .staged_ledgers
            .take()
            .into_iter()
            .filter(|(hash, _)| ledgers_to_keep.contains(&**hash)),
    );

    // Sanity check: the ledgers the new root and best tip rely on must exist and match.
    for ledger_hash in [
        new_best_tip.staking_epoch_ledger_hash(),
        new_root.snarked_ledger_hash(),
        new_root.merkle_root_hash(),
    ] {
        if let Some((mut mask, is_synced)) = self.mask(ledger_hash) {
            if !is_synced {
                panic!("ledger mask expected to be synced: {ledger_hash}");
            }
            let calculated = merkle_root(&mut mask);
            assert_eq!(ledger_hash, &calculated, "ledger mask hash mismatch");
        } else {
            panic!("ledger mask is missing: {ledger_hash}");
        }
    }

    // Fold each kept snarked ledger into its parents until a kept parent (or no parent)
    // is reached.
    for (ledger_hash, snarked_ledger) in self.snarked_ledgers.iter_mut() {
        while let Some((parent_hash, parent)) = snarked_ledger
            .get_parent()
            .map(|mut parent| (merkle_root(&mut parent), parent))
            .filter(|(parent_hash, _)| !ledgers_to_keep.contains(parent_hash))
        {
            openmina_core::debug!(openmina_core::log::system_time();
                kind = "LedgerService::commit - mask.commit_and_reparent",
                summary = format!("{ledger_hash} -> {parent_hash}"));
            snarked_ledger.commit();
            snarked_ledger.unregister_mask(UnregisterBehavior::Check);
            *snarked_ledger = parent;
        }
    }

    // NOTE(review): a missing root staged ledger is reported only through a default
    // result — confirm this silent path is intended.
    let Some(new_root_ledger) = self.staged_ledgers.get_mut(new_root.staged_ledger_hashes())
    else {
        return Default::default();
    };

    new_root_ledger.commit_and_reparent_to_root();

    // State hashes required by the scan state of the root's staged ledger.
    let needed_protocol_states = self
        .staged_ledger_mut(new_root.staged_ledger_hashes())
        .map(|l| {
            l.scan_state()
                .required_state_hashes()
                .into_iter()
                .map(|fp| DataHashLibStateHashStableV1(fp.into()).into())
                .collect()
        })
        .unwrap_or_default();

    // Job pairs from the scan state of the best tip's staged ledger.
    let available_jobs = Arc::new(
        self.staged_ledger_mut(new_best_tip.staged_ledger_hashes())
            .map(|l| {
                l.scan_state()
                    .all_job_pairs_iter()
                    .map(|job| job.map(|single| AvailableJobMessage::from(single)))
                    .collect()
            })
            .unwrap_or_default(),
    );

    CommitResult {
        alive_masks: ::ledger::mask::alive_len(),
        available_jobs,
        needed_protocol_states,
    }
}
998
999 #[allow(dead_code)]
1000 fn check_alive_masks(&mut self) {
1001 let mut alive: BTreeSet<_> = ::ledger::mask::alive_collect();
1002 let staged_ledgers = self
1003 .staged_ledgers
1004 .staged_ledgers
1005 .iter()
1006 .map(|(hash, ledger)| (&hash.non_snark.ledger_hash, ledger.ledger_ref()));
1007
1008 let alive_ledgers = self
1009 .snarked_ledgers
1010 .iter()
1011 .chain(staged_ledgers)
1012 .map(|(hash, mask)| {
1013 let uuid = mask.get_uuid();
1014 if !alive.remove(&uuid) {
1015 bug_condition!("mask not found among alive masks! uuid: {uuid}, hash: {hash}");
1016 }
1017 (uuid, hash)
1018 })
1019 .collect::<Vec<_>>();
1020 openmina_core::debug!(redux::Timestamp::global_now(); "alive_ledgers_after_commit: {alive_ledgers:#?}");
1021
1022 if !alive.is_empty() {
1023 bug_condition!(
1024 "masks alive which are no longer part of the ledger service: {alive:#?}"
1025 );
1026 }
1027 }
1028
1029 pub fn get_num_accounts(
1030 &mut self,
1031 ledger_hash: v2::LedgerHash,
1032 ) -> Option<(u64, v2::LedgerHash)> {
1033 let (mask, _) = self
1034 .mask(&ledger_hash)
1035 .filter(|(_, is_synced)| *is_synced)?;
1036 let num_accounts = mask.num_accounts() as u64;
1039 let first_node_addr = ledger::Address::first(
1040 LEDGER_DEPTH.saturating_sub(super::tree_height_for_num_accounts(num_accounts)),
1041 );
1042 let hash = LedgerHash::from_fp(mask.get_hash(first_node_addr)?);
1043 Some((num_accounts, hash))
1044 }
1045
1046 pub fn get_child_hashes(
1047 &mut self,
1048 ledger_hash: v2::LedgerHash,
1049 addr: LedgerAddress,
1050 ) -> Option<(v2::LedgerHash, v2::LedgerHash)> {
1051 let (mask, is_synced) = self.mask(&ledger_hash)?;
1052 let get_hash = |addr: LedgerAddress| {
1053 let depth = addr.length();
1054 mask.get_hash(addr)
1055 .map(|fp| MinaBaseLedgerHash0StableV1(fp.into()).into())
1056 .or_else(|| {
1057 if is_synced {
1058 Some(ledger_empty_hash_at_depth(depth))
1059 } else {
1060 None
1061 }
1062 })
1063 };
1064 let (left, right) = (addr.child_left(), addr.child_right());
1065 Some((get_hash(left)?, get_hash(right)?))
1066 }
1067
1068 pub fn get_child_accounts(
1069 &mut self,
1070 ledger_hash: v2::LedgerHash,
1071 addr: LedgerAddress,
1072 ) -> Option<Vec<v2::MinaBaseAccountBinableArgStableV2>> {
1073 let (mask, _) = self
1074 .mask(&ledger_hash)
1075 .filter(|(_, is_synced)| *is_synced)?;
1076 let accounts = mask
1078 .get_all_accounts_rooted_at(addr)?
1079 .into_iter()
1080 .map(|(_, account)| (&*account).into())
1081 .collect();
1082 Some(accounts)
1083 }
1084
1085 pub fn get_accounts(
1086 &mut self,
1087 ledger_hash: v2::LedgerHash,
1088 ids: Vec<AccountId>,
1089 ) -> Vec<Account> {
1090 let Some((mask, _)) = self.mask(&ledger_hash) else {
1091 openmina_core::warn!(
1092 openmina_core::log::system_time();
1093 kind = "LedgerService::get_accounts",
1094 summary = format!("Ledger not found: {ledger_hash:?}")
1095 );
1096 return Vec::new();
1097 };
1098 let addrs = mask
1099 .location_of_account_batch(&ids)
1100 .into_iter()
1101 .filter_map(|(_id, addr)| addr)
1102 .collect::<Vec<_>>();
1103
1104 mask.get_batch(&addrs)
1105 .into_iter()
1106 .filter_map(|(_, account)| account.map(|account| *account))
1107 .collect::<Vec<_>>()
1108 }
1109
1110 pub fn staged_ledger_aux_and_pending_coinbase(
1111 &mut self,
1112 ledger_hash: &MinaBaseStagedLedgerHashStableV1,
1113 protocol_states: BTreeMap<StateHash, MinaStateProtocolStateValueStableV2>,
1114 ) -> Option<Arc<StagedLedgerAuxAndPendingCoinbases>> {
1115 let ledger = self.staged_ledger_mut(ledger_hash)?;
1116 let needed_blocks = ledger
1117 .scan_state()
1118 .required_state_hashes()
1119 .into_iter()
1120 .map(|fp| DataHashLibStateHashStableV1(fp.into()))
1121 .map(|hash| protocol_states.get(&hash.into()).ok_or(()).cloned())
1122 .collect::<Result<_, _>>()
1123 .ok()?;
1124 ledger.pending_coinbase_collection_merkle_root();
1127 Some(
1128 StagedLedgerAuxAndPendingCoinbases {
1129 scan_state: (ledger.scan_state()).into(),
1130 staged_ledger_hash: ledger_hash.non_snark.ledger_hash.clone(),
1131 pending_coinbase: (ledger.pending_coinbase_collection()).into(),
1132 needed_blocks,
1133 }
1134 .into(),
1135 )
1136 }
1137
1138 #[allow(clippy::too_many_arguments)]
1139 pub fn staged_ledger_diff_create(
1140 &mut self,
1141 pred_block: AppliedBlock,
1142 global_slot_since_genesis: v2::MinaNumbersGlobalSlotSinceGenesisMStableV1,
1143 is_new_epoch: bool,
1144 producer: NonZeroCurvePoint,
1145 delegator: NonZeroCurvePoint,
1146 coinbase_receiver: NonZeroCurvePoint,
1147 completed_snarks: BTreeMap<SnarkJobId, Snark>,
1148 supercharge_coinbase: bool,
1149 transactions_by_fee: Vec<valid::UserCommand>,
1150 ) -> Result<StagedLedgerDiffCreateOutput, String> {
1151 let mut staged_ledger = self
1152 .staged_ledger_mut(pred_block.staged_ledger_hashes())
1153 .ok_or_else(|| {
1154 format!(
1155 "parent staged ledger missing: {}",
1156 pred_block.merkle_root_hash()
1157 )
1158 })?
1159 .clone();
1160
1161 staged_ledger.hash();
1163 let pending_coinbase_witness =
1164 MinaBasePendingCoinbaseStableV2::from(staged_ledger.pending_coinbase_collection());
1165
1166 let protocol_state_view =
1167 protocol_state_view(&pred_block.header().protocol_state).map_err(error_to_string)?;
1168
1169 let (pre_diff, _invalid_txns) = staged_ledger
1171 .create_diff(
1172 constraint_constants(),
1173 (&global_slot_since_genesis).into(),
1174 Some(true),
1175 (&coinbase_receiver).try_into().map_err(error_to_string)?,
1176 (),
1177 &protocol_state_view,
1178 transactions_by_fee,
1179 |stmt| {
1180 let job_id = SnarkJobId::from(stmt);
1181 match completed_snarks.get(&job_id) {
1182 Some(snark) => snark.try_into().ok(),
1183 None => None,
1184 }
1185 },
1186 supercharge_coinbase,
1187 )
1188 .map_err(|err| format!("{err:?}"))?;
1189
1190 let pred_body_hash = pred_block
1194 .header()
1195 .protocol_state
1196 .body
1197 .try_hash()
1198 .map_err(error_to_string)?;
1199 let diff = (&pre_diff).into();
1200
1201 let res = staged_ledger
1202 .apply_diff_unchecked(
1203 constraint_constants(),
1204 (&global_slot_since_genesis).into(),
1205 pre_diff,
1206 (),
1207 &protocol_state_view,
1208 (
1209 pred_block.hash().0.to_field().map_err(error_to_string)?,
1210 pred_body_hash.0.to_field().map_err(error_to_string)?,
1211 ),
1212 (&coinbase_receiver).try_into().map_err(error_to_string)?,
1213 supercharge_coinbase,
1214 )
1215 .map_err(|err| format!("{err:?}"))?;
1216
1217 let diff_hash = block_body_hash(&diff).map_err(|err| format!("{err:?}"))?;
1218 let staking_ledger_hash = if is_new_epoch {
1219 pred_block.next_epoch_ledger_hash()
1220 } else {
1221 pred_block.staking_epoch_ledger_hash()
1222 };
1223
1224 Ok(StagedLedgerDiffCreateOutput {
1225 diff,
1226 diff_hash,
1227 staged_ledger_hash: (&res.hash_after_applying).into(),
1228 emitted_ledger_proof: res
1229 .ledger_proof
1230 .map(|(proof, ..)| (&proof).into())
1231 .map(Arc::new),
1232 pending_coinbase_update: (&res.pending_coinbase_update.1).into(),
1233 pending_coinbase_witness: MinaBasePendingCoinbaseWitnessStableV2 {
1234 pending_coinbases: pending_coinbase_witness,
1235 is_new_stack: res.pending_coinbase_update.0,
1236 },
1237 stake_proof_sparse_ledger: self
1238 .stake_proof_sparse_ledger(staking_ledger_hash, &producer, &delegator)
1239 .map_err(error_to_string)?,
1240 })
1241 }
1242
1243 pub fn stake_proof_sparse_ledger(
1244 &mut self,
1245 staking_ledger: &LedgerHash,
1246 producer: &NonZeroCurvePoint,
1247 delegator: &NonZeroCurvePoint,
1248 ) -> Result<v2::MinaBaseSparseLedgerBaseStableV2, InvalidBigInt> {
1249 let mask = self.snarked_ledgers.get(staking_ledger).unwrap();
1250 let producer_id = ledger::AccountId::new(producer.try_into()?, ledger::TokenId::default());
1251 let delegator_id =
1252 ledger::AccountId::new(delegator.try_into()?, ledger::TokenId::default());
1253 let sparse_ledger = ledger::sparse_ledger::SparseLedger::of_ledger_subset_exn(
1254 mask.clone(),
1255 &[producer_id, delegator_id],
1256 );
1257 Ok((&sparse_ledger).into())
1258 }
1259
1260 pub fn scan_state_summary(
1261 &self,
1262 staged_ledger_hash: &MinaBaseStagedLedgerHashStableV1,
1263 ) -> Result<Vec<Vec<RpcScanStateSummaryScanStateJob>>, String> {
1264 use ledger::scan_state::scan_state::JobValue;
1265
1266 let ledger = self.staged_ledgers.get(staged_ledger_hash);
1267 let Some(ledger) = ledger else {
1268 return Ok(Vec::new());
1269 };
1270 ledger
1271 .scan_state()
1272 .view()
1273 .map(|jobs| {
1274 let jobs = jobs.collect::<Vec<JobValueWithIndex<'_>>>();
1275 let mut iter = jobs.iter().peekable();
1276 let mut res = Vec::with_capacity(jobs.len());
1277
1278 loop {
1279 let Some(job) = iter.next() else { break };
1280
1281 let (stmt, seq_no, job_kind, is_done) = match &job.job {
1282 JobValue::Leaf(JobValueBase::Empty)
1283 | JobValue::Node(JobValueMerge::Empty)
1284 | JobValue::Node(JobValueMerge::Part(_)) => {
1285 res.push(RpcScanStateSummaryScanStateJob::Empty);
1286 continue;
1287 }
1288 JobValue::Leaf(JobValueBase::Full(job)) => {
1289 let stmt = &job.job.statement;
1290 let tx = job.job.transaction_with_info.transaction();
1291 let status = (&tx.status).into();
1292 let tx = MinaTransactionTransactionStableV2::from(&tx.data);
1293 let kind = RpcScanStateSummaryScanStateJobKind::Base(
1294 RpcScanStateSummaryBlockTransaction {
1295 hash: tx.hash().ok(),
1296 kind: (&tx).into(),
1297 status,
1298 },
1299 );
1300 let seq_no = job.seq_no.as_u64();
1301 (stmt.clone(), seq_no, kind, job.state.is_done())
1302 }
1303 JobValue::Node(JobValueMerge::Full(job)) => {
1304 let stmt = job
1305 .left
1306 .proof
1307 .statement()
1308 .merge(&job.right.proof.statement())
1309 .unwrap();
1310 let kind = RpcScanStateSummaryScanStateJobKind::Merge;
1311 let seq_no = job.seq_no.as_u64();
1312 (stmt, seq_no, kind, job.state.is_done())
1313 }
1314 };
1315 let stmt: MinaStateBlockchainStateValueStableV2LedgerProofStatement =
1316 (&stmt).into();
1317 let job_id: SnarkJobId = (&stmt.source, &stmt.target).into();
1318
1319 let bundle =
1320 job.bundle_sibling()
1321 .and_then(|(sibling_index, is_sibling_left)| {
1322 let sibling_job = jobs.get(sibling_index)?;
1323 let sibling_stmt: MinaStateBlockchainStateValueStableV2LedgerProofStatement = match &sibling_job.job {
1324 JobValue::Leaf(JobValueBase::Full(job)) => {
1325 (&job.job.statement).into()
1326 }
1327 JobValue::Node(JobValueMerge::Full(job)) => (&job
1328 .left
1329 .proof
1330 .statement()
1331 .merge(&job.right.proof.statement())
1332 .unwrap()).into(),
1333 _ => return None,
1334 };
1335 let bundle_job_id: SnarkJobId = match is_sibling_left {
1336 false => (&stmt.source, &sibling_stmt.target).into(),
1337 true => (&sibling_stmt.source, &stmt.target).into(),
1338 };
1339 Some((bundle_job_id, is_sibling_left))
1340 });
1341
1342 let bundle_job_id = bundle
1343 .as_ref()
1344 .map_or_else(|| job_id.clone(), |(id, _)| id.clone());
1345
1346 if is_done {
1347 let is_left =
1348 bundle.map_or_else(|| true, |(_, is_sibling_left)| !is_sibling_left);
1349 let parent = job.parent().ok_or_else(|| format!("job(depth: {}, index: {}) has no parent", job.depth(), job.index()))?;
1350 let sok_message: MinaBaseSokMessageStableV1 = {
1351 let job = jobs.get(parent).ok_or_else(|| format!("job(depth: {}, index: {}) parent not found", job.depth(), job.index()))?;
1352 match &job.job {
1353 JobValue::Node(JobValueMerge::Part(job)) if is_left => {
1354 (&job.sok_message).into()
1355 }
1356 JobValue::Node(JobValueMerge::Full(job)) => {
1357 if is_left {
1358 (&job.left.sok_message).into()
1359 } else {
1360 (&job.right.sok_message).into()
1361 }
1362 }
1363 _state => {
1364 res.push(RpcScanStateSummaryScanStateJob::Empty);
1369 continue;
1370 }
1371 }
1372 };
1373 res.push(RpcScanStateSummaryScanStateJob::Done {
1374 job_id,
1375 bundle_job_id,
1376 job: Box::new(job_kind),
1377 seq_no,
1378 snark: Box::new(RpcSnarkPoolJobSnarkWorkDone {
1379 snarker: sok_message.prover,
1380 fee: sok_message.fee,
1381 }),
1382 });
1383 } else {
1384 res.push(RpcScanStateSummaryScanStateJob::Todo {
1385 job_id,
1386 bundle_job_id,
1387 job: job_kind,
1388 seq_no,
1389 });
1390 }
1391 }
1392 Ok(res)
1393 })
1394 .collect()
1395 }
1396}
1397
1398impl LedgerSyncState {
1399 fn mask(&self, hash: &LedgerHash) -> Option<(Mask, bool)> {
1400 self.snarked_ledgers
1401 .get(hash)
1402 .cloned()
1403 .map(|mask| (mask, false))
1404 .or_else(|| Some((self.staged_ledgers.get_mask(hash)?, true)))
1405 }
1406
1407 fn pending_sync_snarked_ledger_mask(&self, hash: &LedgerHash) -> Result<Mask, String> {
1408 self.snarked_ledgers
1409 .get(hash)
1410 .cloned()
1411 .ok_or_else(|| format!("Missing sync snarked ledger {}", hash))
1412 }
1413
1414 fn snarked_ledger_mut(&mut self, hash: LedgerHash) -> Result<&mut Mask, InvalidBigInt> {
1417 let hash_fp = hash.to_field()?;
1418 Ok(self.snarked_ledgers.entry(hash.clone()).or_insert_with(|| {
1419 let mut ledger = Mask::create(LEDGER_DEPTH);
1420 ledger.set_cached_hash_unchecked(&LedgerAddress::root(), hash_fp);
1421 ledger
1422 }))
1423 }
1424
1425 fn staged_ledger_mut(
1426 &mut self,
1427 hash: &MinaBaseStagedLedgerHashStableV1,
1428 ) -> Option<&mut StagedLedger> {
1429 self.staged_ledgers.get_mut(hash)
1430 }
1431}
1432
1433fn staged_ledger_reconstruct(
1434 snarked_ledger: Mask,
1435 snarked_ledger_hash: LedgerHash,
1436 parts: Option<Arc<StagedLedgerAuxAndPendingCoinbasesValid>>,
1437) -> Result<(v2::LedgerHash, Result<StagedLedger, String>), InvalidBigInt> {
1438 let staged_ledger_hash = parts
1439 .as_ref()
1440 .map(|p| p.staged_ledger_hash.clone())
1441 .unwrap_or_else(|| snarked_ledger_hash.clone());
1442
1443 let ledger = snarked_ledger.make_child();
1444
1445 let mut result = if let Some(parts) = &parts {
1446 let states = parts
1447 .needed_blocks
1448 .iter()
1449 .map(|state| Ok((state.try_hash()?.to_field()?, state.clone())))
1450 .collect::<Result<BTreeMap<Fp, _>, _>>()?;
1451
1452 StagedLedger::of_scan_state_pending_coinbases_and_snarked_ledger(
1453 (),
1454 constraint_constants(),
1455 Verifier,
1456 (&parts.scan_state).try_into()?,
1457 ledger,
1458 LocalState::empty(),
1459 parts.staged_ledger_hash.to_field()?,
1460 (&parts.pending_coinbase).try_into()?,
1461 |key| states.get(&key).cloned().unwrap(),
1462 )
1463 } else {
1464 StagedLedger::create_exn(constraint_constants().clone(), ledger)
1465 };
1466
1467 match result.as_mut() {
1468 Ok(staged_ledger) => {
1469 staged_ledger.commit_and_reparent_to_root();
1470 }
1471 Err(_) => {
1472 if let Err(e) = dump_reconstruct_to_file(&snarked_ledger, &parts) {
1473 openmina_core::error!(
1474 openmina_core::log::system_time();
1475 kind = "LedgerService::dump - Failed reconstruct",
1476 summary = format!("Failed to save reconstruction to file: {e:?}")
1477 );
1478 }
1479 }
1480 }
1481
1482 Ok((staged_ledger_hash, result))
1483}
1484
1485pub trait LedgerService: redux::Service {
1486 fn ledger_manager(&self) -> &LedgerManager;
1487 fn force_sync_calls(&self) -> bool {
1488 false
1489 }
1490
1491 fn write_init(&mut self, request: LedgerWriteRequest) {
1492 let request = LedgerRequest::Write(request);
1493 if self.force_sync_calls() {
1494 let _ = self.ledger_manager().call_sync(request);
1495 } else {
1496 self.ledger_manager().call(request);
1497 }
1498 }
1499
1500 fn read_init(&mut self, id: LedgerReadId, request: LedgerReadRequest) {
1501 let request = LedgerRequest::Read(id, request);
1502 if self.force_sync_calls() {
1503 let _ = self.ledger_manager().call_sync(request);
1504 } else {
1505 self.ledger_manager().call(request);
1506 }
1507 }
1508}
1509
1510fn dump_reconstruct_to_file(
1513 snarked_ledger: &Mask,
1514 parts: &Option<Arc<StagedLedgerAuxAndPendingCoinbasesValid>>,
1515) -> std::io::Result<()> {
1516 use mina_p2p_messages::binprot::{
1517 self,
1518 macros::{BinProtRead, BinProtWrite},
1519 };
1520
1521 #[derive(BinProtRead, BinProtWrite)]
1522 struct ReconstructContext {
1523 accounts: Vec<v2::MinaBaseAccountBinableArgStableV2>,
1524 scan_state: v2::TransactionSnarkScanStateStableV2,
1525 pending_coinbase: v2::MinaBasePendingCoinbaseStableV2,
1526 staged_ledger_hash: LedgerHash,
1527 states: List<v2::MinaStateProtocolStateValueStableV2>,
1528 }
1529
1530 let Some(parts) = parts else {
1531 return Err(std::io::ErrorKind::Other.into());
1532 };
1533
1534 let StagedLedgerAuxAndPendingCoinbasesValid {
1535 scan_state,
1536 staged_ledger_hash,
1537 pending_coinbase,
1538 needed_blocks,
1539 } = &**parts;
1540
1541 let reconstruct_context = ReconstructContext {
1542 accounts: snarked_ledger
1543 .to_list()
1544 .iter()
1545 .map(v2::MinaBaseAccountBinableArgStableV2::from)
1546 .collect(),
1547 scan_state: scan_state.clone(),
1548 pending_coinbase: pending_coinbase.clone(),
1549 staged_ledger_hash: staged_ledger_hash.clone(),
1550 states: needed_blocks.clone(),
1551 };
1552
1553 let debug_dir = openmina_core::get_debug_dir();
1554 let filename = debug_dir
1555 .join("failed_reconstruct_ctx.binprot")
1556 .to_string_lossy()
1557 .to_string();
1558 std::fs::create_dir_all(&debug_dir)?;
1559
1560 use mina_p2p_messages::binprot::BinProtWrite;
1561 let mut file = std::fs::File::create(&filename)?;
1562 reconstruct_context.binprot_write(&mut file)?;
1563 file.sync_all()?;
1564
1565 openmina_core::info!(
1566 openmina_core::log::system_time();
1567 kind = "LedgerService::dump - Failed reconstruct",
1568 summary = format!("Reconstruction saved to: {filename:?}")
1569 );
1570
1571 Ok(())
1572}
1573
1574fn dump_application_to_file(
1579 staged_ledger: &StagedLedger,
1580 block: ArcBlockWithHash,
1581 pred_block: AppliedBlock,
1582) -> std::io::Result<String> {
1583 use mina_p2p_messages::binprot::{
1584 self,
1585 macros::{BinProtRead, BinProtWrite},
1586 };
1587
1588 #[derive(BinProtRead, BinProtWrite)]
1589 struct ApplyContext {
1590 accounts: Vec<v2::MinaBaseAccountBinableArgStableV2>,
1591 scan_state: v2::TransactionSnarkScanStateStableV2,
1592 pending_coinbase: v2::MinaBasePendingCoinbaseStableV2,
1593 pred_block: v2::MinaBlockBlockStableV2,
1594 blocks: Vec<v2::MinaBlockBlockStableV2>,
1595 }
1596
1597 let cs = &block.block.header.protocol_state.body.consensus_state;
1598 let block_height = cs.blockchain_length.as_u32();
1599
1600 let apply_context = ApplyContext {
1601 accounts: staged_ledger
1602 .ledger()
1603 .to_list()
1604 .iter()
1605 .map(v2::MinaBaseAccountBinableArgStableV2::from)
1606 .collect::<Vec<_>>(),
1607 scan_state: staged_ledger.scan_state().into(),
1608 pending_coinbase: staged_ledger.pending_coinbase_collection().into(),
1609 pred_block: (**pred_block.block()).clone(),
1610 blocks: vec![(*block.block).clone()],
1611 };
1612
1613 let debug_dir = openmina_core::get_debug_dir();
1614 let filename = debug_dir
1615 .join(format!("failed_application_ctx_{}.binprot", block_height))
1616 .to_string_lossy()
1617 .to_string();
1618 std::fs::create_dir_all(&debug_dir)?;
1619
1620 let mut file = std::fs::File::create(&filename)?;
1621
1622 use mina_p2p_messages::binprot::BinProtWrite;
1623 apply_context.binprot_write(&mut file)?;
1624 file.sync_all()?;
1625
1626 Ok(filename)
1627}
1628
#[cfg(test)]
mod tests {
    use mina_p2p_messages::v2::MinaBaseLedgerHash0StableV1;

    use crate::ledger::hash_node_at_depth;

    use super::*;

    /// Hashing two known child hashes at the root address must reproduce the
    /// known parent hash.
    #[test]
    fn test_ledger_hash() {
        // (address, expected parent hash, left child hash, right child hash)
        let cases = [(
            LedgerAddress::root(),
            "jx5YAT36bv62M8mPcREYYfZWXaKqqMzDCP8wmc21uf4CfDKAHCr",
            "jxo5pSyt16XGwA9UeuAdiFDzrwFH3smbNTJF7fxq98w1y9Jem2m",
            "jwq3nCDr8XejL8HKDxR5qVhFJbKoUTGZgtLBZCp3MrqLTnqmjdP",
        )];

        for (address, expected_hash, left, right) in cases {
            let left: LedgerHash = left.parse().unwrap();
            let right: LedgerHash = right.parse().unwrap();

            let node_hash = hash_node_at_depth(
                address.length(),
                left.0.to_field().unwrap(),
                right.0.to_field().unwrap(),
            );
            let node_hash: LedgerHash = MinaBaseLedgerHash0StableV1(node_hash.into()).into();

            assert_eq!(node_hash.to_string(), expected_hash);
        }
    }
}