1use super::{
2 read::{LedgerReadId, LedgerReadRequest, LedgerReadResponse, LedgerStatus},
3 write::{LedgerWriteRequest, LedgerWriteResponse},
4 LedgerCtx, LedgerService,
5};
6use crate::{
7 account::AccountPublicKey, ledger::LedgerAddress, rpc::AccountQuery,
8 transition_frontier::sync::ledger::snarked::TransitionFrontierSyncLedgerSnarkedService,
9};
10use ledger::{
11 staged_ledger::staged_ledger::{SkipVerification, StagedLedger},
12 Account, AccountId, Mask,
13};
14use mina_core::{channels::mpsc, thread};
15use mina_p2p_messages::v2::{self, LedgerHash, MinaBaseAccountBinableArgStableV2};
16use mina_signer::CompressedPubKey;
17use std::collections::BTreeMap;
18
19#[expect(
26 clippy::large_enum_variant,
27 reason = "This is storing large messages, but size difference is only 2x between largest and second-largest variants"
28)]
29pub(super) enum LedgerRequest {
30 Write(LedgerWriteRequest),
31 Read(LedgerReadId, LedgerReadRequest),
32 AccountsSet {
34 snarked_ledger_hash: LedgerHash,
35 parent: LedgerAddress,
36 accounts: Vec<MinaBaseAccountBinableArgStableV2>,
37 },
38 AccountsGet {
40 ledger_hash: LedgerHash,
41 account_ids: Vec<AccountId>,
42 },
43 ChildHashesGet {
45 snarked_ledger_hash: LedgerHash,
46 parent: LedgerAddress,
47 },
48 ComputeSnarkedLedgerHashes {
50 snarked_ledger_hash: LedgerHash,
51 },
52 CopySnarkedLedgerContentsForSync {
54 origin_snarked_ledger_hash: Vec<LedgerHash>,
55 target_snarked_ledger_hash: LedgerHash,
56 overwrite: bool,
57 },
58 GetProducersWithDelegates {
60 ledger_hash: LedgerHash,
61 filter: fn(&CompressedPubKey) -> bool,
62 },
63 GetMask {
65 ledger_hash: LedgerHash,
66 },
67 InsertGenesisLedger {
68 mask: Mask,
69 },
70 StagedLedgerReconstructResult {
71 staged_ledger_hash: LedgerHash,
72 result: Result<StagedLedger, String>,
73 },
74}
75
76#[derive(Debug)]
77pub enum LedgerResponse {
78 Write(LedgerWriteResponse),
79 Read(LedgerReadId, LedgerReadResponse),
80 ChildHashes(Option<(LedgerHash, LedgerHash)>),
81 AccountsSet(Result<LedgerHash, String>),
82 AccountsGet(Result<Vec<Account>, String>),
83 LedgerMask(Option<(Mask, bool)>),
84 #[allow(clippy::type_complexity)]
85 ProducersWithDelegatesMap(
86 Option<BTreeMap<AccountPublicKey, Vec<(ledger::AccountIndex, AccountPublicKey, u64)>>>,
87 ),
88 SnarkedLedgerContentsCopied(Result<bool, String>),
89 Success, }
91
92impl LedgerRequest {
93 fn handle(
94 self,
95 ledger_ctx: &mut LedgerCtx,
96 caller: &LedgerCaller,
97 force_sync: bool,
98 ) -> LedgerResponse {
99 match self {
100 Self::Write(request) => LedgerResponse::Write(match request {
101 LedgerWriteRequest::StagedLedgerReconstruct {
102 snarked_ledger_hash,
103 parts,
104 } => {
105 if !force_sync {
106 let caller = caller.clone();
107 let cb = move |staged_ledger_hash, result| {
108 caller.call(LedgerRequest::StagedLedgerReconstructResult {
109 staged_ledger_hash,
110 result,
111 })
112 };
113 if let Err(e) =
114 ledger_ctx.staged_ledger_reconstruct(snarked_ledger_hash, parts, cb)
115 {
116 mina_core::log::inner::error!(
117 "Failed to reconstruct staged ledger: {:?}",
118 e
119 );
120 }
122 return LedgerResponse::Success;
123 } else {
124 let (staged_ledger_hash, result) = match ledger_ctx
125 .staged_ledger_reconstruct_sync(snarked_ledger_hash, parts)
126 {
127 Ok(result) => result,
128 Err(e) => (v2::LedgerHash::zero(), Err(String::from(e))),
129 };
130
131 LedgerWriteResponse::StagedLedgerReconstruct {
132 staged_ledger_hash,
133 result,
134 }
135 }
136 }
137 LedgerWriteRequest::StagedLedgerDiffCreate {
138 pred_block,
139 global_slot_since_genesis: global_slot,
140 is_new_epoch,
141 producer,
142 delegator,
143 coinbase_receiver,
144 completed_snarks,
145 supercharge_coinbase,
146 transactions_by_fee,
147 } => {
148 let pred_block_hash = pred_block.hash().clone();
149 let global_slot_since_genesis = global_slot.clone();
150 let result = ledger_ctx.staged_ledger_diff_create(
151 pred_block,
152 global_slot,
153 is_new_epoch,
154 producer,
155 delegator,
156 coinbase_receiver,
157 completed_snarks,
158 supercharge_coinbase,
159 transactions_by_fee,
160 );
161 LedgerWriteResponse::StagedLedgerDiffCreate {
162 pred_block_hash,
163 global_slot_since_genesis,
164 result: result.map(Into::into),
165 }
166 }
167 LedgerWriteRequest::BlockApply {
168 block,
169 pred_block,
170 skip_verification,
171 } => {
172 let block_hash = block.hash().clone();
173 let skip_verification = if skip_verification {
174 Some(SkipVerification::All)
175 } else {
176 None
177 };
178 let result = ledger_ctx.block_apply(block, pred_block, skip_verification);
179 LedgerWriteResponse::BlockApply { block_hash, result }
180 }
181 LedgerWriteRequest::Commit {
182 ledgers_to_keep,
183 root_snarked_ledger_updates,
184 needed_protocol_states,
185 new_root,
186 new_best_tip,
187 } => {
188 let best_tip_hash = new_best_tip.hash().clone();
189 let result = ledger_ctx.commit(
190 ledgers_to_keep,
191 root_snarked_ledger_updates,
192 needed_protocol_states,
193 &new_root,
194 &new_best_tip,
195 );
196 LedgerWriteResponse::Commit {
197 best_tip_hash,
198 result,
199 }
200 }
201 }),
202 Self::Read(id, request) => LedgerResponse::Read(
203 id,
204 match request {
205 LedgerReadRequest::DelegatorTable(ledger_hash, producer) => {
206 let res = ledger_ctx
207 .producers_with_delegates(&ledger_hash, |pub_key| {
208 AccountPublicKey::from(pub_key.clone()) == producer
209 })
210 .and_then(|list| list.into_iter().next())
211 .map(|(_, table)| {
212 table
213 .into_iter()
214 .map(|(index, pub_key, balance)| (index, (pub_key, balance)))
215 .collect()
216 });
217
218 LedgerReadResponse::DelegatorTable(res)
219 }
220 LedgerReadRequest::GetNumAccounts(ledger_hash) => {
221 let res = ledger_ctx.get_num_accounts(ledger_hash);
222 LedgerReadResponse::GetNumAccounts(res)
223 }
224 LedgerReadRequest::GetChildHashesAtAddr(ledger_hash, addr) => {
225 let res = ledger_ctx.get_child_hashes(ledger_hash, addr);
226 LedgerReadResponse::GetChildHashesAtAddr(res)
227 }
228 LedgerReadRequest::GetChildAccountsAtAddr(ledger_hash, addr) => {
229 let res = ledger_ctx.get_child_accounts(ledger_hash, addr);
230 LedgerReadResponse::GetChildAccountsAtAddr(res)
231 }
232 LedgerReadRequest::GetStagedLedgerAuxAndPendingCoinbases(data) => {
233 let res = ledger_ctx.staged_ledger_aux_and_pending_coinbase(
234 &data.ledger_hash,
235 data.protocol_states,
236 );
237 LedgerReadResponse::GetStagedLedgerAuxAndPendingCoinbases(res)
238 }
239 LedgerReadRequest::ScanStateSummary(ledger_hash) => {
240 let res = ledger_ctx.scan_state_summary(&ledger_hash);
241 LedgerReadResponse::ScanStateSummary(res)
242 }
243 LedgerReadRequest::GetAccounts(ledger_hash, account_ids, rpc_id) => {
244 let res = ledger_ctx.get_accounts(ledger_hash, account_ids);
245 LedgerReadResponse::GetAccounts(res, rpc_id)
246 }
247 LedgerReadRequest::AccountsForRpc(rpc_id, ledger_hash, account_query) => {
248 let res = match &account_query {
249 AccountQuery::All => ledger_ctx.get_accounts_for_rpc(ledger_hash, None),
250 AccountQuery::SinglePublicKey(public_key) => ledger_ctx
251 .get_accounts_for_rpc(ledger_hash, Some(public_key.clone())),
252 AccountQuery::PubKeyWithTokenId(public_key, token_id_key_hash) => {
253 let id = AccountId {
254 public_key: public_key.clone().try_into().unwrap(),
255 token_id: token_id_key_hash.clone().into(),
256 };
257 ledger_ctx.get_accounts(ledger_hash, vec![id])
258 }
259 AccountQuery::MultipleIds(ids) => {
260 ledger_ctx.get_accounts(ledger_hash, ids.clone())
261 }
262 };
263
264 LedgerReadResponse::AccountsForRpc(rpc_id, res, account_query)
265 }
266 LedgerReadRequest::GetLedgerStatus(rpc_id, ledger_hash) => {
267 let res = ledger_ctx.get_num_accounts(ledger_hash).map(
268 |(num_accounts, ledger_hash)| LedgerStatus {
269 num_accounts,
270 best_tip_staged_ledger_hash: ledger_hash,
271 },
272 );
273
274 LedgerReadResponse::GetLedgerStatus(rpc_id, res)
275 }
276 LedgerReadRequest::GetAccountDelegators(rpc_id, ledger_hash, account_id) => {
277 let res = ledger_ctx.get_account_delegators(&ledger_hash, &account_id);
278 LedgerReadResponse::GetAccountDelegators(rpc_id, res)
279 }
280 },
281 ),
282 LedgerRequest::AccountsSet {
283 snarked_ledger_hash,
284 parent,
285 accounts,
286 } => LedgerResponse::AccountsSet(ledger_ctx.accounts_set(
287 snarked_ledger_hash,
288 &parent,
289 accounts,
290 )),
291 LedgerRequest::ChildHashesGet {
292 snarked_ledger_hash,
293 parent,
294 } => {
295 let res = ledger_ctx.get_child_hashes(snarked_ledger_hash, parent);
296 LedgerResponse::ChildHashes(res)
297 }
298 LedgerRequest::ComputeSnarkedLedgerHashes {
299 snarked_ledger_hash,
300 } => {
301 ledger_ctx
302 .compute_snarked_ledger_hashes(&snarked_ledger_hash)
303 .unwrap();
304 LedgerResponse::Success
305 }
306 LedgerRequest::CopySnarkedLedgerContentsForSync {
307 origin_snarked_ledger_hash,
308 target_snarked_ledger_hash,
309 overwrite,
310 } => {
311 let origin_snarked_ledger_hash = origin_snarked_ledger_hash
312 .iter()
313 .find(|hash| ledger_ctx.contains_snarked_ledger(hash))
314 .unwrap_or_else(|| {
315 origin_snarked_ledger_hash
316 .first()
317 .expect("origin_snarked_ledger_hash cannot be empty")
318 })
319 .clone();
320
321 let res = ledger_ctx.copy_snarked_ledger_contents_for_sync(
322 origin_snarked_ledger_hash,
323 target_snarked_ledger_hash,
324 overwrite,
325 );
326 LedgerResponse::SnarkedLedgerContentsCopied(res)
327 }
328 LedgerRequest::GetMask { ledger_hash } => {
329 LedgerResponse::LedgerMask(ledger_ctx.mask(&ledger_hash))
330 }
331 LedgerRequest::GetProducersWithDelegates {
332 ledger_hash,
333 filter,
334 } => {
335 let res = ledger_ctx.producers_with_delegates(&ledger_hash, filter);
336 LedgerResponse::ProducersWithDelegatesMap(res)
337 }
338 LedgerRequest::InsertGenesisLedger { mask } => {
339 ledger_ctx.insert_genesis_ledger(mask);
340 LedgerResponse::Success
341 }
342 LedgerRequest::StagedLedgerReconstructResult {
343 staged_ledger_hash,
344 result,
345 } => {
346 let result = match result {
347 Err(err) => Err(err),
348 Ok(ledger) => {
349 ledger_ctx.staged_ledger_reconstruct_result_store(ledger);
350 Ok(())
351 }
352 };
353 LedgerResponse::Write(LedgerWriteResponse::StagedLedgerReconstruct {
354 staged_ledger_hash,
355 result,
356 })
357 }
358 LedgerRequest::AccountsGet {
359 ledger_hash,
360 account_ids,
361 } => {
362 let res = ledger_ctx.get_accounts(ledger_hash, account_ids);
363 LedgerResponse::AccountsGet(Ok(res))
364 }
365 }
366 }
367}
368
369struct LedgerRequestWithChan {
370 request: LedgerRequest,
371 responder: Option<std::sync::mpsc::SyncSender<LedgerResponse>>,
372}
373
374pub struct LedgerManager {
375 caller: LedgerCaller,
376 join_handle: thread::JoinHandle<LedgerCtx>,
377}
378
379#[derive(Clone)]
380pub(super) struct LedgerCaller(mpsc::TrackedUnboundedSender<LedgerRequestWithChan>);
381
382impl LedgerManager {
383 pub fn spawn(mut ledger_ctx: LedgerCtx) -> LedgerManager {
384 let (sender, mut receiver) = mpsc::tracked_unbounded_channel();
385 let caller = LedgerCaller(sender);
386 let ledger_caller = caller.clone();
387
388 let ledger_manager_loop = move || {
389 while let Some(msg) = receiver.blocking_recv() {
390 let LedgerRequestWithChan { request, responder } = msg.0;
391 let response = request.handle(&mut ledger_ctx, &ledger_caller, responder.is_some());
392 match (response, responder) {
393 (LedgerResponse::Write(resp), None) => {
394 ledger_ctx.send_write_response(resp);
395 }
396 (LedgerResponse::Write(resp), Some(responder)) => {
397 ledger_ctx.send_write_response(resp.clone());
398 let _ = responder.send(LedgerResponse::Write(resp));
399 }
400 (LedgerResponse::Read(id, resp), None) => {
401 ledger_ctx.send_read_response(id, resp);
402 }
403 (LedgerResponse::Read(id, resp), Some(responder)) => {
404 ledger_ctx.send_read_response(id, resp.clone());
405 let _ = responder.send(LedgerResponse::Read(id, resp));
406 }
407 (resp, Some(responder)) => {
408 let _ = responder.send(resp);
409 }
410 (_, None) => {}
411 }
412 }
413 ledger_ctx
414 };
415 let join_handle = thread::Builder::new()
416 .name("ledger-manager".into())
417 .spawn(ledger_manager_loop)
418 .expect("Failed: ledger manager");
419 LedgerManager {
420 caller,
421 join_handle,
422 }
423 }
424
425 pub fn pending_calls(&self) -> usize {
426 self.caller.0.len()
427 }
428
429 pub(super) fn call(&self, request: LedgerRequest) {
430 self.caller.call(request)
431 }
432
433 pub(super) fn call_sync(
434 &self,
435 request: LedgerRequest,
436 ) -> Result<LedgerResponse, std::sync::mpsc::RecvError> {
437 self.caller.call_sync(request)
438 }
439
440 pub async fn wait_for_stop(self) -> thread::Result<LedgerCtx> {
441 self.join_handle.join()
442 }
443
444 pub fn insert_genesis_ledger(&self, mask: Mask) {
445 self.call(LedgerRequest::InsertGenesisLedger { mask });
446 }
447
448 pub fn get_mask(&self, ledger_hash: &LedgerHash) -> Option<(Mask, bool)> {
449 match self.call_sync(LedgerRequest::GetMask {
450 ledger_hash: ledger_hash.clone(),
451 }) {
452 Ok(LedgerResponse::LedgerMask(mask)) => mask,
453 _ => panic!("get_mask failed"),
454 }
455 }
456
457 pub fn get_accounts(
458 &self,
459 ledger_hash: &LedgerHash,
460 account_ids: Vec<AccountId>,
461 ) -> Result<Vec<Account>, String> {
462 match self.call_sync(LedgerRequest::AccountsGet {
464 ledger_hash: ledger_hash.clone(),
465 account_ids,
466 }) {
467 Ok(LedgerResponse::AccountsGet(result)) => result,
468 _ => panic!("get_accounts failed"),
469 }
470 }
471
472 #[allow(clippy::type_complexity)]
473 pub fn producers_with_delegates(
474 &self,
475 ledger_hash: &LedgerHash,
476 filter: fn(&CompressedPubKey) -> bool,
477 ) -> Option<BTreeMap<AccountPublicKey, Vec<(ledger::AccountIndex, AccountPublicKey, u64)>>>
478 {
479 match self.call_sync(LedgerRequest::GetProducersWithDelegates {
480 ledger_hash: ledger_hash.clone(),
481 filter,
482 }) {
483 Ok(LedgerResponse::ProducersWithDelegatesMap(map)) => map,
484 _ => panic!("producers_with_delegates failed"),
485 }
486 }
487}
488
489impl LedgerCaller {
490 pub fn call(&self, request: LedgerRequest) {
491 self.0
492 .tracked_send(LedgerRequestWithChan {
493 request,
494 responder: None,
495 })
496 .unwrap();
497 }
498
499 fn call_sync(
500 &self,
501 request: LedgerRequest,
502 ) -> Result<LedgerResponse, std::sync::mpsc::RecvError> {
503 let (responder, receiver) = std::sync::mpsc::sync_channel(0);
504 self.0
505 .tracked_send(LedgerRequestWithChan {
506 request,
507 responder: Some(responder),
508 })
509 .unwrap();
510 receiver.recv()
511 }
512}
513
514fn format_response_error(method: &str, res: LedgerResponse) -> String {
515 format!("LedgerManager::{method}: unexpected response: {res:?}")
516}
517
518impl<T: LedgerService> TransitionFrontierSyncLedgerSnarkedService for T {
519 fn compute_snarked_ledger_hashes(
520 &self,
521 snarked_ledger_hash: &LedgerHash,
522 ) -> Result<(), String> {
523 self.ledger_manager()
524 .call_sync(LedgerRequest::ComputeSnarkedLedgerHashes {
525 snarked_ledger_hash: snarked_ledger_hash.clone(),
526 })
527 .map_err(|_| "compute_snarked_ledger_hashes responder dropped".to_owned())
528 .and_then(|res| {
529 if let LedgerResponse::Success = res {
530 Ok(())
531 } else {
532 Err(format_response_error("compute_snarked_ledger_hashes", res))
533 }
534 })
535 }
536
537 fn copy_snarked_ledger_contents_for_sync(
538 &self,
539 origin_snarked_ledger_hash: Vec<LedgerHash>,
540 target_snarked_ledger_hash: LedgerHash,
541 overwrite: bool,
542 ) -> Result<bool, String> {
543 self.ledger_manager()
544 .call_sync(LedgerRequest::CopySnarkedLedgerContentsForSync {
545 origin_snarked_ledger_hash,
546 target_snarked_ledger_hash,
547 overwrite,
548 })
549 .map_err(|_| "copy_snarked_ledger_contents_for_sync responder dropped".to_owned())
550 .and_then(|res| {
551 if let LedgerResponse::SnarkedLedgerContentsCopied(copied) = res {
552 copied
553 } else {
554 Err(format_response_error(
555 "copy_snarked_ledger_contents_for_sync",
556 res,
557 ))
558 }
559 })
560 }
561
562 fn child_hashes_get(
563 &self,
564 snarked_ledger_hash: LedgerHash,
565 parent: &LedgerAddress,
566 ) -> Result<(LedgerHash, LedgerHash), String> {
567 self.ledger_manager()
568 .call_sync(LedgerRequest::ChildHashesGet {
569 snarked_ledger_hash,
570 parent: parent.clone(),
571 })
572 .map_err(|_| "child_hashes_get responder dropped".to_owned())
573 .and_then(|res| {
574 if let LedgerResponse::ChildHashes(Some(res)) = res {
575 Ok(res)
576 } else {
577 Err(format_response_error("child_hashes_get", res))
578 }
579 })
580 }
581
582 fn accounts_set(
583 &self,
584 snarked_ledger_hash: LedgerHash,
585 parent: &LedgerAddress,
586 accounts: Vec<MinaBaseAccountBinableArgStableV2>,
587 ) -> Result<LedgerHash, String> {
588 self.ledger_manager()
589 .call_sync(LedgerRequest::AccountsSet {
590 snarked_ledger_hash,
591 parent: parent.clone(),
592 accounts,
593 })
594 .map_err(|_| "accounts_set responder dropped".to_owned())
595 .and_then(|res| {
596 if let LedgerResponse::AccountsSet(res) = res {
597 res
598 } else {
599 Err(format_response_error("accounts_set", res))
600 }
601 })
602 }
603}