1use super::{
2 ledger_empty_hash_at_depth,
3 read::{LedgerReadId, LedgerReadRequest, LedgerReadResponse},
4 write::{CommitResult, LedgerWriteRequest, LedgerWriteResponse, LedgersToKeep},
5 LedgerAddress, LedgerEvent, LEDGER_DEPTH,
6};
7use crate::{
8 account::AccountPublicKey,
9 block_producer_effectful::StagedLedgerDiffCreateOutput,
10 ledger::{
11 ledger_manager::{LedgerManager, LedgerRequest},
12 write::{BlockApplyResult, BlockApplyResultArchive},
13 },
14 p2p::channels::rpc::StagedLedgerAuxAndPendingCoinbases,
15 rpc::{
16 RpcScanStateSummaryBlockTransaction, RpcScanStateSummaryScanStateJob,
17 RpcScanStateSummaryScanStateJobKind, RpcSnarkPoolJobSnarkWorkDone,
18 },
19 transition_frontier::{
20 genesis::empty_pending_coinbase_hash,
21 sync::{
22 ledger::staged::StagedLedgerAuxAndPendingCoinbasesValid,
23 TransitionFrontierRootSnarkedLedgerUpdates,
24 },
25 },
26};
27use ledger::{
28 scan_state::{
29 currency::Slot,
30 scan_state::{AvailableJobMessage, JobValueBase, JobValueMerge, JobValueWithIndex, Pass},
31 transaction_logic::{
32 local_state::LocalState,
33 protocol_state::{protocol_state_view, ProtocolStateView},
34 transaction_partially_applied::TransactionPartiallyApplied,
35 valid,
36 zkapp_command::AccessedOrNot,
37 Transaction, TransactionStatus, UserCommand,
38 },
39 },
40 sparse_ledger::SparseLedger,
41 staged_ledger::{
42 diff::Diff,
43 staged_ledger::{SkipVerification, StagedLedger},
44 validate_block::block_body_hash,
45 },
46 verifier::Verifier,
47 Account, AccountId, AccountIndex, BaseLedger, Database, Mask, TokenId, UnregisterBehavior,
48};
49use mina_core::{
50 block::{AppliedBlock, ArcBlockWithHash},
51 bug_condition,
52 constants::constraint_constants,
53 snark::{Snark, SnarkJobId},
54 thread,
55};
56use mina_curves::pasta::Fp;
57use mina_p2p_messages::{
58 bigint::InvalidBigInt,
59 binprot::BinProtRead,
60 list::List,
61 v2::{
62 self, DataHashLibStateHashStableV1, LedgerHash, MinaBaseLedgerHash0StableV1,
63 MinaBasePendingCoinbaseStableV2, MinaBasePendingCoinbaseWitnessStableV2,
64 MinaBaseSokMessageStableV1, MinaBaseStagedLedgerHashStableV1,
65 MinaStateBlockchainStateValueStableV2LedgerProofStatement,
66 MinaStateProtocolStateValueStableV2, MinaTransactionTransactionStableV2, NonZeroCurvePoint,
67 StateHash,
68 },
69};
70use mina_signer::CompressedPubKey;
71use std::{
72 collections::{BTreeMap, BTreeSet},
73 path::Path,
74 sync::Arc,
75};
76
77fn merkle_root(mask: &mut Mask) -> LedgerHash {
78 MinaBaseLedgerHash0StableV1(mask.merkle_root().into()).into()
79}
80
81fn error_to_string(e: InvalidBigInt) -> String {
82 format!("{:?}", e)
83}
84
/// Storage of [`StagedLedger`]s that can be looked up either by their staged
/// ledger hash or by the merkle root hash of their ledger mask.
#[derive(Default)]
struct StagedLedgersStorage {
    /// Secondary index: merkle root hash of a staged ledger's mask mapped to
    /// the staged ledger hashes stored under that root (one root may be shared
    /// by several staged ledgers, hence the `Vec`).
    by_merkle_root_hash: BTreeMap<LedgerHash, Vec<Arc<MinaBaseStagedLedgerHashStableV1>>>,
    /// Primary storage, keyed by staged ledger hash.
    staged_ledgers: BTreeMap<Arc<MinaBaseStagedLedgerHashStableV1>, StagedLedger>,
}
92
93impl StagedLedgersStorage {
94 fn insert_by_recomputing_hash(&mut self, mut staged_ledger: StagedLedger) {
97 let staged_ledger_hash: MinaBaseStagedLedgerHashStableV1 = (&staged_ledger.hash()).into();
98 self.insert(Arc::new(staged_ledger_hash), staged_ledger);
99 }
100
101 fn insert(
102 &mut self,
103 staged_ledger_hash: Arc<MinaBaseStagedLedgerHashStableV1>,
104 staged_ledger: StagedLedger,
105 ) {
106 let merkle_root_hash: LedgerHash = merkle_root(&mut staged_ledger.ledger());
107 self.by_merkle_root_hash
108 .entry(merkle_root_hash.clone())
109 .or_insert_with(|| Vec::with_capacity(1))
110 .extend([staged_ledger_hash.clone()]);
111 self.staged_ledgers
112 .insert(staged_ledger_hash, staged_ledger);
113 }
114
115 fn get_mask(&self, root_hash: &LedgerHash) -> Option<Mask> {
116 let staged_ledger_hashes: &Vec<_> = self.by_merkle_root_hash.get(root_hash)?;
117 self.staged_ledgers
120 .get(staged_ledger_hashes.first()?)
121 .map(|staged_ledger| staged_ledger.ledger())
122 }
123
124 fn get(&self, staged_ledger_hash: &MinaBaseStagedLedgerHashStableV1) -> Option<&StagedLedger> {
125 self.staged_ledgers.get(staged_ledger_hash)
126 }
127
128 fn get_mut(
129 &mut self,
130 staged_ledger_hash: &MinaBaseStagedLedgerHashStableV1,
131 ) -> Option<&mut StagedLedger> {
132 self.staged_ledgers.get_mut(staged_ledger_hash)
133 }
134
135 fn retain<F>(&mut self, fun: F)
136 where
137 F: Fn(&MinaBaseStagedLedgerHashStableV1) -> bool,
138 {
139 self.by_merkle_root_hash.retain(|_, staged_ledger_hashes| {
140 staged_ledger_hashes.retain(|hash| {
141 if fun(hash) {
142 return true;
143 }
144 self.staged_ledgers.remove(hash);
145 false
146 });
147 !staged_ledger_hashes.is_empty()
148 });
149 }
150
151 fn extend<I>(&mut self, iterator: I)
152 where
153 I: IntoIterator<Item = (Arc<MinaBaseStagedLedgerHashStableV1>, StagedLedger)>,
154 {
155 for (hash, staged_ledger) in iterator.into_iter() {
156 self.insert(hash, staged_ledger);
157 }
158 }
159
160 fn take(&mut self) -> BTreeMap<Arc<MinaBaseStagedLedgerHashStableV1>, StagedLedger> {
161 let Self {
162 by_merkle_root_hash,
163 staged_ledgers,
164 } = self;
165
166 let _ = std::mem::take(by_merkle_root_hash);
167 std::mem::take(staged_ledgers)
168 }
169}
170
/// State of the ledger service: the snarked and staged ledgers it holds, plus
/// the ledgers that are still being synced or applied.
#[derive(Default)]
pub struct LedgerCtx {
    /// Snarked ledger masks keyed by their ledger hash.
    snarked_ledgers: BTreeMap<LedgerHash, Mask>,
    /// Extra snarked ledgers loaded from disk by
    /// `new_with_additional_snarked_ledgers`; consulted last by `mask`.
    additional_snarked_ledgers: BTreeMap<LedgerHash, Mask>,
    /// Staged ledgers retained by the last `commit`.
    staged_ledgers: StagedLedgersStorage,
    /// Ledgers produced by sync / block application that have not been moved
    /// into the collections above by `commit` yet.
    sync: LedgerSyncState,
    /// When `true`, `block_apply` also collects archive data for the block.
    archive_mode: bool,
    /// Channel used by `send_event` to emit ledger events; `None` until
    /// `set_event_sender` is called.
    event_sender: Option<mina_core::channels::mpsc::UnboundedSender<crate::event_source::Event>>,
}
182
/// Ledgers that are pending: `commit` drains both collections and moves the
/// entries it is told to keep into the main `LedgerCtx` collections.
#[derive(Default)]
struct LedgerSyncState {
    /// Pending snarked ledger masks keyed by ledger hash (filled by
    /// `copy_snarked_ledger_contents_for_sync` and `push_snarked_ledger`).
    snarked_ledgers: BTreeMap<LedgerHash, Mask>,
    /// Pending staged ledgers (filled by `block_apply`).
    staged_ledgers: StagedLedgersStorage,
}
188
189impl LedgerCtx {
190 pub fn new_with_additional_snarked_ledgers<P>(path: P) -> Self
191 where
192 P: AsRef<Path>,
193 {
194 use std::fs;
195
196 let Ok(dir) = fs::read_dir(path) else {
197 return Self::default();
198 };
199
200 let additional_snarked_ledgers = dir
201 .filter_map(|entry| {
202 let entry = entry.ok()?;
203 let hash = entry.file_name().to_str()?.parse().ok()?;
204 let mut file = fs::File::open(entry.path()).ok()?;
205
206 let _ = Option::<LedgerHash>::binprot_read(&mut file).ok()?;
207
208 let accounts = Vec::<Account>::binprot_read(&mut file).ok()?;
209 let mut mask = Mask::new_root(Database::create(35));
210 for account in accounts {
211 let account_id = account.id();
212 mask.get_or_create_account(account_id, account).unwrap();
213 }
214 Some((hash, mask))
215 })
216 .collect();
217
218 LedgerCtx {
219 additional_snarked_ledgers,
220 ..Default::default()
221 }
222 }
223
224 pub fn set_archive_mode(&mut self) {
225 self.archive_mode = true;
226 }
227
228 pub fn set_event_sender(
231 &mut self,
232 event_sender: mina_core::channels::mpsc::UnboundedSender<crate::event_source::Event>,
233 ) {
234 self.event_sender = Some(event_sender);
235 }
236
237 pub(super) fn send_event(&self, event: LedgerEvent) {
238 if let Some(tx) = self.event_sender.as_ref() {
239 let _ = tx.send(event.into());
240 }
241 }
242
243 pub(super) fn send_write_response(&self, resp: LedgerWriteResponse) {
244 self.send_event(LedgerEvent::Write(resp))
245 }
246
247 pub(super) fn send_read_response(&self, id: LedgerReadId, resp: LedgerReadResponse) {
248 self.send_event(LedgerEvent::Read(id, resp))
249 }
250
251 pub fn insert_genesis_ledger(&mut self, mut mask: Mask) {
252 let merkle_root_hash = merkle_root(&mut mask);
253 let staged_ledger =
254 StagedLedger::create_exn(constraint_constants().clone(), mask.copy()).unwrap();
255 self.snarked_ledgers.insert(merkle_root_hash.clone(), mask);
256 let staged_ledger_hash =
258 MinaBaseStagedLedgerHashStableV1::zero(merkle_root_hash, empty_pending_coinbase_hash());
259 self.staged_ledgers
260 .insert(Arc::new(staged_ledger_hash), staged_ledger);
261 }
262
263 pub fn staged_ledger_reconstruct_result_store(&mut self, ledger: StagedLedger) {
264 self.staged_ledgers.insert_by_recomputing_hash(ledger);
265 }
266
267 pub fn get_accounts_for_rpc(
269 &self,
270 ledger_hash: LedgerHash,
271 requested_public_key: Option<AccountPublicKey>,
272 ) -> Vec<Account> {
273 if let Some((mask, _)) = self.mask(&ledger_hash) {
274 let mut accounts = Vec::new();
275 let mut single_account = Vec::new();
276
277 mask.iter(|account| {
278 accounts.push(account.clone());
279 if let Some(public_key) = requested_public_key.as_ref() {
280 if public_key == &AccountPublicKey::from(account.public_key.clone()) {
281 single_account.push(account.clone());
282 }
283 }
284 });
285
286 if requested_public_key.is_some() {
287 single_account
288 } else {
289 accounts
290 }
291 } else {
292 vec![]
293 }
294 }
295
296 pub fn mask(&self, hash: &LedgerHash) -> Option<(Mask, bool)> {
299 self.snarked_ledgers
300 .get(hash)
301 .cloned()
302 .map(|mask| (mask, true))
303 .or_else(|| Some((self.staged_ledgers.get_mask(hash)?, true)))
304 .or_else(|| self.sync.mask(hash))
305 .or_else(|| {
306 self.additional_snarked_ledgers
307 .get(hash)
308 .map(|l| (l.clone(), true))
309 })
310 }
311
312 pub fn contains_snarked_ledger(&self, hash: &LedgerHash) -> bool {
313 self.snarked_ledgers.contains_key(hash)
314 }
315
316 pub fn pending_sync_snarked_ledger_mask(&self, hash: &LedgerHash) -> Result<Mask, String> {
318 self.sync.pending_sync_snarked_ledger_mask(hash)
319 }
320
321 pub fn copy_snarked_ledger_contents_for_sync(
324 &mut self,
325 origin_snarked_ledger_hash: LedgerHash,
326 target_snarked_ledger_hash: LedgerHash,
327 overwrite: bool,
328 ) -> Result<bool, String> {
329 if !overwrite
330 && self
331 .sync
332 .snarked_ledgers
333 .contains_key(&target_snarked_ledger_hash)
334 {
335 return Ok(false);
336 }
337
338 let origin = self
339 .snarked_ledgers
340 .get(&origin_snarked_ledger_hash)
341 .or_else(|| {
342 self.sync.snarked_ledgers.get(&origin_snarked_ledger_hash)
345 })
346 .ok_or(format!(
347 "Tried to copy from non-existing snarked ledger with hash: {}",
348 origin_snarked_ledger_hash
349 ))?;
350
351 let target = origin.copy();
352 self.sync
353 .snarked_ledgers
354 .insert(target_snarked_ledger_hash, target);
355
356 Ok(true)
357 }
358
359 pub fn compute_snarked_ledger_hashes(
360 &mut self,
361 snarked_ledger_hash: &LedgerHash,
362 ) -> Result<(), String> {
363 let origin = self
364 .snarked_ledgers
365 .get_mut(snarked_ledger_hash)
366 .or_else(|| self.sync.snarked_ledgers.get_mut(snarked_ledger_hash))
367 .ok_or(format!(
368 "Cannot hash non-existing snarked ledger: {}",
369 snarked_ledger_hash
370 ))?;
371
372 let _force_hashing = origin.merkle_root();
375
376 Ok(())
377 }
378
379 fn staged_ledger_mut(
381 &mut self,
382 hash: &MinaBaseStagedLedgerHashStableV1,
383 ) -> Option<&mut StagedLedger> {
384 match self.staged_ledgers.get_mut(hash) {
385 Some(v) => Some(v),
386 None => self.sync.staged_ledger_mut(hash),
387 }
388 }
389
390 fn recreate_snarked_ledger(
391 &mut self,
392 root_snarked_ledger_updates: &TransitionFrontierRootSnarkedLedgerUpdates,
393 needed_protocol_states: &BTreeMap<StateHash, MinaStateProtocolStateValueStableV2>,
394 snarked_ledger_hash: &LedgerHash,
395 ) -> Result<(), String> {
396 let Some(update) = root_snarked_ledger_updates.get(snarked_ledger_hash) else {
397 return Ok(());
398 };
399 self.recreate_snarked_ledger(
400 root_snarked_ledger_updates,
401 needed_protocol_states,
402 &update.parent,
403 )?;
404
405 self.push_snarked_ledger(
406 needed_protocol_states,
407 &update.parent,
408 snarked_ledger_hash,
409 &update.staged_ledger_hash,
410 )
411 }
412
    /// Derives the snarked ledger `new_root_snarked_ledger_hash` from the one
    /// identified by `old_root_snarked_ledger_hash` and stores it in the sync
    /// state.
    ///
    /// A child mask of the old snarked ledger is created and passed, together
    /// with the transaction application callbacks below, to
    /// `get_snarked_ledger_sync` on the scan state of the staged ledger
    /// `new_root_staged_ledger_hash`. The resulting merkle root must equal
    /// `new_root_snarked_ledger_hash`.
    ///
    /// # Errors
    ///
    /// Returns an error when the old snarked ledger or the staged ledger is
    /// missing, when a required protocol state is not in `protocol_states`,
    /// when `get_snarked_ledger_sync` fails, or when the obtained merkle root
    /// differs from the expected hash.
    fn push_snarked_ledger(
        &mut self,
        protocol_states: &BTreeMap<StateHash, MinaStateProtocolStateValueStableV2>,
        old_root_snarked_ledger_hash: &LedgerHash,
        new_root_snarked_ledger_hash: &LedgerHash,
        new_root_staged_ledger_hash: &MinaBaseStagedLedgerHashStableV1,
    ) -> Result<(), String> {
        mina_core::debug!(mina_core::log::system_time();
            kind = "LedgerService::push_snarked_ledger",
            summary = format!("{old_root_snarked_ledger_hash} -> {new_root_snarked_ledger_hash}"));
        let constraint_constants = &constraint_constants();

        // The old root is searched among stored snarked ledgers first, then in the sync state.
        let root_snarked_ledger = self
            .snarked_ledgers
            .get(old_root_snarked_ledger_hash)
            .or_else(|| self.sync.snarked_ledgers.get(old_root_snarked_ledger_hash))
            .ok_or_else(|| {
                format!(
                    "push_snarked_ledger: could not find old root snarked ledger: {}",
                    old_root_snarked_ledger_hash,
                )
            })?;
        // Work on a child mask so the old root itself is left untouched here.
        let mut mt = root_snarked_ledger.make_child();

        // First-pass application of a transaction on a `Mask`.
        let apply_first_pass = |global_slot: Slot,
                                txn_state_view: &ProtocolStateView,
                                ledger: &mut Mask,
                                transaction: &Transaction| {
            ledger::scan_state::transaction_logic::apply_transaction_first_pass(
                constraint_constants,
                global_slot,
                txn_state_view,
                ledger,
                transaction,
            )
        };

        // Second-pass application of a partially applied transaction on a `Mask`.
        let apply_second_pass = |ledger: &mut Mask, tx: TransactionPartiallyApplied<Mask>| {
            ledger::scan_state::transaction_logic::apply_transaction_second_pass(
                constraint_constants,
                ledger,
                tx,
            )
        };

        // First-pass application of a transaction on a `SparseLedger`.
        let apply_first_pass_sparse_ledger =
            |global_slot: Slot,
             txn_state_view: &ProtocolStateView,
             sparse_ledger: &mut SparseLedger,
             transaction: &Transaction| {
                ledger::scan_state::transaction_logic::apply_transaction_first_pass(
                    constraint_constants,
                    global_slot,
                    txn_state_view,
                    sparse_ledger,
                    transaction,
                )
            };

        // Resolves a state hash to its protocol state using the supplied map.
        let get_protocol_state = |state_hash: Fp| {
            let state_hash = StateHash::from_fp(state_hash);
            if let Some(s) = protocol_states.get(&state_hash) {
                Ok(s.clone())
            } else {
                Err(format!(
                    "Failed to find protocol state for state hash: {}",
                    state_hash
                ))
            }
        };

        let scan_state = self
            .staged_ledger_mut(new_root_staged_ledger_hash)
            .ok_or_else(|| {
                format!(
                    "Failed to find staged ledger with hash: {:#?}",
                    new_root_staged_ledger_hash
                )
            })?
            .scan_state();

        // The returned first-pass ledger hash is not used; `mt` is what gets updated.
        let Pass::FirstPassLedgerHash(_first_pass_ledger_target) = scan_state
            .get_snarked_ledger_sync(
                &mut mt,
                get_protocol_state,
                apply_first_pass,
                apply_second_pass,
                apply_first_pass_sparse_ledger,
            )?;

        // Sanity check: the rebuilt ledger must hash to the expected new root.
        let expected_hash = new_root_snarked_ledger_hash;
        let obtained_hash = LedgerHash::from_fp(mt.merkle_root());

        if expected_hash != &obtained_hash {
            return Err(format!(
                "Expected to obtain snarked root ledger hash {} but got {}",
                expected_hash, obtained_hash
            ));
        }

        self.sync
            .snarked_ledgers
            .insert(new_root_snarked_ledger_hash.clone(), mt);

        Ok(())
    }
523
524 pub fn get_account_delegators(
525 &self,
526 ledger_hash: &LedgerHash,
527 account_id: &AccountId,
528 ) -> Option<Vec<Account>> {
529 let (mask, _) = self.mask(ledger_hash)?;
530 let mut accounts = Vec::new();
531
532 mask.iter(|account| {
533 if account.delegate == Some(account_id.public_key.clone()) {
534 accounts.push(account.clone());
535 }
536 });
537
538 Some(accounts)
539 }
540
541 #[allow(clippy::type_complexity)]
542 pub fn producers_with_delegates<F: FnMut(&CompressedPubKey) -> bool>(
543 &self,
544 ledger_hash: &LedgerHash,
545 mut filter: F,
546 ) -> Option<BTreeMap<AccountPublicKey, Vec<(ledger::AccountIndex, AccountPublicKey, u64)>>>
547 {
548 let (mask, _) = self.mask(ledger_hash)?;
549 let mut accounts = Vec::new();
550
551 mask.iter(|account| {
552 if filter(account.delegate.as_ref().unwrap_or(&account.public_key)) {
553 accounts.push((
554 account.id(),
555 account.delegate.clone(),
556 account.balance.as_u64(),
557 ))
558 }
559 });
560
561 let producers = accounts.into_iter().fold(
562 BTreeMap::<_, Vec<_>>::new(),
563 |mut producers, (id, delegate, balance)| {
564 let index = mask.index_of_account(id.clone()).unwrap();
565 let pub_key = AccountPublicKey::from(id.public_key);
566 let producer = delegate.map(Into::into).unwrap_or(pub_key.clone());
567 producers
568 .entry(producer)
569 .or_default()
570 .push((index, pub_key, balance));
571 producers
572 },
573 );
574 Some(producers)
575 }
576
577 pub fn child_hashes_get(
578 &mut self,
579 snarked_ledger_hash: LedgerHash,
580 parent: &LedgerAddress,
581 ) -> Result<(LedgerHash, LedgerHash), String> {
582 let mut mask = self.pending_sync_snarked_ledger_mask(&snarked_ledger_hash)?;
583 let left_hash = LedgerHash::from_fp(mask.get_inner_hash_at_addr(parent.child_left())?);
584 let right_hash = LedgerHash::from_fp(mask.get_inner_hash_at_addr(parent.child_right())?);
585
586 Ok((left_hash, right_hash))
587 }
588
589 pub fn accounts_set(
590 &mut self,
591 snarked_ledger_hash: LedgerHash,
592 parent: &LedgerAddress,
593 accounts: Vec<v2::MinaBaseAccountBinableArgStableV2>,
594 ) -> Result<LedgerHash, String> {
595 let mut mask = self.pending_sync_snarked_ledger_mask(&snarked_ledger_hash)?;
596 let accounts: Vec<_> = accounts
597 .into_iter()
598 .map(|account| Ok(Box::new((&account).try_into()?)))
599 .collect::<Result<Vec<_>, InvalidBigInt>>()
600 .map_err(error_to_string)?;
601
602 mask.set_all_accounts_rooted_at(parent.clone(), &accounts)
603 .map_err(|_| "Failed when setting accounts".to_owned())?;
604
605 let computed_hash = LedgerHash::from_fp(mask.get_inner_hash_at_addr(parent.clone())?);
606
607 Ok(computed_hash)
608 }
609
610 pub fn staged_ledger_reconstruct<F>(
611 &mut self,
612 snarked_ledger_hash: LedgerHash,
613 parts: Option<Arc<StagedLedgerAuxAndPendingCoinbasesValid>>,
614 callback: F,
615 ) -> Result<(), InvalidBigInt>
616 where
617 F: 'static + FnOnce(v2::LedgerHash, Result<StagedLedger, String>) + Send,
618 {
619 let snarked_ledger = self
620 .sync
621 .snarked_ledger_mut(snarked_ledger_hash.clone())?
622 .copy();
623
624 thread::Builder::new()
625 .name("staged-ledger-reconstruct".into())
626 .spawn(move || {
627 match staged_ledger_reconstruct(snarked_ledger, snarked_ledger_hash, parts) {
628 Ok((staged_ledger_hash, result)) => {
629 callback(staged_ledger_hash, result);
630 }
631 Err(e) => callback(v2::LedgerHash::zero(), Err(format!("{:?}", e))),
632 }
633 })
634 .expect("Failed: staged ledger reconstruct thread");
635
636 Ok(())
637 }
638
639 pub fn staged_ledger_reconstruct_sync(
640 &mut self,
641 snarked_ledger_hash: LedgerHash,
642 parts: Option<Arc<StagedLedgerAuxAndPendingCoinbasesValid>>,
643 ) -> Result<(v2::LedgerHash, Result<(), String>), InvalidBigInt> {
644 let snarked_ledger = self
645 .sync
646 .snarked_ledger_mut(snarked_ledger_hash.clone())?
647 .copy();
648 let (staged_ledger_hash, result) =
649 staged_ledger_reconstruct(snarked_ledger, snarked_ledger_hash, parts)?;
650 let result = match result {
651 Err(err) => Err(err),
652 Ok(staged_ledger) => {
653 self.staged_ledger_reconstruct_result_store(staged_ledger);
654 Ok(())
655 }
656 };
657
658 Ok((staged_ledger_hash, result))
659 }
660
    /// Applies `block` on top of the staged ledger of `pred_block` and stores
    /// the resulting staged ledger in the sync state.
    ///
    /// The parent staged ledger is cloned, the block's staged ledger diff is
    /// applied to the clone, and the resulting staged ledger hash is compared
    /// with the one declared by the block. When archive mode is enabled, data
    /// describing accessed/created accounts, used tokens and sender receipt
    /// chain hashes is collected as well.
    ///
    /// # Errors
    ///
    /// Returns an error when the parent staged ledger is missing, when a
    /// conversion of block data fails, or when applying the diff fails.
    ///
    /// # Panics
    ///
    /// Panics when the computed staged ledger hash differs from the block's
    /// expected hash (after attempting to dump the application to a file).
    pub fn block_apply(
        &mut self,
        block: ArcBlockWithHash,
        pred_block: AppliedBlock,
        skip_verification: Option<SkipVerification>,
    ) -> Result<BlockApplyResult, String> {
        mina_core::info!(mina_core::log::system_time();
            kind = "LedgerService::block_apply",
            summary = format!("{}, {} <- {}", block.height(), block.hash(), block.pred_hash()),
            pred_staged_ledger_hash = pred_block.merkle_root_hash().to_string(),
            staged_ledger_hash = block.merkle_root_hash().to_string(),
        );
        // Work on a clone so the stored parent staged ledger is not modified.
        let mut staged_ledger = self
            .staged_ledger_mut(pred_block.staged_ledger_hashes())
            .ok_or_else(|| {
                format!(
                    "parent staged ledger missing: {:#?}",
                    pred_block.staged_ledger_hashes()
                )
            })?
            .clone();

        let global_slot = block.global_slot_since_genesis();
        let prev_protocol_state = &pred_block.header().protocol_state;
        let prev_state_view = protocol_state_view(prev_protocol_state).map_err(error_to_string)?;

        let consensus_state = &block.header().protocol_state.body.consensus_state;
        let coinbase_receiver: CompressedPubKey = (&consensus_state.coinbase_receiver)
            .try_into()
            .map_err(error_to_string)?;
        let supercharge_coinbase = consensus_state.supercharge_coinbase;

        let diff: Diff = (&block.body().staged_ledger_diff)
            .try_into()
            .map_err(error_to_string)?;

        let prev_protocol_state: ledger::proofs::block::ProtocolState =
            prev_protocol_state.try_into()?;

        let result = staged_ledger
            .apply(
                skip_verification,
                constraint_constants(),
                Slot::from_u32(global_slot),
                diff,
                (),
                &Verifier,
                &prev_state_view,
                prev_protocol_state.hashes(),
                coinbase_receiver.clone(),
                supercharge_coinbase,
            )
            .map_err(|err| format!("{err:?}"))?;
        let just_emitted_a_proof = result.ledger_proof.is_some();
        let ledger_hashes = MinaBaseStagedLedgerHashStableV1::from(&result.hash_after_applying);

        // The block declares the staged ledger hash it expects after application.
        let expected_ledger_hashes = block.staged_ledger_hashes();
        if &ledger_hashes != expected_ledger_hashes {
            // The same lookup succeeded at the top of this function, hence the `unwrap`.
            let staged_ledger = self
                .staged_ledger_mut(pred_block.staged_ledger_hashes())
                .unwrap();
            // Dump the parent staged ledger and the block before panicking.
            match dump_application_to_file(staged_ledger, block.clone(), pred_block) {
                Ok(filename) => mina_core::info!(
                    mina_core::log::system_time();
                    kind = "LedgerService::dump - Failed application",
                    summary = format!("StagedLedger and block saved to: {filename:?}")
                ),
                Err(e) => mina_core::error!(
                    mina_core::log::system_time();
                    kind = "LedgerService::dump - Failed application",
                    summary = format!("Failed to save block application to file: {e:?}")
                ),
            }

            panic!("staged ledger hash mismatch. found: {ledger_hashes:#?}, expected: {expected_ledger_hashes:#?}");
        }

        let archive_data = if self.archive_mode {
            // Distinct fee payers of the user commands in the block.
            let senders = block
                .body()
                .transactions()
                .filter_map(|tx| UserCommand::try_from(tx).ok().map(|cmd| cmd.fee_payer()))
                .collect::<BTreeSet<_>>()
                .into_iter();

            let coinbase_receiver_id = AccountId::new(coinbase_receiver, TokenId::default());

            // Split the accounts touched by user commands by their access status.
            let (accessed, not_accessed): (BTreeSet<_>, BTreeSet<_>) = block
                .body()
                .tranasctions_with_status()
                .flat_map(|(tx, status)| {
                    let status: TransactionStatus = status.into();
                    UserCommand::try_from(tx)
                        .ok()
                        .map(|cmd| cmd.account_access_statuses(&status))
                        .into_iter()
                        .flatten()
                })
                .partition(|(_, status)| *status == AccessedOrNot::Accessed);

            let mut account_ids_accessed: BTreeSet<_> =
                accessed.into_iter().map(|(id, _)| id).collect();
            let mut account_ids_not_accessed: BTreeSet<_> =
                not_accessed.into_iter().map(|(id, _)| id).collect();

            let has_coinbase = block.body().has_coinbase();

            // The coinbase receiver counts as accessed only when the block has a coinbase.
            if has_coinbase {
                account_ids_accessed.insert(coinbase_receiver_id);
            } else {
                account_ids_not_accessed.insert(coinbase_receiver_id);
            }

            // Receivers of coinbase fee transfers are accessed as well;
            // receivers whose key fails to convert are skipped.
            let fee_transfer_accounts =
                block.body().coinbase_fee_transfers_iter().filter_map(|cb| {
                    let receiver: CompressedPubKey = cb.receiver_pk.inner().try_into().ok()?;
                    let account_id = AccountId::new(receiver, TokenId::default());
                    Some(account_id)
                });
            account_ids_accessed.extend(fee_transfer_accounts);

            // Accessed accounts that exist in the staged ledger, with their index.
            let accounts_accessed: Vec<(AccountIndex, Account)> = account_ids_accessed
                .iter()
                .filter_map(|id| {
                    staged_ledger
                        .ledger()
                        .index_of_account(id.clone())
                        .and_then(|index| {
                            staged_ledger
                                .ledger()
                                .get_at_index(index)
                                .map(|account| (index, *account))
                        })
                })
                .collect();

            let account_creation_fee = constraint_constants().account_creation_fee;

            // Every created account is paired with the account creation fee.
            let accounts_created: Vec<(AccountId, u64)> = staged_ledger
                .latest_block_accounts_created(pred_block.hash().to_field()?)
                .iter()
                .map(|id| (id.clone(), account_creation_fee))
                .collect();

            let all_account_ids: BTreeSet<_> = account_ids_accessed
                .union(&account_ids_not_accessed)
                .collect();
            // NOTE(review): tokens are only collected when the block has a
            // coinbase; otherwise the set is empty — confirm this is intended.
            let tokens_used: BTreeSet<(TokenId, Option<AccountId>)> = if has_coinbase {
                all_account_ids
                    .iter()
                    .map(|id| {
                        let token_id = id.token_id.clone();
                        let token_owner = staged_ledger.ledger().token_owner(token_id.clone());
                        (token_id, token_owner)
                    })
                    .collect()
            } else {
                BTreeSet::new()
            };

            // NOTE(review): the receipt chain hashes are read from `staged_ledger`
            // after `apply` was called on it, although the field name refers to
            // the parent ledger — confirm which ledger is intended.
            let sender_receipt_chains_from_parent_ledger = senders
                .filter_map(|sender| {
                    if let Some(location) = staged_ledger.ledger().location_of_account(&sender) {
                        staged_ledger.ledger().get(location).map(|account| {
                            (
                                sender,
                                v2::ReceiptChainHash::from(account.receipt_chain_hash),
                            )
                        })
                    } else {
                        None
                    }
                })
                .collect();
            Some(BlockApplyResultArchive {
                accounts_accessed,
                accounts_created,
                tokens_used,
                sender_receipt_chains_from_parent_ledger,
            })
        } else {
            None
        };

        // The new staged ledger stays in the sync state until `commit` moves it.
        self.sync
            .staged_ledgers
            .insert(Arc::new(ledger_hashes), staged_ledger);

        Ok(BlockApplyResult {
            block,
            just_emitted_a_proof,
            archive_data,
        })
    }
870
    /// Commits the pending ledgers for a new root / best tip.
    ///
    /// Steps, in order:
    /// 1. recreate the snarked ledger of `new_root` from
    ///    `root_snarked_ledger_updates`;
    /// 2. drop stored snarked and staged ledgers not listed in
    ///    `ledgers_to_keep`, and move the listed ones out of the sync state
    ///    (the sync state is emptied);
    /// 3. verify that the staking epoch, root snarked and root staged ledger
    ///    masks exist and hash to the expected values;
    /// 4. commit and reparent snarked ledger masks whose parent is not kept;
    /// 5. make the new root staged ledger's mask a root and compute the
    ///    returned job / protocol state information.
    ///
    /// Returns a default `CommitResult` when the new root staged ledger is not
    /// among the stored staged ledgers.
    ///
    /// # Panics
    ///
    /// Panics when recreating the snarked ledger fails, or when one of the
    /// masks checked in step 3 is missing, not synced, or has a mismatching
    /// hash.
    pub fn commit(
        &mut self,
        ledgers_to_keep: LedgersToKeep,
        root_snarked_ledger_updates: TransitionFrontierRootSnarkedLedgerUpdates,
        needed_protocol_states: BTreeMap<StateHash, MinaStateProtocolStateValueStableV2>,
        new_root: &ArcBlockWithHash,
        new_best_tip: &ArcBlockWithHash,
    ) -> CommitResult {
        mina_core::debug!(mina_core::log::system_time();
            kind = "LedgerService::commit",
            summary = format!("commit {}, {}", new_best_tip.height(), new_best_tip.hash()),
            new_root = format!("{}, {}", new_root.height(), new_root.hash()),
            new_root_staking_epoch_ledger = new_root.staking_epoch_ledger_hash().to_string(),
            new_root_next_epoch_ledger = new_root.next_epoch_ledger_hash().to_string(),
            new_root_snarked_ledger = new_root.snarked_ledger_hash().to_string(),
        );
        self.recreate_snarked_ledger(
            &root_snarked_ledger_updates,
            &needed_protocol_states,
            new_root.snarked_ledger_hash(),
        )
        .unwrap();

        // Drop stored snarked ledgers that are no longer needed.
        self.snarked_ledgers.retain(|hash, _| {
            let keep = ledgers_to_keep.contains(hash);
            if !keep {
                mina_core::debug!(mina_core::log::system_time();
                    kind = "LedgerService::commit - snarked_ledgers.drop",
                    summary = format!("drop snarked ledger {hash}"));
            }
            keep
        });
        // Move the needed snarked ledgers out of the sync state; the rest is dropped.
        self.snarked_ledgers.extend(
            std::mem::take(&mut self.sync.snarked_ledgers)
                .into_iter()
                .filter(|(hash, _)| {
                    let keep = ledgers_to_keep.contains(hash);
                    if !keep {
                        mina_core::debug!(mina_core::log::system_time();
                            kind = "LedgerService::commit - snarked_ledgers.drop",
                            summary = format!("drop snarked ledger {hash}"));
                    }
                    keep
                }),
        );

        // Same for staged ledgers: prune the stored ones, then adopt the pending ones.
        self.staged_ledgers
            .retain(|hash| ledgers_to_keep.contains(hash));
        self.staged_ledgers.extend(
            self.sync
                .staged_ledgers
                .take()
                .into_iter()
                .filter(|(hash, _)| ledgers_to_keep.contains(&**hash)),
        );

        // Consistency check of the masks the new root / best tip depend on.
        for ledger_hash in [
            new_best_tip.staking_epoch_ledger_hash(),
            new_root.snarked_ledger_hash(),
            new_root.merkle_root_hash(),
        ] {
            if let Some((mut mask, is_synced)) = self.mask(ledger_hash) {
                if !is_synced {
                    panic!("ledger mask expected to be synced: {ledger_hash}");
                }
                let calculated = merkle_root(&mut mask);
                assert_eq!(ledger_hash, &calculated, "ledger mask hash mismatch");
            } else {
                panic!("ledger mask is missing: {ledger_hash}");
            }
        }

        // While a snarked ledger's parent mask is not kept, commit the ledger
        // and replace it with that parent.
        for (ledger_hash, snarked_ledger) in self.snarked_ledgers.iter_mut() {
            while let Some((parent_hash, parent)) = snarked_ledger
                .get_parent()
                .map(|mut parent| (merkle_root(&mut parent), parent))
                .filter(|(parent_hash, _)| !ledgers_to_keep.contains(parent_hash))
            {
                mina_core::debug!(mina_core::log::system_time();
                    kind = "LedgerService::commit - mask.commit_and_reparent",
                    summary = format!("{ledger_hash} -> {parent_hash}"));
                snarked_ledger.commit();
                snarked_ledger.unregister_mask(UnregisterBehavior::Check);
                *snarked_ledger = parent;
            }
        }

        // Without the new root staged ledger there is nothing more to do.
        let Some(new_root_ledger) = self.staged_ledgers.get_mut(new_root.staged_ledger_hashes())
        else {
            return Default::default();
        };

        new_root_ledger.commit_and_reparent_to_root();

        // State hashes required by the new root's scan state.
        let needed_protocol_states = self
            .staged_ledger_mut(new_root.staged_ledger_hashes())
            .map(|l| {
                l.scan_state()
                    .required_state_hashes()
                    .into_iter()
                    .map(|fp| DataHashLibStateHashStableV1(fp.into()).into())
                    .collect()
            })
            .unwrap_or_default();

        // Job pairs from the best tip's scan state (empty when it is missing).
        let available_jobs = Arc::new(
            self.staged_ledger_mut(new_best_tip.staged_ledger_hashes())
                .map(|l| {
                    l.scan_state()
                        .all_job_pairs_iter()
                        .map(|job| job.map(|single| AvailableJobMessage::from(single)))
                        .collect()
                })
                .unwrap_or_default(),
        );

        CommitResult {
            alive_masks: ::ledger::mask::alive_len(),
            available_jobs,
            needed_protocol_states,
        }
    }
997
998 #[allow(dead_code)]
999 fn check_alive_masks(&mut self) {
1000 let mut alive: BTreeSet<_> = ::ledger::mask::alive_collect();
1001 let staged_ledgers = self
1002 .staged_ledgers
1003 .staged_ledgers
1004 .iter()
1005 .map(|(hash, ledger)| (&hash.non_snark.ledger_hash, ledger.ledger_ref()));
1006
1007 let alive_ledgers = self
1008 .snarked_ledgers
1009 .iter()
1010 .chain(staged_ledgers)
1011 .map(|(hash, mask)| {
1012 let uuid = mask.get_uuid();
1013 if !alive.remove(&uuid) {
1014 bug_condition!("mask not found among alive masks! uuid: {uuid}, hash: {hash}");
1015 }
1016 (uuid, hash)
1017 })
1018 .collect::<Vec<_>>();
1019 mina_core::debug!(redux::Timestamp::global_now(); "alive_ledgers_after_commit: {alive_ledgers:#?}");
1020
1021 if !alive.is_empty() {
1022 bug_condition!(
1023 "masks alive which are no longer part of the ledger service: {alive:#?}"
1024 );
1025 }
1026 }
1027
1028 pub fn get_num_accounts(
1029 &mut self,
1030 ledger_hash: v2::LedgerHash,
1031 ) -> Option<(u64, v2::LedgerHash)> {
1032 let (mask, _) = self
1033 .mask(&ledger_hash)
1034 .filter(|(_, is_synced)| *is_synced)?;
1035 let num_accounts = mask.num_accounts() as u64;
1038 let first_node_addr = ledger::Address::first(
1039 LEDGER_DEPTH.saturating_sub(super::tree_height_for_num_accounts(num_accounts)),
1040 );
1041 let hash = LedgerHash::from_fp(mask.get_hash(first_node_addr)?);
1042 Some((num_accounts, hash))
1043 }
1044
1045 pub fn get_child_hashes(
1046 &mut self,
1047 ledger_hash: v2::LedgerHash,
1048 addr: LedgerAddress,
1049 ) -> Option<(v2::LedgerHash, v2::LedgerHash)> {
1050 let (mask, is_synced) = self.mask(&ledger_hash)?;
1051 let get_hash = |addr: LedgerAddress| {
1052 let depth = addr.length();
1053 mask.get_hash(addr)
1054 .map(|fp| MinaBaseLedgerHash0StableV1(fp.into()).into())
1055 .or_else(|| {
1056 if is_synced {
1057 Some(ledger_empty_hash_at_depth(depth))
1058 } else {
1059 None
1060 }
1061 })
1062 };
1063 let (left, right) = (addr.child_left(), addr.child_right());
1064 Some((get_hash(left)?, get_hash(right)?))
1065 }
1066
1067 pub fn get_child_accounts(
1068 &mut self,
1069 ledger_hash: v2::LedgerHash,
1070 addr: LedgerAddress,
1071 ) -> Option<Vec<v2::MinaBaseAccountBinableArgStableV2>> {
1072 let (mask, _) = self
1073 .mask(&ledger_hash)
1074 .filter(|(_, is_synced)| *is_synced)?;
1075 let accounts = mask
1077 .get_all_accounts_rooted_at(addr)?
1078 .into_iter()
1079 .map(|(_, account)| (&*account).into())
1080 .collect();
1081 Some(accounts)
1082 }
1083
1084 pub fn get_accounts(
1085 &mut self,
1086 ledger_hash: v2::LedgerHash,
1087 ids: Vec<AccountId>,
1088 ) -> Vec<Account> {
1089 let Some((mask, _)) = self.mask(&ledger_hash) else {
1090 mina_core::warn!(
1091 mina_core::log::system_time();
1092 kind = "LedgerService::get_accounts",
1093 summary = format!("Ledger not found: {ledger_hash:?}")
1094 );
1095 return Vec::new();
1096 };
1097 let addrs = mask
1098 .location_of_account_batch(&ids)
1099 .into_iter()
1100 .filter_map(|(_id, addr)| addr)
1101 .collect::<Vec<_>>();
1102
1103 mask.get_batch(&addrs)
1104 .into_iter()
1105 .filter_map(|(_, account)| account.map(|account| *account))
1106 .collect::<Vec<_>>()
1107 }
1108
1109 pub fn staged_ledger_aux_and_pending_coinbase(
1110 &mut self,
1111 ledger_hash: &MinaBaseStagedLedgerHashStableV1,
1112 protocol_states: BTreeMap<StateHash, MinaStateProtocolStateValueStableV2>,
1113 ) -> Option<Arc<StagedLedgerAuxAndPendingCoinbases>> {
1114 let ledger = self.staged_ledger_mut(ledger_hash)?;
1115 let needed_blocks = ledger
1116 .scan_state()
1117 .required_state_hashes()
1118 .into_iter()
1119 .map(|fp| DataHashLibStateHashStableV1(fp.into()))
1120 .map(|hash| protocol_states.get(&hash.into()).ok_or(()).cloned())
1121 .collect::<Result<_, _>>()
1122 .ok()?;
1123 ledger.pending_coinbase_collection_merkle_root();
1126 Some(
1127 StagedLedgerAuxAndPendingCoinbases {
1128 scan_state: (ledger.scan_state()).into(),
1129 staged_ledger_hash: ledger_hash.non_snark.ledger_hash.clone(),
1130 pending_coinbase: (ledger.pending_coinbase_collection()).into(),
1131 needed_blocks,
1132 }
1133 .into(),
1134 )
1135 }
1136
1137 #[allow(clippy::too_many_arguments)]
1138 pub fn staged_ledger_diff_create(
1139 &mut self,
1140 pred_block: AppliedBlock,
1141 global_slot_since_genesis: v2::MinaNumbersGlobalSlotSinceGenesisMStableV1,
1142 is_new_epoch: bool,
1143 producer: NonZeroCurvePoint,
1144 delegator: NonZeroCurvePoint,
1145 coinbase_receiver: NonZeroCurvePoint,
1146 completed_snarks: BTreeMap<SnarkJobId, Snark>,
1147 supercharge_coinbase: bool,
1148 transactions_by_fee: Vec<valid::UserCommand>,
1149 ) -> Result<StagedLedgerDiffCreateOutput, String> {
1150 let mut staged_ledger = self
1151 .staged_ledger_mut(pred_block.staged_ledger_hashes())
1152 .ok_or_else(|| {
1153 format!(
1154 "parent staged ledger missing: {}",
1155 pred_block.merkle_root_hash()
1156 )
1157 })?
1158 .clone();
1159
1160 staged_ledger.hash();
1162 let pending_coinbase_witness =
1163 MinaBasePendingCoinbaseStableV2::from(staged_ledger.pending_coinbase_collection());
1164
1165 let protocol_state_view =
1166 protocol_state_view(&pred_block.header().protocol_state).map_err(error_to_string)?;
1167
1168 let (pre_diff, _invalid_txns) = staged_ledger
1170 .create_diff(
1171 constraint_constants(),
1172 (&global_slot_since_genesis).into(),
1173 Some(true),
1174 (&coinbase_receiver).try_into().map_err(error_to_string)?,
1175 (),
1176 &protocol_state_view,
1177 transactions_by_fee,
1178 |stmt| {
1179 let job_id = SnarkJobId::from(stmt);
1180 match completed_snarks.get(&job_id) {
1181 Some(snark) => snark.try_into().ok(),
1182 None => None,
1183 }
1184 },
1185 supercharge_coinbase,
1186 )
1187 .map_err(|err| format!("{err:?}"))?;
1188
1189 let pred_body_hash = pred_block
1193 .header()
1194 .protocol_state
1195 .body
1196 .try_hash()
1197 .map_err(error_to_string)?;
1198 let diff = (&pre_diff).into();
1199
1200 let res = staged_ledger
1201 .apply_diff_unchecked(
1202 constraint_constants(),
1203 (&global_slot_since_genesis).into(),
1204 pre_diff,
1205 (),
1206 &protocol_state_view,
1207 (
1208 pred_block.hash().0.to_field().map_err(error_to_string)?,
1209 pred_body_hash.0.to_field().map_err(error_to_string)?,
1210 ),
1211 (&coinbase_receiver).try_into().map_err(error_to_string)?,
1212 supercharge_coinbase,
1213 )
1214 .map_err(|err| format!("{err:?}"))?;
1215
1216 let diff_hash = block_body_hash(&diff).map_err(|err| format!("{err:?}"))?;
1217 let staking_ledger_hash = if is_new_epoch {
1218 pred_block.next_epoch_ledger_hash()
1219 } else {
1220 pred_block.staking_epoch_ledger_hash()
1221 };
1222
1223 Ok(StagedLedgerDiffCreateOutput {
1224 diff,
1225 diff_hash,
1226 staged_ledger_hash: (&res.hash_after_applying).into(),
1227 emitted_ledger_proof: res
1228 .ledger_proof
1229 .map(|(proof, ..)| (&proof).into())
1230 .map(Arc::new),
1231 pending_coinbase_update: (&res.pending_coinbase_update.1).into(),
1232 pending_coinbase_witness: MinaBasePendingCoinbaseWitnessStableV2 {
1233 pending_coinbases: pending_coinbase_witness,
1234 is_new_stack: res.pending_coinbase_update.0,
1235 },
1236 stake_proof_sparse_ledger: self
1237 .stake_proof_sparse_ledger(staking_ledger_hash, &producer, &delegator)
1238 .map_err(error_to_string)?,
1239 })
1240 }
1241
1242 pub fn stake_proof_sparse_ledger(
1243 &mut self,
1244 staking_ledger: &LedgerHash,
1245 producer: &NonZeroCurvePoint,
1246 delegator: &NonZeroCurvePoint,
1247 ) -> Result<v2::MinaBaseSparseLedgerBaseStableV2, InvalidBigInt> {
1248 let mask = self.snarked_ledgers.get(staking_ledger).unwrap();
1249 let producer_id = ledger::AccountId::new(producer.try_into()?, ledger::TokenId::default());
1250 let delegator_id =
1251 ledger::AccountId::new(delegator.try_into()?, ledger::TokenId::default());
1252 let sparse_ledger = ledger::sparse_ledger::SparseLedger::of_ledger_subset_exn(
1253 mask.clone(),
1254 &[producer_id, delegator_id],
1255 );
1256 Ok((&sparse_ledger).into())
1257 }
1258
1259 pub fn scan_state_summary(
1260 &self,
1261 staged_ledger_hash: &MinaBaseStagedLedgerHashStableV1,
1262 ) -> Result<Vec<Vec<RpcScanStateSummaryScanStateJob>>, String> {
1263 use ledger::scan_state::scan_state::JobValue;
1264
1265 let ledger = self.staged_ledgers.get(staged_ledger_hash);
1266 let Some(ledger) = ledger else {
1267 return Ok(Vec::new());
1268 };
1269 ledger
1270 .scan_state()
1271 .view()
1272 .map(|jobs| {
1273 let jobs = jobs.collect::<Vec<JobValueWithIndex<'_>>>();
1274 let mut iter = jobs.iter().peekable();
1275 let mut res = Vec::with_capacity(jobs.len());
1276
1277 for job in iter {
1278 let (stmt, seq_no, job_kind, is_done) = match &job.job {
1279 JobValue::Leaf(JobValueBase::Empty)
1280 | JobValue::Node(JobValueMerge::Empty)
1281 | JobValue::Node(JobValueMerge::Part(_)) => {
1282 res.push(RpcScanStateSummaryScanStateJob::Empty);
1283 continue;
1284 }
1285 JobValue::Leaf(JobValueBase::Full(job)) => {
1286 let stmt = &job.job.statement;
1287 let tx = job.job.transaction_with_info.transaction();
1288 let status = (&tx.status).into();
1289 let tx = MinaTransactionTransactionStableV2::from(&tx.data);
1290 let kind = RpcScanStateSummaryScanStateJobKind::Base(
1291 RpcScanStateSummaryBlockTransaction {
1292 hash: tx.hash().ok(),
1293 kind: (&tx).into(),
1294 status,
1295 },
1296 );
1297 let seq_no = job.seq_no.as_u64();
1298 (stmt.clone(), seq_no, kind, job.state.is_done())
1299 }
1300 JobValue::Node(JobValueMerge::Full(job)) => {
1301 let stmt = job
1302 .left
1303 .proof
1304 .statement()
1305 .merge(&job.right.proof.statement())
1306 .unwrap();
1307 let kind = RpcScanStateSummaryScanStateJobKind::Merge;
1308 let seq_no = job.seq_no.as_u64();
1309 (stmt, seq_no, kind, job.state.is_done())
1310 }
1311 };
1312 let stmt: MinaStateBlockchainStateValueStableV2LedgerProofStatement =
1313 (&stmt).into();
1314 let job_id: SnarkJobId = (&stmt.source, &stmt.target).into();
1315
1316 let bundle =
1317 job.bundle_sibling()
1318 .and_then(|(sibling_index, is_sibling_left)| {
1319 let sibling_job = jobs.get(sibling_index)?;
1320 let sibling_stmt: MinaStateBlockchainStateValueStableV2LedgerProofStatement = match &sibling_job.job {
1321 JobValue::Leaf(JobValueBase::Full(job)) => {
1322 (&job.job.statement).into()
1323 }
1324 JobValue::Node(JobValueMerge::Full(job)) => (&job
1325 .left
1326 .proof
1327 .statement()
1328 .merge(&job.right.proof.statement())
1329 .unwrap()).into(),
1330 _ => return None,
1331 };
1332 let bundle_job_id: SnarkJobId = match is_sibling_left {
1333 false => (&stmt.source, &sibling_stmt.target).into(),
1334 true => (&sibling_stmt.source, &stmt.target).into(),
1335 };
1336 Some((bundle_job_id, is_sibling_left))
1337 });
1338
1339 let bundle_job_id = bundle
1340 .as_ref()
1341 .map_or_else(|| job_id.clone(), |(id, _)| id.clone());
1342
1343 if is_done {
1344 let is_left =
1345 bundle.map_or_else(|| true, |(_, is_sibling_left)| !is_sibling_left);
1346 let parent = job.parent().ok_or_else(|| format!("job(depth: {}, index: {}) has no parent", job.depth(), job.index()))?;
1347 let sok_message: MinaBaseSokMessageStableV1 = {
1348 let job = jobs.get(parent).ok_or_else(|| format!("job(depth: {}, index: {}) parent not found", job.depth(), job.index()))?;
1349 match &job.job {
1350 JobValue::Node(JobValueMerge::Part(job)) if is_left => {
1351 (&job.sok_message).into()
1352 }
1353 JobValue::Node(JobValueMerge::Full(job)) => {
1354 if is_left {
1355 (&job.left.sok_message).into()
1356 } else {
1357 (&job.right.sok_message).into()
1358 }
1359 }
1360 _state => {
1361 res.push(RpcScanStateSummaryScanStateJob::Empty);
1366 continue;
1367 }
1368 }
1369 };
1370 res.push(RpcScanStateSummaryScanStateJob::Done {
1371 job_id,
1372 bundle_job_id,
1373 job: Box::new(job_kind),
1374 seq_no,
1375 snark: Box::new(RpcSnarkPoolJobSnarkWorkDone {
1376 snarker: sok_message.prover,
1377 fee: sok_message.fee,
1378 }),
1379 });
1380 } else {
1381 res.push(RpcScanStateSummaryScanStateJob::Todo {
1382 job_id,
1383 bundle_job_id,
1384 job: job_kind,
1385 seq_no,
1386 });
1387 }
1388 }
1389 Ok(res)
1390 })
1391 .collect()
1392 }
1393}
1394
1395impl LedgerSyncState {
1396 fn mask(&self, hash: &LedgerHash) -> Option<(Mask, bool)> {
1397 self.snarked_ledgers
1398 .get(hash)
1399 .cloned()
1400 .map(|mask| (mask, false))
1401 .or_else(|| Some((self.staged_ledgers.get_mask(hash)?, true)))
1402 }
1403
1404 fn pending_sync_snarked_ledger_mask(&self, hash: &LedgerHash) -> Result<Mask, String> {
1405 self.snarked_ledgers
1406 .get(hash)
1407 .cloned()
1408 .ok_or_else(|| format!("Missing sync snarked ledger {}", hash))
1409 }
1410
1411 fn snarked_ledger_mut(&mut self, hash: LedgerHash) -> Result<&mut Mask, InvalidBigInt> {
1414 let hash_fp = hash.to_field()?;
1415 Ok(self.snarked_ledgers.entry(hash.clone()).or_insert_with(|| {
1416 let mut ledger = Mask::create(LEDGER_DEPTH);
1417 ledger.set_cached_hash_unchecked(&LedgerAddress::root(), hash_fp);
1418 ledger
1419 }))
1420 }
1421
1422 fn staged_ledger_mut(
1423 &mut self,
1424 hash: &MinaBaseStagedLedgerHashStableV1,
1425 ) -> Option<&mut StagedLedger> {
1426 self.staged_ledgers.get_mut(hash)
1427 }
1428}
1429
1430fn staged_ledger_reconstruct(
1431 snarked_ledger: Mask,
1432 snarked_ledger_hash: LedgerHash,
1433 parts: Option<Arc<StagedLedgerAuxAndPendingCoinbasesValid>>,
1434) -> Result<(v2::LedgerHash, Result<StagedLedger, String>), InvalidBigInt> {
1435 let staged_ledger_hash = parts
1436 .as_ref()
1437 .map(|p| p.staged_ledger_hash.clone())
1438 .unwrap_or_else(|| snarked_ledger_hash.clone());
1439
1440 let ledger = snarked_ledger.make_child();
1441
1442 let mut result = if let Some(parts) = &parts {
1443 let states = parts
1444 .needed_blocks
1445 .iter()
1446 .map(|state| Ok((state.try_hash()?.to_field()?, state.clone())))
1447 .collect::<Result<BTreeMap<Fp, _>, _>>()?;
1448
1449 StagedLedger::of_scan_state_pending_coinbases_and_snarked_ledger(
1450 (),
1451 constraint_constants(),
1452 Verifier,
1453 (&parts.scan_state).try_into()?,
1454 ledger,
1455 LocalState::empty(),
1456 parts.staged_ledger_hash.to_field()?,
1457 (&parts.pending_coinbase).try_into()?,
1458 |key| states.get(&key).cloned().unwrap(),
1459 )
1460 } else {
1461 StagedLedger::create_exn(constraint_constants().clone(), ledger)
1462 };
1463
1464 match result.as_mut() {
1465 Ok(staged_ledger) => {
1466 staged_ledger.commit_and_reparent_to_root();
1467 }
1468 Err(_) => {
1469 if let Err(e) = dump_reconstruct_to_file(&snarked_ledger, &parts) {
1470 mina_core::error!(
1471 mina_core::log::system_time();
1472 kind = "LedgerService::dump - Failed reconstruct",
1473 summary = format!("Failed to save reconstruction to file: {e:?}")
1474 );
1475 }
1476 }
1477 }
1478
1479 Ok((staged_ledger_hash, result))
1480}
1481
1482pub trait LedgerService: redux::Service {
1483 fn ledger_manager(&self) -> &LedgerManager;
1484 fn force_sync_calls(&self) -> bool {
1485 false
1486 }
1487
1488 fn write_init(&mut self, request: LedgerWriteRequest) {
1489 let request = LedgerRequest::Write(request);
1490 if self.force_sync_calls() {
1491 let _ = self.ledger_manager().call_sync(request);
1492 } else {
1493 self.ledger_manager().call(request);
1494 }
1495 }
1496
1497 fn read_init(&mut self, id: LedgerReadId, request: LedgerReadRequest) {
1498 let request = LedgerRequest::Read(id, request);
1499 if self.force_sync_calls() {
1500 let _ = self.ledger_manager().call_sync(request);
1501 } else {
1502 self.ledger_manager().call(request);
1503 }
1504 }
1505}
1506
1507fn dump_reconstruct_to_file(
1510 snarked_ledger: &Mask,
1511 parts: &Option<Arc<StagedLedgerAuxAndPendingCoinbasesValid>>,
1512) -> std::io::Result<()> {
1513 use mina_p2p_messages::binprot::{
1514 self,
1515 macros::{BinProtRead, BinProtWrite},
1516 };
1517
1518 #[derive(BinProtRead, BinProtWrite)]
1519 struct ReconstructContext {
1520 accounts: Vec<v2::MinaBaseAccountBinableArgStableV2>,
1521 scan_state: v2::TransactionSnarkScanStateStableV2,
1522 pending_coinbase: v2::MinaBasePendingCoinbaseStableV2,
1523 staged_ledger_hash: LedgerHash,
1524 states: List<v2::MinaStateProtocolStateValueStableV2>,
1525 }
1526
1527 let Some(parts) = parts else {
1528 return Err(std::io::ErrorKind::Other.into());
1529 };
1530
1531 let StagedLedgerAuxAndPendingCoinbasesValid {
1532 scan_state,
1533 staged_ledger_hash,
1534 pending_coinbase,
1535 needed_blocks,
1536 } = &**parts;
1537
1538 let reconstruct_context = ReconstructContext {
1539 accounts: snarked_ledger
1540 .to_list()
1541 .iter()
1542 .map(v2::MinaBaseAccountBinableArgStableV2::from)
1543 .collect(),
1544 scan_state: scan_state.clone(),
1545 pending_coinbase: pending_coinbase.clone(),
1546 staged_ledger_hash: staged_ledger_hash.clone(),
1547 states: needed_blocks.clone(),
1548 };
1549
1550 let debug_dir = mina_core::get_debug_dir();
1551 let filename = debug_dir
1552 .join("failed_reconstruct_ctx.binprot")
1553 .to_string_lossy()
1554 .to_string();
1555 std::fs::create_dir_all(&debug_dir)?;
1556
1557 use mina_p2p_messages::binprot::BinProtWrite;
1558 let mut file = std::fs::File::create(&filename)?;
1559 reconstruct_context.binprot_write(&mut file)?;
1560 file.sync_all()?;
1561
1562 mina_core::info!(
1563 mina_core::log::system_time();
1564 kind = "LedgerService::dump - Failed reconstruct",
1565 summary = format!("Reconstruction saved to: {filename:?}")
1566 );
1567
1568 Ok(())
1569}
1570
1571fn dump_application_to_file(
1576 staged_ledger: &StagedLedger,
1577 block: ArcBlockWithHash,
1578 pred_block: AppliedBlock,
1579) -> std::io::Result<String> {
1580 use mina_p2p_messages::binprot::{
1581 self,
1582 macros::{BinProtRead, BinProtWrite},
1583 };
1584
1585 #[derive(BinProtRead, BinProtWrite)]
1586 struct ApplyContext {
1587 accounts: Vec<v2::MinaBaseAccountBinableArgStableV2>,
1588 scan_state: v2::TransactionSnarkScanStateStableV2,
1589 pending_coinbase: v2::MinaBasePendingCoinbaseStableV2,
1590 pred_block: v2::MinaBlockBlockStableV2,
1591 blocks: Vec<v2::MinaBlockBlockStableV2>,
1592 }
1593
1594 let cs = &block.block.header.protocol_state.body.consensus_state;
1595 let block_height = cs.blockchain_length.as_u32();
1596
1597 let apply_context = ApplyContext {
1598 accounts: staged_ledger
1599 .ledger()
1600 .to_list()
1601 .iter()
1602 .map(v2::MinaBaseAccountBinableArgStableV2::from)
1603 .collect::<Vec<_>>(),
1604 scan_state: staged_ledger.scan_state().into(),
1605 pending_coinbase: staged_ledger.pending_coinbase_collection().into(),
1606 pred_block: (**pred_block.block()).clone(),
1607 blocks: vec![(*block.block).clone()],
1608 };
1609
1610 let debug_dir = mina_core::get_debug_dir();
1611 let filename = debug_dir
1612 .join(format!("failed_application_ctx_{}.binprot", block_height))
1613 .to_string_lossy()
1614 .to_string();
1615 std::fs::create_dir_all(&debug_dir)?;
1616
1617 let mut file = std::fs::File::create(&filename)?;
1618
1619 use mina_p2p_messages::binprot::BinProtWrite;
1620 apply_context.binprot_write(&mut file)?;
1621 file.sync_all()?;
1622
1623 Ok(filename)
1624}
1625
#[cfg(test)]
mod tests {
    use mina_p2p_messages::v2::MinaBaseLedgerHash0StableV1;

    use crate::ledger::hash_node_at_depth;

    use super::*;

    /// Hashing two known child hashes at the root depth must reproduce the
    /// known parent hash.
    #[test]
    fn test_ledger_hash() {
        // (node address, expected node hash, left child hash, right child hash)
        let cases = [(
            LedgerAddress::root(),
            "jx5YAT36bv62M8mPcREYYfZWXaKqqMzDCP8wmc21uf4CfDKAHCr",
            "jxo5pSyt16XGwA9UeuAdiFDzrwFH3smbNTJF7fxq98w1y9Jem2m",
            "jwq3nCDr8XejL8HKDxR5qVhFJbKoUTGZgtLBZCp3MrqLTnqmjdP",
        )];

        for (address, expected_hash, left, right) in cases {
            let left: LedgerHash = left.parse().unwrap();
            let right: LedgerHash = right.parse().unwrap();

            let node = hash_node_at_depth(
                address.length(),
                left.0.to_field().unwrap(),
                right.0.to_field().unwrap(),
            );
            let actual: LedgerHash = MinaBaseLedgerHash0StableV1(node.into()).into();

            assert_eq!(actual.to_string(), expected_hash);
        }
    }
}