1use super::{
2 ledger_empty_hash_at_depth,
3 read::{LedgerReadId, LedgerReadRequest, LedgerReadResponse},
4 write::{CommitResult, LedgerWriteRequest, LedgerWriteResponse, LedgersToKeep},
5 LedgerAddress, LedgerEvent, LEDGER_DEPTH,
6};
7use crate::{
8 account::AccountPublicKey,
9 block_producer_effectful::StagedLedgerDiffCreateOutput,
10 ledger::{
11 ledger_manager::{LedgerManager, LedgerRequest},
12 write::{BlockApplyResult, BlockApplyResultArchive},
13 },
14 p2p::channels::rpc::StagedLedgerAuxAndPendingCoinbases,
15 rpc::{
16 RpcScanStateSummaryBlockTransaction, RpcScanStateSummaryScanStateJob,
17 RpcScanStateSummaryScanStateJobKind, RpcSnarkPoolJobSnarkWorkDone,
18 },
19 transition_frontier::{
20 genesis::empty_pending_coinbase_hash,
21 sync::{
22 ledger::staged::StagedLedgerAuxAndPendingCoinbasesValid,
23 TransitionFrontierRootSnarkedLedgerUpdates,
24 },
25 },
26};
27use ledger::{
28 scan_state::{
29 currency::Slot,
30 scan_state::{AvailableJobMessage, JobValueBase, JobValueMerge, JobValueWithIndex, Pass},
31 transaction_logic::{
32 local_state::LocalState,
33 protocol_state::{protocol_state_view, ProtocolStateView},
34 transaction_partially_applied::TransactionPartiallyApplied,
35 valid,
36 zkapp_command::AccessedOrNot,
37 Transaction, TransactionStatus, UserCommand,
38 },
39 },
40 sparse_ledger::SparseLedger,
41 staged_ledger::{
42 diff::Diff,
43 staged_ledger::{SkipVerification, StagedLedger},
44 validate_block::block_body_hash,
45 },
46 verifier::Verifier,
47 Account, AccountId, AccountIndex, BaseLedger, Database, Mask, TokenId, UnregisterBehavior,
48};
49use mina_core::{
50 block::{AppliedBlock, ArcBlockWithHash},
51 bug_condition,
52 constants::constraint_constants,
53 snark::{Snark, SnarkJobId},
54 thread,
55};
56use mina_curves::pasta::Fp;
57use mina_p2p_messages::{
58 bigint::InvalidBigInt,
59 binprot::BinProtRead,
60 list::List,
61 v2::{
62 self, DataHashLibStateHashStableV1, LedgerHash, MinaBaseLedgerHash0StableV1,
63 MinaBasePendingCoinbaseStableV2, MinaBasePendingCoinbaseWitnessStableV2,
64 MinaBaseSokMessageStableV1, MinaBaseStagedLedgerHashStableV1,
65 MinaStateBlockchainStateValueStableV2LedgerProofStatement,
66 MinaStateProtocolStateValueStableV2, MinaTransactionTransactionStableV2, NonZeroCurvePoint,
67 StateHash,
68 },
69};
70use mina_signer::CompressedPubKey;
71use std::{
72 collections::{BTreeMap, BTreeSet},
73 path::Path,
74 sync::Arc,
75};
76
77fn merkle_root(mask: &mut Mask) -> LedgerHash {
78 MinaBaseLedgerHash0StableV1(mask.merkle_root().into()).into()
79}
80
81fn error_to_string(e: InvalidBigInt) -> String {
82 format!("{:?}", e)
83}
84
85#[derive(Default)]
87struct StagedLedgersStorage {
88 by_merkle_root_hash: BTreeMap<LedgerHash, Vec<Arc<MinaBaseStagedLedgerHashStableV1>>>,
90 staged_ledgers: BTreeMap<Arc<MinaBaseStagedLedgerHashStableV1>, StagedLedger>,
91}
92
93impl StagedLedgersStorage {
94 fn insert_by_recomputing_hash(&mut self, mut staged_ledger: StagedLedger) {
97 let staged_ledger_hash: MinaBaseStagedLedgerHashStableV1 = (&staged_ledger.hash()).into();
98 self.insert(Arc::new(staged_ledger_hash), staged_ledger);
99 }
100
101 fn insert(
102 &mut self,
103 staged_ledger_hash: Arc<MinaBaseStagedLedgerHashStableV1>,
104 staged_ledger: StagedLedger,
105 ) {
106 let merkle_root_hash: LedgerHash = merkle_root(&mut staged_ledger.ledger());
107 self.by_merkle_root_hash
108 .entry(merkle_root_hash.clone())
109 .or_insert_with(|| Vec::with_capacity(1))
110 .extend([staged_ledger_hash.clone()]);
111 self.staged_ledgers
112 .insert(staged_ledger_hash, staged_ledger);
113 }
114
115 fn get_mask(&self, root_hash: &LedgerHash) -> Option<Mask> {
116 let staged_ledger_hashes: &Vec<_> = self.by_merkle_root_hash.get(root_hash)?;
117 self.staged_ledgers
120 .get(staged_ledger_hashes.first()?)
121 .map(|staged_ledger| staged_ledger.ledger())
122 }
123
124 fn get(&self, staged_ledger_hash: &MinaBaseStagedLedgerHashStableV1) -> Option<&StagedLedger> {
125 self.staged_ledgers.get(staged_ledger_hash)
126 }
127
128 fn get_mut(
129 &mut self,
130 staged_ledger_hash: &MinaBaseStagedLedgerHashStableV1,
131 ) -> Option<&mut StagedLedger> {
132 self.staged_ledgers.get_mut(staged_ledger_hash)
133 }
134
135 fn retain<F>(&mut self, fun: F)
136 where
137 F: Fn(&MinaBaseStagedLedgerHashStableV1) -> bool,
138 {
139 self.by_merkle_root_hash.retain(|_, staged_ledger_hashes| {
140 staged_ledger_hashes.retain(|hash| {
141 if fun(hash) {
142 return true;
143 }
144 self.staged_ledgers.remove(hash);
145 false
146 });
147 !staged_ledger_hashes.is_empty()
148 });
149 }
150
151 fn extend<I>(&mut self, iterator: I)
152 where
153 I: IntoIterator<Item = (Arc<MinaBaseStagedLedgerHashStableV1>, StagedLedger)>,
154 {
155 for (hash, staged_ledger) in iterator.into_iter() {
156 self.insert(hash, staged_ledger);
157 }
158 }
159
160 fn take(&mut self) -> BTreeMap<Arc<MinaBaseStagedLedgerHashStableV1>, StagedLedger> {
161 let Self {
162 by_merkle_root_hash,
163 staged_ledgers,
164 } = self;
165
166 let _ = std::mem::take(by_merkle_root_hash);
167 std::mem::take(staged_ledgers)
168 }
169}
170
171#[derive(Default)]
172pub struct LedgerCtx {
173 snarked_ledgers: BTreeMap<LedgerHash, Mask>,
174 additional_snarked_ledgers: BTreeMap<LedgerHash, Mask>,
176 staged_ledgers: StagedLedgersStorage,
177 sync: LedgerSyncState,
178 archive_mode: bool,
180 event_sender: Option<mina_core::channels::mpsc::UnboundedSender<crate::event_source::Event>>,
181}
182
183#[derive(Default)]
184struct LedgerSyncState {
185 snarked_ledgers: BTreeMap<LedgerHash, Mask>,
186 staged_ledgers: StagedLedgersStorage,
187}
188
189impl LedgerCtx {
190 pub fn new_with_additional_snarked_ledgers<P>(path: P) -> Self
191 where
192 P: AsRef<Path>,
193 {
194 use std::fs;
195
196 let Ok(dir) = fs::read_dir(path) else {
197 return Self::default();
198 };
199
200 let additional_snarked_ledgers = dir
201 .filter_map(|entry| {
202 let entry = entry.ok()?;
203 let hash = entry.file_name().to_str()?.parse().ok()?;
204 let mut file = fs::File::open(entry.path()).ok()?;
205
206 let _ = Option::<LedgerHash>::binprot_read(&mut file).ok()?;
207
208 let accounts = Vec::<Account>::binprot_read(&mut file).ok()?;
209 let mut mask = Mask::new_root(Database::create(35));
210 for account in accounts {
211 let account_id = account.id();
212 mask.get_or_create_account(account_id, account).unwrap();
213 }
214 Some((hash, mask))
215 })
216 .collect();
217
218 LedgerCtx {
219 additional_snarked_ledgers,
220 ..Default::default()
221 }
222 }
223
224 pub fn set_archive_mode(&mut self) {
225 self.archive_mode = true;
226 }
227
228 pub fn set_event_sender(
231 &mut self,
232 event_sender: mina_core::channels::mpsc::UnboundedSender<crate::event_source::Event>,
233 ) {
234 self.event_sender = Some(event_sender);
235 }
236
237 pub(super) fn send_event(&self, event: LedgerEvent) {
238 if let Some(tx) = self.event_sender.as_ref() {
239 let _ = tx.send(event.into());
240 }
241 }
242
243 pub(super) fn send_write_response(&self, resp: LedgerWriteResponse) {
244 self.send_event(LedgerEvent::Write(resp))
245 }
246
247 pub(super) fn send_read_response(&self, id: LedgerReadId, resp: LedgerReadResponse) {
248 self.send_event(LedgerEvent::Read(id, resp))
249 }
250
251 pub fn insert_genesis_ledger(&mut self, mut mask: Mask) {
252 let merkle_root_hash = merkle_root(&mut mask);
253 let staged_ledger =
254 StagedLedger::create_exn(constraint_constants().clone(), mask.copy()).unwrap();
255 self.snarked_ledgers.insert(merkle_root_hash.clone(), mask);
256 let staged_ledger_hash =
258 MinaBaseStagedLedgerHashStableV1::zero(merkle_root_hash, empty_pending_coinbase_hash());
259 self.staged_ledgers
260 .insert(Arc::new(staged_ledger_hash), staged_ledger);
261 }
262
263 pub fn staged_ledger_reconstruct_result_store(&mut self, ledger: StagedLedger) {
264 self.staged_ledgers.insert_by_recomputing_hash(ledger);
265 }
266
267 pub fn get_accounts_for_rpc(
269 &self,
270 ledger_hash: LedgerHash,
271 requested_public_key: Option<AccountPublicKey>,
272 ) -> Vec<Account> {
273 if let Some((mask, _)) = self.mask(&ledger_hash) {
274 let mut accounts = Vec::new();
275 let mut single_account = Vec::new();
276
277 mask.iter(|account| {
278 accounts.push(account.clone());
279 if let Some(public_key) = requested_public_key.as_ref() {
280 if public_key == &AccountPublicKey::from(account.public_key.clone()) {
281 single_account.push(account.clone());
282 }
283 }
284 });
285
286 if requested_public_key.is_some() {
287 single_account
288 } else {
289 accounts
290 }
291 } else {
292 vec![]
293 }
294 }
295
296 pub fn mask(&self, hash: &LedgerHash) -> Option<(Mask, bool)> {
299 self.snarked_ledgers
300 .get(hash)
301 .cloned()
302 .map(|mask| (mask, true))
303 .or_else(|| Some((self.staged_ledgers.get_mask(hash)?, true)))
304 .or_else(|| self.sync.mask(hash))
305 .or_else(|| {
306 self.additional_snarked_ledgers
307 .get(hash)
308 .map(|l| (l.clone(), true))
309 })
310 }
311
312 pub fn contains_snarked_ledger(&self, hash: &LedgerHash) -> bool {
313 self.snarked_ledgers.contains_key(hash)
314 }
315
316 pub fn pending_sync_snarked_ledger_mask(&self, hash: &LedgerHash) -> Result<Mask, String> {
318 self.sync.pending_sync_snarked_ledger_mask(hash)
319 }
320
321 pub fn copy_snarked_ledger_contents_for_sync(
324 &mut self,
325 origin_snarked_ledger_hash: LedgerHash,
326 target_snarked_ledger_hash: LedgerHash,
327 overwrite: bool,
328 ) -> Result<bool, String> {
329 if !overwrite
330 && self
331 .sync
332 .snarked_ledgers
333 .contains_key(&target_snarked_ledger_hash)
334 {
335 return Ok(false);
336 }
337
338 let origin = self
339 .snarked_ledgers
340 .get(&origin_snarked_ledger_hash)
341 .or_else(|| {
342 self.sync.snarked_ledgers.get(&origin_snarked_ledger_hash)
345 })
346 .ok_or(format!(
347 "Tried to copy from non-existing snarked ledger with hash: {}",
348 origin_snarked_ledger_hash
349 ))?;
350
351 let target = origin.copy();
352 self.sync
353 .snarked_ledgers
354 .insert(target_snarked_ledger_hash, target);
355
356 Ok(true)
357 }
358
359 pub fn compute_snarked_ledger_hashes(
360 &mut self,
361 snarked_ledger_hash: &LedgerHash,
362 ) -> Result<(), String> {
363 let origin = self
364 .snarked_ledgers
365 .get_mut(snarked_ledger_hash)
366 .or_else(|| self.sync.snarked_ledgers.get_mut(snarked_ledger_hash))
367 .ok_or(format!(
368 "Cannot hash non-existing snarked ledger: {}",
369 snarked_ledger_hash
370 ))?;
371
372 let _force_hashing = origin.merkle_root();
375
376 Ok(())
377 }
378
379 fn staged_ledger_mut(
381 &mut self,
382 hash: &MinaBaseStagedLedgerHashStableV1,
383 ) -> Option<&mut StagedLedger> {
384 match self.staged_ledgers.get_mut(hash) {
385 Some(v) => Some(v),
386 None => self.sync.staged_ledger_mut(hash),
387 }
388 }
389
390 fn recreate_snarked_ledger(
391 &mut self,
392 root_snarked_ledger_updates: &TransitionFrontierRootSnarkedLedgerUpdates,
393 needed_protocol_states: &BTreeMap<StateHash, MinaStateProtocolStateValueStableV2>,
394 snarked_ledger_hash: &LedgerHash,
395 ) -> Result<(), String> {
396 let Some(update) = root_snarked_ledger_updates.get(snarked_ledger_hash) else {
397 return Ok(());
398 };
399 self.recreate_snarked_ledger(
400 root_snarked_ledger_updates,
401 needed_protocol_states,
402 &update.parent,
403 )?;
404
405 self.push_snarked_ledger(
406 needed_protocol_states,
407 &update.parent,
408 snarked_ledger_hash,
409 &update.staged_ledger_hash,
410 )
411 }
412
413 fn push_snarked_ledger(
414 &mut self,
415 protocol_states: &BTreeMap<StateHash, MinaStateProtocolStateValueStableV2>,
416 old_root_snarked_ledger_hash: &LedgerHash,
417 new_root_snarked_ledger_hash: &LedgerHash,
418 new_root_staged_ledger_hash: &MinaBaseStagedLedgerHashStableV1,
419 ) -> Result<(), String> {
420 mina_core::debug!(mina_core::log::system_time();
421 kind = "LedgerService::push_snarked_ledger",
422 summary = format!("{old_root_snarked_ledger_hash} -> {new_root_snarked_ledger_hash}"));
423 let constraint_constants = &constraint_constants();
425
426 let root_snarked_ledger = self
428 .snarked_ledgers
429 .get(old_root_snarked_ledger_hash)
430 .or_else(|| self.sync.snarked_ledgers.get(old_root_snarked_ledger_hash))
431 .ok_or_else(|| {
432 format!(
433 "push_snarked_ledger: could not find old root snarked ledger: {}",
434 old_root_snarked_ledger_hash,
435 )
436 })?;
437 let mut mt = root_snarked_ledger.make_child();
438
439 let apply_first_pass = |global_slot: Slot,
441 txn_state_view: &ProtocolStateView,
442 ledger: &mut Mask,
443 transaction: &Transaction| {
444 ledger::scan_state::transaction_logic::apply_transaction_first_pass(
445 constraint_constants,
446 global_slot,
447 txn_state_view,
448 ledger,
449 transaction,
450 )
451 };
452
453 let apply_second_pass = |ledger: &mut Mask, tx: TransactionPartiallyApplied<Mask>| {
454 ledger::scan_state::transaction_logic::apply_transaction_second_pass(
455 constraint_constants,
456 ledger,
457 tx,
458 )
459 };
460
461 let apply_first_pass_sparse_ledger =
462 |global_slot: Slot,
463 txn_state_view: &ProtocolStateView,
464 sparse_ledger: &mut SparseLedger,
465 transaction: &Transaction| {
466 ledger::scan_state::transaction_logic::apply_transaction_first_pass(
467 constraint_constants,
468 global_slot,
469 txn_state_view,
470 sparse_ledger,
471 transaction,
472 )
473 };
474
475 let get_protocol_state = |state_hash: Fp| {
476 let state_hash = StateHash::from_fp(state_hash);
477 if let Some(s) = protocol_states.get(&state_hash) {
478 Ok(s.clone())
479 } else {
480 Err(format!(
481 "Failed to find protocol state for state hash: {}",
482 state_hash
483 ))
484 }
485 };
486
487 let scan_state = self
488 .staged_ledger_mut(new_root_staged_ledger_hash)
489 .ok_or_else(|| {
490 format!(
491 "Failed to find staged ledger with hash: {:#?}",
492 new_root_staged_ledger_hash
493 )
494 })?
495 .scan_state();
496
497 let Pass::FirstPassLedgerHash(_first_pass_ledger_target) = scan_state
498 .get_snarked_ledger_sync(
499 &mut mt,
500 get_protocol_state,
501 apply_first_pass,
502 apply_second_pass,
503 apply_first_pass_sparse_ledger,
504 )?;
505
506 let expected_hash = new_root_snarked_ledger_hash;
508 let obtained_hash = LedgerHash::from_fp(mt.merkle_root());
509
510 if expected_hash != &obtained_hash {
511 return Err(format!(
512 "Expected to obtain snarked root ledger hash {} but got {}",
513 expected_hash, obtained_hash
514 ));
515 }
516
517 self.sync
518 .snarked_ledgers
519 .insert(new_root_snarked_ledger_hash.clone(), mt);
520
521 Ok(())
522 }
523
524 pub fn get_account_delegators(
525 &self,
526 ledger_hash: &LedgerHash,
527 account_id: &AccountId,
528 ) -> Option<Vec<Account>> {
529 let (mask, _) = self.mask(ledger_hash)?;
530 let mut accounts = Vec::new();
531
532 mask.iter(|account| {
533 if account.delegate == Some(account_id.public_key.clone()) {
534 accounts.push(account.clone());
535 }
536 });
537
538 Some(accounts)
539 }
540
541 #[allow(clippy::type_complexity)]
542 pub fn producers_with_delegates<F: FnMut(&CompressedPubKey) -> bool>(
543 &self,
544 ledger_hash: &LedgerHash,
545 mut filter: F,
546 ) -> Option<BTreeMap<AccountPublicKey, Vec<(ledger::AccountIndex, AccountPublicKey, u64)>>>
547 {
548 let (mask, _) = self.mask(ledger_hash)?;
549 let mut accounts = Vec::new();
550
551 mask.iter(|account| {
552 if filter(account.delegate.as_ref().unwrap_or(&account.public_key)) {
553 accounts.push((
554 account.id(),
555 account.delegate.clone(),
556 account.balance.as_u64(),
557 ))
558 }
559 });
560
561 let producers = accounts.into_iter().fold(
562 BTreeMap::<_, Vec<_>>::new(),
563 |mut producers, (id, delegate, balance)| {
564 let index = mask.index_of_account(id.clone()).unwrap();
565 let pub_key = AccountPublicKey::from(id.public_key);
566 let producer = delegate.map(Into::into).unwrap_or(pub_key.clone());
567 producers
568 .entry(producer)
569 .or_default()
570 .push((index, pub_key, balance));
571 producers
572 },
573 );
574 Some(producers)
575 }
576
577 pub fn child_hashes_get(
578 &mut self,
579 snarked_ledger_hash: LedgerHash,
580 parent: &LedgerAddress,
581 ) -> Result<(LedgerHash, LedgerHash), String> {
582 let mut mask = self.pending_sync_snarked_ledger_mask(&snarked_ledger_hash)?;
583 let left_hash = LedgerHash::from_fp(mask.get_inner_hash_at_addr(parent.child_left())?);
584 let right_hash = LedgerHash::from_fp(mask.get_inner_hash_at_addr(parent.child_right())?);
585
586 Ok((left_hash, right_hash))
587 }
588
589 pub fn accounts_set(
590 &mut self,
591 snarked_ledger_hash: LedgerHash,
592 parent: &LedgerAddress,
593 accounts: Vec<v2::MinaBaseAccountBinableArgStableV2>,
594 ) -> Result<LedgerHash, String> {
595 let mut mask = self.pending_sync_snarked_ledger_mask(&snarked_ledger_hash)?;
596 let accounts: Vec<_> = accounts
597 .into_iter()
598 .map(|account| Ok(Box::new((&account).try_into()?)))
599 .collect::<Result<Vec<_>, InvalidBigInt>>()
600 .map_err(error_to_string)?;
601
602 mask.set_all_accounts_rooted_at(parent.clone(), &accounts)
603 .map_err(|_| "Failed when setting accounts".to_owned())?;
604
605 let computed_hash = LedgerHash::from_fp(mask.get_inner_hash_at_addr(parent.clone())?);
606
607 Ok(computed_hash)
608 }
609
610 pub fn staged_ledger_reconstruct<F>(
611 &mut self,
612 snarked_ledger_hash: LedgerHash,
613 parts: Option<Arc<StagedLedgerAuxAndPendingCoinbasesValid>>,
614 callback: F,
615 ) -> Result<(), InvalidBigInt>
616 where
617 F: 'static + FnOnce(v2::LedgerHash, Result<StagedLedger, String>) + Send,
618 {
619 let snarked_ledger = self
620 .sync
621 .snarked_ledger_mut(snarked_ledger_hash.clone())?
622 .copy();
623
624 thread::Builder::new()
625 .name("staged-ledger-reconstruct".into())
626 .spawn(move || {
627 match staged_ledger_reconstruct(snarked_ledger, snarked_ledger_hash, parts) {
628 Ok((staged_ledger_hash, result)) => {
629 callback(staged_ledger_hash, result);
630 }
631 Err(e) => callback(v2::LedgerHash::zero(), Err(format!("{:?}", e))),
632 }
633 })
634 .expect("Failed: staged ledger reconstruct thread");
635
636 Ok(())
637 }
638
639 pub fn staged_ledger_reconstruct_sync(
640 &mut self,
641 snarked_ledger_hash: LedgerHash,
642 parts: Option<Arc<StagedLedgerAuxAndPendingCoinbasesValid>>,
643 ) -> Result<(v2::LedgerHash, Result<(), String>), InvalidBigInt> {
644 let snarked_ledger = self
645 .sync
646 .snarked_ledger_mut(snarked_ledger_hash.clone())?
647 .copy();
648 let (staged_ledger_hash, result) =
649 staged_ledger_reconstruct(snarked_ledger, snarked_ledger_hash, parts)?;
650 let result = match result {
651 Err(err) => Err(err),
652 Ok(staged_ledger) => {
653 self.staged_ledger_reconstruct_result_store(staged_ledger);
654 Ok(())
655 }
656 };
657
658 Ok((staged_ledger_hash, result))
659 }
660
661 pub fn block_apply(
662 &mut self,
663 block: ArcBlockWithHash,
664 pred_block: AppliedBlock,
665 skip_verification: Option<SkipVerification>,
666 ) -> Result<BlockApplyResult, String> {
667 mina_core::info!(mina_core::log::system_time();
668 kind = "LedgerService::block_apply",
669 summary = format!("{}, {} <- {}", block.height(), block.hash(), block.pred_hash()),
670 pred_staged_ledger_hash = pred_block.merkle_root_hash().to_string(),
671 staged_ledger_hash = block.merkle_root_hash().to_string(),
672 );
673 let mut staged_ledger = self
674 .staged_ledger_mut(pred_block.staged_ledger_hashes())
675 .ok_or_else(|| {
676 format!(
677 "parent staged ledger missing: {:#?}",
678 pred_block.staged_ledger_hashes()
679 )
680 })?
681 .clone();
682
683 let global_slot = block.global_slot_since_genesis();
684 let prev_protocol_state = &pred_block.header().protocol_state;
685 let prev_state_view = protocol_state_view(prev_protocol_state).map_err(error_to_string)?;
686
687 let consensus_state = &block.header().protocol_state.body.consensus_state;
688 let coinbase_receiver: CompressedPubKey = (&consensus_state.coinbase_receiver)
689 .try_into()
690 .map_err(error_to_string)?;
691 let supercharge_coinbase = consensus_state.supercharge_coinbase;
692
693 let diff: Diff = (&block.body().staged_ledger_diff)
694 .try_into()
695 .map_err(error_to_string)?;
696
697 let prev_protocol_state: ledger::proofs::block::ProtocolState =
698 prev_protocol_state.try_into()?;
699
700 let result = staged_ledger
701 .apply(
702 skip_verification,
703 constraint_constants(),
704 Slot::from_u32(global_slot),
705 diff,
706 (),
707 &Verifier,
708 &prev_state_view,
709 prev_protocol_state.hashes(),
710 coinbase_receiver.clone(),
711 supercharge_coinbase,
712 )
713 .map_err(|err| format!("{err:?}"))?;
714 let just_emitted_a_proof = result.ledger_proof.is_some();
715 let ledger_hashes = MinaBaseStagedLedgerHashStableV1::from(&result.hash_after_applying);
716
717 let expected_ledger_hashes = block.staged_ledger_hashes();
719 if &ledger_hashes != expected_ledger_hashes {
720 let staged_ledger = self
721 .staged_ledger_mut(pred_block.staged_ledger_hashes())
722 .unwrap(); match dump_application_to_file(staged_ledger, block.clone(), pred_block) {
725 Ok(filename) => mina_core::info!(
726 mina_core::log::system_time();
727 kind = "LedgerService::dump - Failed application",
728 summary = format!("StagedLedger and block saved to: {filename:?}")
729 ),
730 Err(e) => mina_core::error!(
731 mina_core::log::system_time();
732 kind = "LedgerService::dump - Failed application",
733 summary = format!("Failed to save block application to file: {e:?}")
734 ),
735 }
736
737 panic!("staged ledger hash mismatch. found: {ledger_hashes:#?}, expected: {expected_ledger_hashes:#?}");
738 }
739
740 let archive_data = if self.archive_mode {
741 let senders = block
742 .body()
743 .transactions()
744 .filter_map(|tx| UserCommand::try_from(tx).ok().map(|cmd| cmd.fee_payer()))
745 .collect::<BTreeSet<_>>()
746 .into_iter();
747
748 let coinbase_receiver_id = AccountId::new(coinbase_receiver, TokenId::default());
749
750 let (accessed, not_accessed): (BTreeSet<_>, BTreeSet<_>) = block
752 .body()
753 .tranasctions_with_status()
754 .flat_map(|(tx, status)| {
755 let status: TransactionStatus = status.into();
756 UserCommand::try_from(tx)
757 .ok()
758 .map(|cmd| cmd.account_access_statuses(&status))
759 .into_iter()
760 .flatten()
761 })
762 .partition(|(_, status)| *status == AccessedOrNot::Accessed);
763
764 let mut account_ids_accessed: BTreeSet<_> =
765 accessed.into_iter().map(|(id, _)| id).collect();
766 let mut account_ids_not_accessed: BTreeSet<_> =
767 not_accessed.into_iter().map(|(id, _)| id).collect();
768
769 let has_coinbase = block.body().has_coinbase();
775
776 if has_coinbase {
777 account_ids_accessed.insert(coinbase_receiver_id);
778 } else {
779 account_ids_not_accessed.insert(coinbase_receiver_id);
780 }
781
782 let fee_transfer_accounts =
784 block.body().coinbase_fee_transfers_iter().filter_map(|cb| {
785 let receiver: CompressedPubKey = cb.receiver_pk.inner().try_into().ok()?;
786 let account_id = AccountId::new(receiver, TokenId::default());
787 Some(account_id)
788 });
789 account_ids_accessed.extend(fee_transfer_accounts);
790
791 let accounts_accessed: Vec<(AccountIndex, Account)> = account_ids_accessed
793 .iter()
794 .filter_map(|id| {
795 staged_ledger
796 .ledger()
797 .index_of_account(id.clone())
798 .and_then(|index| {
799 staged_ledger
800 .ledger()
801 .get_at_index(index)
802 .map(|account| (index, *account))
803 })
804 })
805 .collect();
806
807 let account_creation_fee = constraint_constants().account_creation_fee;
808
809 let accounts_created: Vec<(AccountId, u64)> = staged_ledger
811 .latest_block_accounts_created(pred_block.hash().to_field()?)
812 .iter()
813 .map(|id| (id.clone(), account_creation_fee))
814 .collect();
815
816 let all_account_ids: BTreeSet<_> = account_ids_accessed
819 .union(&account_ids_not_accessed)
820 .collect();
821 let tokens_used: BTreeSet<(TokenId, Option<AccountId>)> = if has_coinbase {
822 all_account_ids
823 .iter()
824 .map(|id| {
825 let token_id = id.token_id.clone();
826 let token_owner = staged_ledger.ledger().token_owner(token_id.clone());
827 (token_id, token_owner)
828 })
829 .collect()
830 } else {
831 BTreeSet::new()
832 };
833
834 let sender_receipt_chains_from_parent_ledger = senders
835 .filter_map(|sender| {
836 if let Some(location) = staged_ledger.ledger().location_of_account(&sender) {
837 staged_ledger.ledger().get(location).map(|account| {
838 (
839 sender,
840 v2::ReceiptChainHash::from(account.receipt_chain_hash),
841 )
842 })
843 } else {
844 None
845 }
846 })
847 .collect();
848 Some(BlockApplyResultArchive {
849 accounts_accessed,
850 accounts_created,
851 tokens_used,
852 sender_receipt_chains_from_parent_ledger,
853 })
854 } else {
855 None
856 };
857
858 self.sync
859 .staged_ledgers
860 .insert(Arc::new(ledger_hashes), staged_ledger);
861
862 Ok(BlockApplyResult {
865 block,
866 just_emitted_a_proof,
867 archive_data,
868 })
869 }
870
871 pub fn commit(
872 &mut self,
873 ledgers_to_keep: LedgersToKeep,
874 root_snarked_ledger_updates: TransitionFrontierRootSnarkedLedgerUpdates,
875 needed_protocol_states: BTreeMap<StateHash, MinaStateProtocolStateValueStableV2>,
876 new_root: &ArcBlockWithHash,
877 new_best_tip: &ArcBlockWithHash,
878 ) -> CommitResult {
879 mina_core::debug!(mina_core::log::system_time();
880 kind = "LedgerService::commit",
881 summary = format!("commit {}, {}", new_best_tip.height(), new_best_tip.hash()),
882 new_root = format!("{}, {}", new_root.height(), new_root.hash()),
883 new_root_staking_epoch_ledger = new_root.staking_epoch_ledger_hash().to_string(),
884 new_root_next_epoch_ledger = new_root.next_epoch_ledger_hash().to_string(),
885 new_root_snarked_ledger = new_root.snarked_ledger_hash().to_string(),
886 );
887 self.recreate_snarked_ledger(
888 &root_snarked_ledger_updates,
889 &needed_protocol_states,
890 new_root.snarked_ledger_hash(),
891 )
892 .unwrap();
893
894 self.snarked_ledgers.retain(|hash, _| {
895 let keep = ledgers_to_keep.contains(hash);
896 if !keep {
897 mina_core::debug!(mina_core::log::system_time();
898 kind = "LedgerService::commit - snarked_ledgers.drop",
899 summary = format!("drop snarked ledger {hash}"));
900 }
901 keep
902 });
903 self.snarked_ledgers.extend(
904 std::mem::take(&mut self.sync.snarked_ledgers)
905 .into_iter()
906 .filter(|(hash, _)| {
907 let keep = ledgers_to_keep.contains(hash);
908 if !keep {
909 mina_core::debug!(mina_core::log::system_time();
910 kind = "LedgerService::commit - snarked_ledgers.drop",
911 summary = format!("drop snarked ledger {hash}"));
912 }
913 keep
914 }),
915 );
916
917 self.staged_ledgers
918 .retain(|hash| ledgers_to_keep.contains(hash));
919 self.staged_ledgers.extend(
920 self.sync
921 .staged_ledgers
922 .take()
923 .into_iter()
924 .filter(|(hash, _)| ledgers_to_keep.contains(&**hash)),
925 );
926
927 for ledger_hash in [
928 new_best_tip.staking_epoch_ledger_hash(),
929 new_root.snarked_ledger_hash(),
930 new_root.merkle_root_hash(),
931 ] {
932 if let Some((mut mask, is_synced)) = self.mask(ledger_hash) {
933 if !is_synced {
934 panic!("ledger mask expected to be synced: {ledger_hash}");
935 }
936 let calculated = merkle_root(&mut mask);
937 assert_eq!(ledger_hash, &calculated, "ledger mask hash mismatch");
938 } else {
939 panic!("ledger mask is missing: {ledger_hash}");
940 }
941 }
942
943 for (ledger_hash, snarked_ledger) in self.snarked_ledgers.iter_mut() {
944 while let Some((parent_hash, parent)) = snarked_ledger
945 .get_parent()
946 .map(|mut parent| (merkle_root(&mut parent), parent))
947 .filter(|(parent_hash, _)| !ledgers_to_keep.contains(parent_hash))
948 {
949 mina_core::debug!(mina_core::log::system_time();
950 kind = "LedgerService::commit - mask.commit_and_reparent",
951 summary = format!("{ledger_hash} -> {parent_hash}"));
952 snarked_ledger.commit();
953 snarked_ledger.unregister_mask(UnregisterBehavior::Check);
954 *snarked_ledger = parent;
955 }
956 }
957
958 let Some(new_root_ledger) = self.staged_ledgers.get_mut(new_root.staged_ledger_hashes())
960 else {
961 return Default::default();
962 };
963
964 new_root_ledger.commit_and_reparent_to_root();
966
967 let needed_protocol_states = self
968 .staged_ledger_mut(new_root.staged_ledger_hashes())
969 .map(|l| {
970 l.scan_state()
971 .required_state_hashes()
972 .into_iter()
973 .map(|fp| DataHashLibStateHashStableV1(fp.into()).into())
974 .collect()
975 })
976 .unwrap_or_default();
977
978 let available_jobs = Arc::new(
979 self.staged_ledger_mut(new_best_tip.staged_ledger_hashes())
980 .map(|l| {
981 l.scan_state()
982 .all_job_pairs_iter()
983 .map(|job| job.map(|single| AvailableJobMessage::from(single)))
984 .collect()
985 })
986 .unwrap_or_default(),
987 );
988
989 CommitResult {
992 alive_masks: ::ledger::mask::alive_len(),
993 available_jobs,
994 needed_protocol_states,
995 }
996 }
997
998 #[allow(dead_code)]
999 fn check_alive_masks(&mut self) {
1000 let mut alive: BTreeSet<_> = ::ledger::mask::alive_collect();
1001 let staged_ledgers = self
1002 .staged_ledgers
1003 .staged_ledgers
1004 .iter()
1005 .map(|(hash, ledger)| (&hash.non_snark.ledger_hash, ledger.ledger_ref()));
1006
1007 let alive_ledgers = self
1008 .snarked_ledgers
1009 .iter()
1010 .chain(staged_ledgers)
1011 .map(|(hash, mask)| {
1012 let uuid = mask.get_uuid();
1013 if !alive.remove(&uuid) {
1014 bug_condition!("mask not found among alive masks! uuid: {uuid}, hash: {hash}");
1015 }
1016 (uuid, hash)
1017 })
1018 .collect::<Vec<_>>();
1019 mina_core::debug!(redux::Timestamp::global_now(); "alive_ledgers_after_commit: {alive_ledgers:#?}");
1020
1021 if !alive.is_empty() {
1022 bug_condition!(
1023 "masks alive which are no longer part of the ledger service: {alive:#?}"
1024 );
1025 }
1026 }
1027
1028 pub fn get_num_accounts(
1029 &mut self,
1030 ledger_hash: v2::LedgerHash,
1031 ) -> Option<(u64, v2::LedgerHash)> {
1032 let (mask, _) = self
1033 .mask(&ledger_hash)
1034 .filter(|(_, is_synced)| *is_synced)?;
1035 let num_accounts = mask.num_accounts() as u64;
1038 let first_node_addr = ledger::Address::first(
1039 LEDGER_DEPTH.saturating_sub(super::tree_height_for_num_accounts(num_accounts)),
1040 );
1041 let hash = LedgerHash::from_fp(mask.get_hash(first_node_addr)?);
1042 Some((num_accounts, hash))
1043 }
1044
1045 pub fn get_child_hashes(
1046 &mut self,
1047 ledger_hash: v2::LedgerHash,
1048 addr: LedgerAddress,
1049 ) -> Option<(v2::LedgerHash, v2::LedgerHash)> {
1050 let (mask, is_synced) = self.mask(&ledger_hash)?;
1051 let get_hash = |addr: LedgerAddress| {
1052 let depth = addr.length();
1053 mask.get_hash(addr)
1054 .map(|fp| MinaBaseLedgerHash0StableV1(fp.into()).into())
1055 .or_else(|| {
1056 if is_synced {
1057 Some(ledger_empty_hash_at_depth(depth))
1058 } else {
1059 None
1060 }
1061 })
1062 };
1063 let (left, right) = (addr.child_left(), addr.child_right());
1064 Some((get_hash(left)?, get_hash(right)?))
1065 }
1066
1067 pub fn get_child_accounts(
1068 &mut self,
1069 ledger_hash: v2::LedgerHash,
1070 addr: LedgerAddress,
1071 ) -> Option<Vec<v2::MinaBaseAccountBinableArgStableV2>> {
1072 let (mask, _) = self
1073 .mask(&ledger_hash)
1074 .filter(|(_, is_synced)| *is_synced)?;
1075 let accounts = mask
1077 .get_all_accounts_rooted_at(addr)?
1078 .into_iter()
1079 .map(|(_, account)| (&*account).into())
1080 .collect();
1081 Some(accounts)
1082 }
1083
1084 pub fn get_accounts(
1085 &mut self,
1086 ledger_hash: v2::LedgerHash,
1087 ids: Vec<AccountId>,
1088 ) -> Vec<Account> {
1089 let Some((mask, _)) = self.mask(&ledger_hash) else {
1090 mina_core::warn!(
1091 mina_core::log::system_time();
1092 kind = "LedgerService::get_accounts",
1093 summary = format!("Ledger not found: {ledger_hash:?}")
1094 );
1095 return Vec::new();
1096 };
1097 let addrs = mask
1098 .location_of_account_batch(&ids)
1099 .into_iter()
1100 .filter_map(|(_id, addr)| addr)
1101 .collect::<Vec<_>>();
1102
1103 mask.get_batch(&addrs)
1104 .into_iter()
1105 .filter_map(|(_, account)| account.map(|account| *account))
1106 .collect::<Vec<_>>()
1107 }
1108
1109 pub fn staged_ledger_aux_and_pending_coinbase(
1110 &mut self,
1111 ledger_hash: &MinaBaseStagedLedgerHashStableV1,
1112 protocol_states: BTreeMap<StateHash, MinaStateProtocolStateValueStableV2>,
1113 ) -> Option<Arc<StagedLedgerAuxAndPendingCoinbases>> {
1114 let ledger = self.staged_ledger_mut(ledger_hash)?;
1115 let needed_blocks = ledger
1116 .scan_state()
1117 .required_state_hashes()
1118 .into_iter()
1119 .map(|fp| DataHashLibStateHashStableV1(fp.into()))
1120 .map(|hash| protocol_states.get(&hash.into()).ok_or(()).cloned())
1121 .collect::<Result<_, _>>()
1122 .ok()?;
1123 ledger.pending_coinbase_collection_merkle_root();
1126 Some(
1127 StagedLedgerAuxAndPendingCoinbases {
1128 scan_state: (ledger.scan_state()).into(),
1129 staged_ledger_hash: ledger_hash.non_snark.ledger_hash.clone(),
1130 pending_coinbase: (ledger.pending_coinbase_collection()).into(),
1131 needed_blocks,
1132 }
1133 .into(),
1134 )
1135 }
1136
1137 #[allow(clippy::too_many_arguments)]
1138 pub fn staged_ledger_diff_create(
1139 &mut self,
1140 pred_block: AppliedBlock,
1141 global_slot_since_genesis: v2::MinaNumbersGlobalSlotSinceGenesisMStableV1,
1142 is_new_epoch: bool,
1143 producer: NonZeroCurvePoint,
1144 delegator: NonZeroCurvePoint,
1145 coinbase_receiver: NonZeroCurvePoint,
1146 completed_snarks: BTreeMap<SnarkJobId, Snark>,
1147 supercharge_coinbase: bool,
1148 transactions_by_fee: Vec<valid::UserCommand>,
1149 ) -> Result<StagedLedgerDiffCreateOutput, String> {
1150 let mut staged_ledger = self
1151 .staged_ledger_mut(pred_block.staged_ledger_hashes())
1152 .ok_or_else(|| {
1153 format!(
1154 "parent staged ledger missing: {}",
1155 pred_block.merkle_root_hash()
1156 )
1157 })?
1158 .clone();
1159
1160 staged_ledger.hash();
1162 let pending_coinbase_witness =
1163 MinaBasePendingCoinbaseStableV2::from(staged_ledger.pending_coinbase_collection());
1164
1165 let protocol_state_view =
1166 protocol_state_view(&pred_block.header().protocol_state).map_err(error_to_string)?;
1167
1168 let (pre_diff, _invalid_txns) = staged_ledger
1170 .create_diff(
1171 constraint_constants(),
1172 (&global_slot_since_genesis).into(),
1173 Some(true),
1174 (&coinbase_receiver).try_into().map_err(error_to_string)?,
1175 (),
1176 &protocol_state_view,
1177 transactions_by_fee,
1178 |stmt| {
1179 let job_id = SnarkJobId::from(stmt);
1180 match completed_snarks.get(&job_id) {
1181 Some(snark) => snark.try_into().ok(),
1182 None => None,
1183 }
1184 },
1185 supercharge_coinbase,
1186 )
1187 .map_err(|err| format!("{err:?}"))?;
1188
1189 let pred_body_hash = pred_block
1193 .header()
1194 .protocol_state
1195 .body
1196 .try_hash()
1197 .map_err(error_to_string)?;
1198 let diff = (&pre_diff).into();
1199
1200 let res = staged_ledger
1201 .apply_diff_unchecked(
1202 constraint_constants(),
1203 (&global_slot_since_genesis).into(),
1204 pre_diff,
1205 (),
1206 &protocol_state_view,
1207 (
1208 pred_block.hash().0.to_field().map_err(error_to_string)?,
1209 pred_body_hash.0.to_field().map_err(error_to_string)?,
1210 ),
1211 (&coinbase_receiver).try_into().map_err(error_to_string)?,
1212 supercharge_coinbase,
1213 )
1214 .map_err(|err| format!("{err:?}"))?;
1215
1216 let diff_hash = block_body_hash(&diff).map_err(|err| format!("{err:?}"))?;
1217 let staking_ledger_hash = if is_new_epoch {
1218 pred_block.next_epoch_ledger_hash()
1219 } else {
1220 pred_block.staking_epoch_ledger_hash()
1221 };
1222
1223 Ok(StagedLedgerDiffCreateOutput {
1224 diff,
1225 diff_hash,
1226 staged_ledger_hash: (&res.hash_after_applying).into(),
1227 emitted_ledger_proof: res
1228 .ledger_proof
1229 .map(|(proof, ..)| (&proof).into())
1230 .map(Arc::new),
1231 pending_coinbase_update: (&res.pending_coinbase_update.1).into(),
1232 pending_coinbase_witness: MinaBasePendingCoinbaseWitnessStableV2 {
1233 pending_coinbases: pending_coinbase_witness,
1234 is_new_stack: res.pending_coinbase_update.0,
1235 },
1236 stake_proof_sparse_ledger: self
1237 .stake_proof_sparse_ledger(staking_ledger_hash, &producer, &delegator)
1238 .map_err(error_to_string)?,
1239 })
1240 }
1241
1242 pub fn stake_proof_sparse_ledger(
1243 &mut self,
1244 staking_ledger: &LedgerHash,
1245 producer: &NonZeroCurvePoint,
1246 delegator: &NonZeroCurvePoint,
1247 ) -> Result<v2::MinaBaseSparseLedgerBaseStableV2, InvalidBigInt> {
1248 let mask = self.snarked_ledgers.get(staking_ledger).unwrap();
1249 let producer_id = ledger::AccountId::new(producer.try_into()?, ledger::TokenId::default());
1250 let delegator_id =
1251 ledger::AccountId::new(delegator.try_into()?, ledger::TokenId::default());
1252 let sparse_ledger = ledger::sparse_ledger::SparseLedger::of_ledger_subset_exn(
1253 mask.clone(),
1254 &[producer_id, delegator_id],
1255 );
1256 Ok((&sparse_ledger).into())
1257 }
1258
1259 pub fn scan_state_summary(
1260 &self,
1261 staged_ledger_hash: &MinaBaseStagedLedgerHashStableV1,
1262 ) -> Result<Vec<Vec<RpcScanStateSummaryScanStateJob>>, String> {
1263 use ledger::scan_state::scan_state::JobValue;
1264
1265 let ledger = self.staged_ledgers.get(staged_ledger_hash);
1266 let Some(ledger) = ledger else {
1267 return Ok(Vec::new());
1268 };
1269 ledger
1270 .scan_state()
1271 .view()
1272 .map(|jobs| {
1273 let jobs = jobs.collect::<Vec<JobValueWithIndex<'_>>>();
1274 let mut iter = jobs.iter().peekable();
1275 let mut res = Vec::with_capacity(jobs.len());
1276
1277 loop {
1278 let Some(job) = iter.next() else { break };
1279
1280 let (stmt, seq_no, job_kind, is_done) = match &job.job {
1281 JobValue::Leaf(JobValueBase::Empty)
1282 | JobValue::Node(JobValueMerge::Empty)
1283 | JobValue::Node(JobValueMerge::Part(_)) => {
1284 res.push(RpcScanStateSummaryScanStateJob::Empty);
1285 continue;
1286 }
1287 JobValue::Leaf(JobValueBase::Full(job)) => {
1288 let stmt = &job.job.statement;
1289 let tx = job.job.transaction_with_info.transaction();
1290 let status = (&tx.status).into();
1291 let tx = MinaTransactionTransactionStableV2::from(&tx.data);
1292 let kind = RpcScanStateSummaryScanStateJobKind::Base(
1293 RpcScanStateSummaryBlockTransaction {
1294 hash: tx.hash().ok(),
1295 kind: (&tx).into(),
1296 status,
1297 },
1298 );
1299 let seq_no = job.seq_no.as_u64();
1300 (stmt.clone(), seq_no, kind, job.state.is_done())
1301 }
1302 JobValue::Node(JobValueMerge::Full(job)) => {
1303 let stmt = job
1304 .left
1305 .proof
1306 .statement()
1307 .merge(&job.right.proof.statement())
1308 .unwrap();
1309 let kind = RpcScanStateSummaryScanStateJobKind::Merge;
1310 let seq_no = job.seq_no.as_u64();
1311 (stmt, seq_no, kind, job.state.is_done())
1312 }
1313 };
1314 let stmt: MinaStateBlockchainStateValueStableV2LedgerProofStatement =
1315 (&stmt).into();
1316 let job_id: SnarkJobId = (&stmt.source, &stmt.target).into();
1317
1318 let bundle =
1319 job.bundle_sibling()
1320 .and_then(|(sibling_index, is_sibling_left)| {
1321 let sibling_job = jobs.get(sibling_index)?;
1322 let sibling_stmt: MinaStateBlockchainStateValueStableV2LedgerProofStatement = match &sibling_job.job {
1323 JobValue::Leaf(JobValueBase::Full(job)) => {
1324 (&job.job.statement).into()
1325 }
1326 JobValue::Node(JobValueMerge::Full(job)) => (&job
1327 .left
1328 .proof
1329 .statement()
1330 .merge(&job.right.proof.statement())
1331 .unwrap()).into(),
1332 _ => return None,
1333 };
1334 let bundle_job_id: SnarkJobId = match is_sibling_left {
1335 false => (&stmt.source, &sibling_stmt.target).into(),
1336 true => (&sibling_stmt.source, &stmt.target).into(),
1337 };
1338 Some((bundle_job_id, is_sibling_left))
1339 });
1340
1341 let bundle_job_id = bundle
1342 .as_ref()
1343 .map_or_else(|| job_id.clone(), |(id, _)| id.clone());
1344
1345 if is_done {
1346 let is_left =
1347 bundle.map_or_else(|| true, |(_, is_sibling_left)| !is_sibling_left);
1348 let parent = job.parent().ok_or_else(|| format!("job(depth: {}, index: {}) has no parent", job.depth(), job.index()))?;
1349 let sok_message: MinaBaseSokMessageStableV1 = {
1350 let job = jobs.get(parent).ok_or_else(|| format!("job(depth: {}, index: {}) parent not found", job.depth(), job.index()))?;
1351 match &job.job {
1352 JobValue::Node(JobValueMerge::Part(job)) if is_left => {
1353 (&job.sok_message).into()
1354 }
1355 JobValue::Node(JobValueMerge::Full(job)) => {
1356 if is_left {
1357 (&job.left.sok_message).into()
1358 } else {
1359 (&job.right.sok_message).into()
1360 }
1361 }
1362 _state => {
1363 res.push(RpcScanStateSummaryScanStateJob::Empty);
1368 continue;
1369 }
1370 }
1371 };
1372 res.push(RpcScanStateSummaryScanStateJob::Done {
1373 job_id,
1374 bundle_job_id,
1375 job: Box::new(job_kind),
1376 seq_no,
1377 snark: Box::new(RpcSnarkPoolJobSnarkWorkDone {
1378 snarker: sok_message.prover,
1379 fee: sok_message.fee,
1380 }),
1381 });
1382 } else {
1383 res.push(RpcScanStateSummaryScanStateJob::Todo {
1384 job_id,
1385 bundle_job_id,
1386 job: job_kind,
1387 seq_no,
1388 });
1389 }
1390 }
1391 Ok(res)
1392 })
1393 .collect()
1394 }
1395}
1396
1397impl LedgerSyncState {
1398 fn mask(&self, hash: &LedgerHash) -> Option<(Mask, bool)> {
1399 self.snarked_ledgers
1400 .get(hash)
1401 .cloned()
1402 .map(|mask| (mask, false))
1403 .or_else(|| Some((self.staged_ledgers.get_mask(hash)?, true)))
1404 }
1405
1406 fn pending_sync_snarked_ledger_mask(&self, hash: &LedgerHash) -> Result<Mask, String> {
1407 self.snarked_ledgers
1408 .get(hash)
1409 .cloned()
1410 .ok_or_else(|| format!("Missing sync snarked ledger {}", hash))
1411 }
1412
1413 fn snarked_ledger_mut(&mut self, hash: LedgerHash) -> Result<&mut Mask, InvalidBigInt> {
1416 let hash_fp = hash.to_field()?;
1417 Ok(self.snarked_ledgers.entry(hash.clone()).or_insert_with(|| {
1418 let mut ledger = Mask::create(LEDGER_DEPTH);
1419 ledger.set_cached_hash_unchecked(&LedgerAddress::root(), hash_fp);
1420 ledger
1421 }))
1422 }
1423
1424 fn staged_ledger_mut(
1425 &mut self,
1426 hash: &MinaBaseStagedLedgerHashStableV1,
1427 ) -> Option<&mut StagedLedger> {
1428 self.staged_ledgers.get_mut(hash)
1429 }
1430}
1431
1432fn staged_ledger_reconstruct(
1433 snarked_ledger: Mask,
1434 snarked_ledger_hash: LedgerHash,
1435 parts: Option<Arc<StagedLedgerAuxAndPendingCoinbasesValid>>,
1436) -> Result<(v2::LedgerHash, Result<StagedLedger, String>), InvalidBigInt> {
1437 let staged_ledger_hash = parts
1438 .as_ref()
1439 .map(|p| p.staged_ledger_hash.clone())
1440 .unwrap_or_else(|| snarked_ledger_hash.clone());
1441
1442 let ledger = snarked_ledger.make_child();
1443
1444 let mut result = if let Some(parts) = &parts {
1445 let states = parts
1446 .needed_blocks
1447 .iter()
1448 .map(|state| Ok((state.try_hash()?.to_field()?, state.clone())))
1449 .collect::<Result<BTreeMap<Fp, _>, _>>()?;
1450
1451 StagedLedger::of_scan_state_pending_coinbases_and_snarked_ledger(
1452 (),
1453 constraint_constants(),
1454 Verifier,
1455 (&parts.scan_state).try_into()?,
1456 ledger,
1457 LocalState::empty(),
1458 parts.staged_ledger_hash.to_field()?,
1459 (&parts.pending_coinbase).try_into()?,
1460 |key| states.get(&key).cloned().unwrap(),
1461 )
1462 } else {
1463 StagedLedger::create_exn(constraint_constants().clone(), ledger)
1464 };
1465
1466 match result.as_mut() {
1467 Ok(staged_ledger) => {
1468 staged_ledger.commit_and_reparent_to_root();
1469 }
1470 Err(_) => {
1471 if let Err(e) = dump_reconstruct_to_file(&snarked_ledger, &parts) {
1472 mina_core::error!(
1473 mina_core::log::system_time();
1474 kind = "LedgerService::dump - Failed reconstruct",
1475 summary = format!("Failed to save reconstruction to file: {e:?}")
1476 );
1477 }
1478 }
1479 }
1480
1481 Ok((staged_ledger_hash, result))
1482}
1483
1484pub trait LedgerService: redux::Service {
1485 fn ledger_manager(&self) -> &LedgerManager;
1486 fn force_sync_calls(&self) -> bool {
1487 false
1488 }
1489
1490 fn write_init(&mut self, request: LedgerWriteRequest) {
1491 let request = LedgerRequest::Write(request);
1492 if self.force_sync_calls() {
1493 let _ = self.ledger_manager().call_sync(request);
1494 } else {
1495 self.ledger_manager().call(request);
1496 }
1497 }
1498
1499 fn read_init(&mut self, id: LedgerReadId, request: LedgerReadRequest) {
1500 let request = LedgerRequest::Read(id, request);
1501 if self.force_sync_calls() {
1502 let _ = self.ledger_manager().call_sync(request);
1503 } else {
1504 self.ledger_manager().call(request);
1505 }
1506 }
1507}
1508
1509fn dump_reconstruct_to_file(
1512 snarked_ledger: &Mask,
1513 parts: &Option<Arc<StagedLedgerAuxAndPendingCoinbasesValid>>,
1514) -> std::io::Result<()> {
1515 use mina_p2p_messages::binprot::{
1516 self,
1517 macros::{BinProtRead, BinProtWrite},
1518 };
1519
1520 #[derive(BinProtRead, BinProtWrite)]
1521 struct ReconstructContext {
1522 accounts: Vec<v2::MinaBaseAccountBinableArgStableV2>,
1523 scan_state: v2::TransactionSnarkScanStateStableV2,
1524 pending_coinbase: v2::MinaBasePendingCoinbaseStableV2,
1525 staged_ledger_hash: LedgerHash,
1526 states: List<v2::MinaStateProtocolStateValueStableV2>,
1527 }
1528
1529 let Some(parts) = parts else {
1530 return Err(std::io::ErrorKind::Other.into());
1531 };
1532
1533 let StagedLedgerAuxAndPendingCoinbasesValid {
1534 scan_state,
1535 staged_ledger_hash,
1536 pending_coinbase,
1537 needed_blocks,
1538 } = &**parts;
1539
1540 let reconstruct_context = ReconstructContext {
1541 accounts: snarked_ledger
1542 .to_list()
1543 .iter()
1544 .map(v2::MinaBaseAccountBinableArgStableV2::from)
1545 .collect(),
1546 scan_state: scan_state.clone(),
1547 pending_coinbase: pending_coinbase.clone(),
1548 staged_ledger_hash: staged_ledger_hash.clone(),
1549 states: needed_blocks.clone(),
1550 };
1551
1552 let debug_dir = mina_core::get_debug_dir();
1553 let filename = debug_dir
1554 .join("failed_reconstruct_ctx.binprot")
1555 .to_string_lossy()
1556 .to_string();
1557 std::fs::create_dir_all(&debug_dir)?;
1558
1559 use mina_p2p_messages::binprot::BinProtWrite;
1560 let mut file = std::fs::File::create(&filename)?;
1561 reconstruct_context.binprot_write(&mut file)?;
1562 file.sync_all()?;
1563
1564 mina_core::info!(
1565 mina_core::log::system_time();
1566 kind = "LedgerService::dump - Failed reconstruct",
1567 summary = format!("Reconstruction saved to: {filename:?}")
1568 );
1569
1570 Ok(())
1571}
1572
1573fn dump_application_to_file(
1578 staged_ledger: &StagedLedger,
1579 block: ArcBlockWithHash,
1580 pred_block: AppliedBlock,
1581) -> std::io::Result<String> {
1582 use mina_p2p_messages::binprot::{
1583 self,
1584 macros::{BinProtRead, BinProtWrite},
1585 };
1586
1587 #[derive(BinProtRead, BinProtWrite)]
1588 struct ApplyContext {
1589 accounts: Vec<v2::MinaBaseAccountBinableArgStableV2>,
1590 scan_state: v2::TransactionSnarkScanStateStableV2,
1591 pending_coinbase: v2::MinaBasePendingCoinbaseStableV2,
1592 pred_block: v2::MinaBlockBlockStableV2,
1593 blocks: Vec<v2::MinaBlockBlockStableV2>,
1594 }
1595
1596 let cs = &block.block.header.protocol_state.body.consensus_state;
1597 let block_height = cs.blockchain_length.as_u32();
1598
1599 let apply_context = ApplyContext {
1600 accounts: staged_ledger
1601 .ledger()
1602 .to_list()
1603 .iter()
1604 .map(v2::MinaBaseAccountBinableArgStableV2::from)
1605 .collect::<Vec<_>>(),
1606 scan_state: staged_ledger.scan_state().into(),
1607 pending_coinbase: staged_ledger.pending_coinbase_collection().into(),
1608 pred_block: (**pred_block.block()).clone(),
1609 blocks: vec![(*block.block).clone()],
1610 };
1611
1612 let debug_dir = mina_core::get_debug_dir();
1613 let filename = debug_dir
1614 .join(format!("failed_application_ctx_{}.binprot", block_height))
1615 .to_string_lossy()
1616 .to_string();
1617 std::fs::create_dir_all(&debug_dir)?;
1618
1619 let mut file = std::fs::File::create(&filename)?;
1620
1621 use mina_p2p_messages::binprot::BinProtWrite;
1622 apply_context.binprot_write(&mut file)?;
1623 file.sync_all()?;
1624
1625 Ok(filename)
1626}
1627
1628#[cfg(test)]
1629mod tests {
1630 use mina_p2p_messages::v2::MinaBaseLedgerHash0StableV1;
1631
1632 use crate::ledger::hash_node_at_depth;
1633
1634 use super::*;
1635
1636 #[test]
1637 fn test_ledger_hash() {
1638 IntoIterator::into_iter([(
1639 LedgerAddress::root(),
1640 "jx5YAT36bv62M8mPcREYYfZWXaKqqMzDCP8wmc21uf4CfDKAHCr",
1641 "jxo5pSyt16XGwA9UeuAdiFDzrwFH3smbNTJF7fxq98w1y9Jem2m",
1642 "jwq3nCDr8XejL8HKDxR5qVhFJbKoUTGZgtLBZCp3MrqLTnqmjdP",
1643 )])
1644 .map(|(addr, expected_hash, left, right)| {
1645 let left: LedgerHash = left.parse().unwrap();
1646 let right: LedgerHash = right.parse().unwrap();
1647 (addr, expected_hash, left, right)
1648 })
1649 .for_each(|(address, expected_hash, left, right)| {
1650 let hash = hash_node_at_depth(
1651 address.length(),
1652 left.0.to_field().unwrap(),
1653 right.0.to_field().unwrap(),
1654 );
1655 let hash: LedgerHash = MinaBaseLedgerHash0StableV1(hash.into()).into();
1656 assert_eq!(hash.to_string(), expected_hash);
1657 });
1658 }
1659}