1use super::{
2 read::{LedgerReadId, LedgerReadRequest, LedgerReadResponse, LedgerStatus},
3 write::{LedgerWriteRequest, LedgerWriteResponse},
4 LedgerCtx, LedgerService,
5};
6use crate::{
7 account::AccountPublicKey, ledger::LedgerAddress, rpc::AccountQuery,
8 transition_frontier::sync::ledger::snarked::TransitionFrontierSyncLedgerSnarkedService,
9};
10use ledger::{
11 staged_ledger::staged_ledger::{SkipVerification, StagedLedger},
12 Account, AccountId, Mask,
13};
14use mina_p2p_messages::v2::{self, LedgerHash, MinaBaseAccountBinableArgStableV2};
15use mina_signer::CompressedPubKey;
16use openmina_core::{channels::mpsc, thread};
17use std::collections::BTreeMap;
18
19#[allow(dead_code)] pub(super) enum LedgerRequest {
27 Write(LedgerWriteRequest),
28 Read(LedgerReadId, LedgerReadRequest),
29 AccountsSet {
30 snarked_ledger_hash: LedgerHash,
31 parent: LedgerAddress,
32 accounts: Vec<MinaBaseAccountBinableArgStableV2>,
33 }, AccountsGet {
35 ledger_hash: LedgerHash,
36 account_ids: Vec<AccountId>,
37 }, ChildHashesGet {
39 snarked_ledger_hash: LedgerHash,
40 parent: LedgerAddress,
41 }, ComputeSnarkedLedgerHashes {
43 snarked_ledger_hash: LedgerHash,
44 }, CopySnarkedLedgerContentsForSync {
46 origin_snarked_ledger_hash: Vec<LedgerHash>,
47 target_snarked_ledger_hash: LedgerHash,
48 overwrite: bool,
49 }, GetProducersWithDelegates {
51 ledger_hash: LedgerHash,
52 filter: fn(&CompressedPubKey) -> bool,
53 }, GetMask {
55 ledger_hash: LedgerHash,
56 }, InsertGenesisLedger {
58 mask: Mask,
59 },
60 StagedLedgerReconstructResult {
61 staged_ledger_hash: LedgerHash,
62 result: Result<StagedLedger, String>,
63 },
64}
65
66#[derive(Debug)]
67pub enum LedgerResponse {
68 Write(LedgerWriteResponse),
69 Read(LedgerReadId, LedgerReadResponse),
70 ChildHashes(Option<(LedgerHash, LedgerHash)>),
71 AccountsSet(Result<LedgerHash, String>),
72 AccountsGet(Result<Vec<Account>, String>),
73 LedgerMask(Option<(Mask, bool)>),
74 #[allow(clippy::type_complexity)]
75 ProducersWithDelegatesMap(
76 Option<BTreeMap<AccountPublicKey, Vec<(ledger::AccountIndex, AccountPublicKey, u64)>>>,
77 ),
78 SnarkedLedgerContentsCopied(Result<bool, String>),
79 Success, }
81
82impl LedgerRequest {
83 fn handle(
84 self,
85 ledger_ctx: &mut LedgerCtx,
86 caller: &LedgerCaller,
87 force_sync: bool,
88 ) -> LedgerResponse {
89 match self {
90 Self::Write(request) => LedgerResponse::Write(match request {
91 LedgerWriteRequest::StagedLedgerReconstruct {
92 snarked_ledger_hash,
93 parts,
94 } => {
95 if !force_sync {
96 let caller = caller.clone();
97 let cb = move |staged_ledger_hash, result| {
98 caller.call(LedgerRequest::StagedLedgerReconstructResult {
99 staged_ledger_hash,
100 result,
101 })
102 };
103 if let Err(e) =
104 ledger_ctx.staged_ledger_reconstruct(snarked_ledger_hash, parts, cb)
105 {
106 openmina_core::log::inner::error!(
107 "Failed to reconstruct staged ledger: {:?}",
108 e
109 );
110 }
112 return LedgerResponse::Success;
113 } else {
114 let (staged_ledger_hash, result) = match ledger_ctx
115 .staged_ledger_reconstruct_sync(snarked_ledger_hash, parts)
116 {
117 Ok(result) => result,
118 Err(e) => (v2::LedgerHash::zero(), Err(String::from(e))),
119 };
120
121 LedgerWriteResponse::StagedLedgerReconstruct {
122 staged_ledger_hash,
123 result,
124 }
125 }
126 }
127 LedgerWriteRequest::StagedLedgerDiffCreate {
128 pred_block,
129 global_slot_since_genesis: global_slot,
130 is_new_epoch,
131 producer,
132 delegator,
133 coinbase_receiver,
134 completed_snarks,
135 supercharge_coinbase,
136 transactions_by_fee,
137 } => {
138 let pred_block_hash = pred_block.hash().clone();
139 let global_slot_since_genesis = global_slot.clone();
140 let result = ledger_ctx.staged_ledger_diff_create(
141 pred_block,
142 global_slot,
143 is_new_epoch,
144 producer,
145 delegator,
146 coinbase_receiver,
147 completed_snarks,
148 supercharge_coinbase,
149 transactions_by_fee,
150 );
151 LedgerWriteResponse::StagedLedgerDiffCreate {
152 pred_block_hash,
153 global_slot_since_genesis,
154 result: result.map(Into::into),
155 }
156 }
157 LedgerWriteRequest::BlockApply {
158 block,
159 pred_block,
160 skip_verification,
161 } => {
162 let block_hash = block.hash().clone();
163 let skip_verification = if skip_verification {
164 Some(SkipVerification::All)
165 } else {
166 None
167 };
168 let result = ledger_ctx.block_apply(block, pred_block, skip_verification);
169 LedgerWriteResponse::BlockApply { block_hash, result }
170 }
171 LedgerWriteRequest::Commit {
172 ledgers_to_keep,
173 root_snarked_ledger_updates,
174 needed_protocol_states,
175 new_root,
176 new_best_tip,
177 } => {
178 let best_tip_hash = new_best_tip.hash().clone();
179 let result = ledger_ctx.commit(
180 ledgers_to_keep,
181 root_snarked_ledger_updates,
182 needed_protocol_states,
183 &new_root,
184 &new_best_tip,
185 );
186 LedgerWriteResponse::Commit {
187 best_tip_hash,
188 result,
189 }
190 }
191 }),
192 Self::Read(id, request) => LedgerResponse::Read(
193 id,
194 match request {
195 LedgerReadRequest::DelegatorTable(ledger_hash, producer) => {
196 let res = ledger_ctx
197 .producers_with_delegates(&ledger_hash, |pub_key| {
198 AccountPublicKey::from(pub_key.clone()) == producer
199 })
200 .and_then(|list| list.into_iter().next())
201 .map(|(_, table)| {
202 table
203 .into_iter()
204 .map(|(index, pub_key, balance)| (index, (pub_key, balance)))
205 .collect()
206 });
207
208 LedgerReadResponse::DelegatorTable(res)
209 }
210 LedgerReadRequest::GetNumAccounts(ledger_hash) => {
211 let res = ledger_ctx.get_num_accounts(ledger_hash);
212 LedgerReadResponse::GetNumAccounts(res)
213 }
214 LedgerReadRequest::GetChildHashesAtAddr(ledger_hash, addr) => {
215 let res = ledger_ctx.get_child_hashes(ledger_hash, addr);
216 LedgerReadResponse::GetChildHashesAtAddr(res)
217 }
218 LedgerReadRequest::GetChildAccountsAtAddr(ledger_hash, addr) => {
219 let res = ledger_ctx.get_child_accounts(ledger_hash, addr);
220 LedgerReadResponse::GetChildAccountsAtAddr(res)
221 }
222 LedgerReadRequest::GetStagedLedgerAuxAndPendingCoinbases(data) => {
223 let res = ledger_ctx.staged_ledger_aux_and_pending_coinbase(
224 &data.ledger_hash,
225 data.protocol_states,
226 );
227 LedgerReadResponse::GetStagedLedgerAuxAndPendingCoinbases(res)
228 }
229 LedgerReadRequest::ScanStateSummary(ledger_hash) => {
230 let res = ledger_ctx.scan_state_summary(&ledger_hash);
231 LedgerReadResponse::ScanStateSummary(res)
232 }
233 LedgerReadRequest::GetAccounts(ledger_hash, account_ids, rpc_id) => {
234 let res = ledger_ctx.get_accounts(ledger_hash, account_ids);
235 LedgerReadResponse::GetAccounts(res, rpc_id)
236 }
237 LedgerReadRequest::AccountsForRpc(rpc_id, ledger_hash, account_query) => {
238 let res = match &account_query {
239 AccountQuery::All => ledger_ctx.get_accounts_for_rpc(ledger_hash, None),
240 AccountQuery::SinglePublicKey(public_key) => ledger_ctx
241 .get_accounts_for_rpc(ledger_hash, Some(public_key.clone())),
242 AccountQuery::PubKeyWithTokenId(public_key, token_id_key_hash) => {
243 let id = AccountId {
244 public_key: public_key.clone().try_into().unwrap(),
245 token_id: token_id_key_hash.clone().into(),
246 };
247 ledger_ctx.get_accounts(ledger_hash, vec![id])
248 }
249 AccountQuery::MultipleIds(ids) => {
250 ledger_ctx.get_accounts(ledger_hash, ids.clone())
251 }
252 };
253
254 LedgerReadResponse::AccountsForRpc(rpc_id, res, account_query)
255 }
256 LedgerReadRequest::GetLedgerStatus(rpc_id, ledger_hash) => {
257 let res = ledger_ctx.get_num_accounts(ledger_hash).map(
258 |(num_accounts, ledger_hash)| LedgerStatus {
259 num_accounts,
260 best_tip_staged_ledger_hash: ledger_hash,
261 },
262 );
263
264 LedgerReadResponse::GetLedgerStatus(rpc_id, res)
265 }
266 LedgerReadRequest::GetAccountDelegators(rpc_id, ledger_hash, account_id) => {
267 let res = ledger_ctx.get_account_delegators(&ledger_hash, &account_id);
268 LedgerReadResponse::GetAccountDelegators(rpc_id, res)
269 }
270 },
271 ),
272 LedgerRequest::AccountsSet {
273 snarked_ledger_hash,
274 parent,
275 accounts,
276 } => LedgerResponse::AccountsSet(ledger_ctx.accounts_set(
277 snarked_ledger_hash,
278 &parent,
279 accounts,
280 )),
281 LedgerRequest::ChildHashesGet {
282 snarked_ledger_hash,
283 parent,
284 } => {
285 let res = ledger_ctx.get_child_hashes(snarked_ledger_hash, parent);
286 LedgerResponse::ChildHashes(res)
287 }
288 LedgerRequest::ComputeSnarkedLedgerHashes {
289 snarked_ledger_hash,
290 } => {
291 ledger_ctx
292 .compute_snarked_ledger_hashes(&snarked_ledger_hash)
293 .unwrap();
294 LedgerResponse::Success
295 }
296 LedgerRequest::CopySnarkedLedgerContentsForSync {
297 origin_snarked_ledger_hash,
298 target_snarked_ledger_hash,
299 overwrite,
300 } => {
301 let origin_snarked_ledger_hash = origin_snarked_ledger_hash
302 .iter()
303 .find(|hash| ledger_ctx.contains_snarked_ledger(hash))
304 .unwrap_or_else(|| {
305 origin_snarked_ledger_hash
306 .first()
307 .expect("origin_snarked_ledger_hash cannot be empty")
308 })
309 .clone();
310
311 let res = ledger_ctx.copy_snarked_ledger_contents_for_sync(
312 origin_snarked_ledger_hash,
313 target_snarked_ledger_hash,
314 overwrite,
315 );
316 LedgerResponse::SnarkedLedgerContentsCopied(res)
317 }
318 LedgerRequest::GetMask { ledger_hash } => {
319 LedgerResponse::LedgerMask(ledger_ctx.mask(&ledger_hash))
320 }
321 LedgerRequest::GetProducersWithDelegates {
322 ledger_hash,
323 filter,
324 } => {
325 let res = ledger_ctx.producers_with_delegates(&ledger_hash, filter);
326 LedgerResponse::ProducersWithDelegatesMap(res)
327 }
328 LedgerRequest::InsertGenesisLedger { mask } => {
329 ledger_ctx.insert_genesis_ledger(mask);
330 LedgerResponse::Success
331 }
332 LedgerRequest::StagedLedgerReconstructResult {
333 staged_ledger_hash,
334 result,
335 } => {
336 let result = match result {
337 Err(err) => Err(err),
338 Ok(ledger) => {
339 ledger_ctx.staged_ledger_reconstruct_result_store(ledger);
340 Ok(())
341 }
342 };
343 LedgerResponse::Write(LedgerWriteResponse::StagedLedgerReconstruct {
344 staged_ledger_hash,
345 result,
346 })
347 }
348 LedgerRequest::AccountsGet {
349 ledger_hash,
350 account_ids,
351 } => {
352 let res = ledger_ctx.get_accounts(ledger_hash, account_ids);
353 LedgerResponse::AccountsGet(Ok(res))
354 }
355 }
356 }
357}
358
359struct LedgerRequestWithChan {
360 request: LedgerRequest,
361 responder: Option<std::sync::mpsc::SyncSender<LedgerResponse>>,
362}
363
364pub struct LedgerManager {
365 caller: LedgerCaller,
366 join_handle: thread::JoinHandle<LedgerCtx>,
367}
368
369#[derive(Clone)]
370pub(super) struct LedgerCaller(mpsc::TrackedUnboundedSender<LedgerRequestWithChan>);
371
372impl LedgerManager {
373 pub fn spawn(mut ledger_ctx: LedgerCtx) -> LedgerManager {
374 let (sender, mut receiver) = mpsc::tracked_unbounded_channel();
375 let caller = LedgerCaller(sender);
376 let ledger_caller = caller.clone();
377
378 let ledger_manager_loop = move || {
379 while let Some(msg) = receiver.blocking_recv() {
380 let LedgerRequestWithChan { request, responder } = msg.0;
381 let response = request.handle(&mut ledger_ctx, &ledger_caller, responder.is_some());
382 match (response, responder) {
383 (LedgerResponse::Write(resp), None) => {
384 ledger_ctx.send_write_response(resp);
385 }
386 (LedgerResponse::Write(resp), Some(responder)) => {
387 ledger_ctx.send_write_response(resp.clone());
388 let _ = responder.send(LedgerResponse::Write(resp));
389 }
390 (LedgerResponse::Read(id, resp), None) => {
391 ledger_ctx.send_read_response(id, resp);
392 }
393 (LedgerResponse::Read(id, resp), Some(responder)) => {
394 ledger_ctx.send_read_response(id, resp.clone());
395 let _ = responder.send(LedgerResponse::Read(id, resp));
396 }
397 (resp, Some(responder)) => {
398 let _ = responder.send(resp);
399 }
400 (_, None) => {}
401 }
402 }
403 ledger_ctx
404 };
405 let join_handle = thread::Builder::new()
406 .name("ledger-manager".into())
407 .spawn(ledger_manager_loop)
408 .expect("Failed: ledger manager");
409 LedgerManager {
410 caller,
411 join_handle,
412 }
413 }
414
415 pub fn pending_calls(&self) -> usize {
416 self.caller.0.len()
417 }
418
419 pub(super) fn call(&self, request: LedgerRequest) {
420 self.caller.call(request)
421 }
422
423 pub(super) fn call_sync(
424 &self,
425 request: LedgerRequest,
426 ) -> Result<LedgerResponse, std::sync::mpsc::RecvError> {
427 self.caller.call_sync(request)
428 }
429
430 pub async fn wait_for_stop(self) -> thread::Result<LedgerCtx> {
431 self.join_handle.join()
432 }
433
434 pub fn insert_genesis_ledger(&self, mask: Mask) {
435 self.call(LedgerRequest::InsertGenesisLedger { mask });
436 }
437
438 pub fn get_mask(&self, ledger_hash: &LedgerHash) -> Option<(Mask, bool)> {
439 match self.call_sync(LedgerRequest::GetMask {
440 ledger_hash: ledger_hash.clone(),
441 }) {
442 Ok(LedgerResponse::LedgerMask(mask)) => mask,
443 _ => panic!("get_mask failed"),
444 }
445 }
446
447 pub fn get_accounts(
448 &self,
449 ledger_hash: &LedgerHash,
450 account_ids: Vec<AccountId>,
451 ) -> Result<Vec<Account>, String> {
452 match self.call_sync(LedgerRequest::AccountsGet {
454 ledger_hash: ledger_hash.clone(),
455 account_ids,
456 }) {
457 Ok(LedgerResponse::AccountsGet(result)) => result,
458 _ => panic!("get_accounts failed"),
459 }
460 }
461
462 #[allow(clippy::type_complexity)]
463 pub fn producers_with_delegates(
464 &self,
465 ledger_hash: &LedgerHash,
466 filter: fn(&CompressedPubKey) -> bool,
467 ) -> Option<BTreeMap<AccountPublicKey, Vec<(ledger::AccountIndex, AccountPublicKey, u64)>>>
468 {
469 match self.call_sync(LedgerRequest::GetProducersWithDelegates {
470 ledger_hash: ledger_hash.clone(),
471 filter,
472 }) {
473 Ok(LedgerResponse::ProducersWithDelegatesMap(map)) => map,
474 _ => panic!("producers_with_delegates failed"),
475 }
476 }
477}
478
479impl LedgerCaller {
480 pub fn call(&self, request: LedgerRequest) {
481 self.0
482 .tracked_send(LedgerRequestWithChan {
483 request,
484 responder: None,
485 })
486 .unwrap();
487 }
488
489 fn call_sync(
490 &self,
491 request: LedgerRequest,
492 ) -> Result<LedgerResponse, std::sync::mpsc::RecvError> {
493 let (responder, receiver) = std::sync::mpsc::sync_channel(0);
494 self.0
495 .tracked_send(LedgerRequestWithChan {
496 request,
497 responder: Some(responder),
498 })
499 .unwrap();
500 receiver.recv()
501 }
502}
503
504fn format_response_error(method: &str, res: LedgerResponse) -> String {
505 format!("LedgerManager::{method}: unexpected response: {res:?}")
506}
507
508impl<T: LedgerService> TransitionFrontierSyncLedgerSnarkedService for T {
509 fn compute_snarked_ledger_hashes(
510 &self,
511 snarked_ledger_hash: &LedgerHash,
512 ) -> Result<(), String> {
513 self.ledger_manager()
514 .call_sync(LedgerRequest::ComputeSnarkedLedgerHashes {
515 snarked_ledger_hash: snarked_ledger_hash.clone(),
516 })
517 .map_err(|_| "compute_snarked_ledger_hashes responder dropped".to_owned())
518 .and_then(|res| {
519 if let LedgerResponse::Success = res {
520 Ok(())
521 } else {
522 Err(format_response_error("compute_snarked_ledger_hashes", res))
523 }
524 })
525 }
526
527 fn copy_snarked_ledger_contents_for_sync(
528 &self,
529 origin_snarked_ledger_hash: Vec<LedgerHash>,
530 target_snarked_ledger_hash: LedgerHash,
531 overwrite: bool,
532 ) -> Result<bool, String> {
533 self.ledger_manager()
534 .call_sync(LedgerRequest::CopySnarkedLedgerContentsForSync {
535 origin_snarked_ledger_hash,
536 target_snarked_ledger_hash,
537 overwrite,
538 })
539 .map_err(|_| "copy_snarked_ledger_contents_for_sync responder dropped".to_owned())
540 .and_then(|res| {
541 if let LedgerResponse::SnarkedLedgerContentsCopied(copied) = res {
542 copied
543 } else {
544 Err(format_response_error(
545 "copy_snarked_ledger_contents_for_sync",
546 res,
547 ))
548 }
549 })
550 }
551
552 fn child_hashes_get(
553 &self,
554 snarked_ledger_hash: LedgerHash,
555 parent: &LedgerAddress,
556 ) -> Result<(LedgerHash, LedgerHash), String> {
557 self.ledger_manager()
558 .call_sync(LedgerRequest::ChildHashesGet {
559 snarked_ledger_hash,
560 parent: parent.clone(),
561 })
562 .map_err(|_| "child_hashes_get responder dropped".to_owned())
563 .and_then(|res| {
564 if let LedgerResponse::ChildHashes(Some(res)) = res {
565 Ok(res)
566 } else {
567 Err(format_response_error("child_hashes_get", res))
568 }
569 })
570 }
571
572 fn accounts_set(
573 &self,
574 snarked_ledger_hash: LedgerHash,
575 parent: &LedgerAddress,
576 accounts: Vec<MinaBaseAccountBinableArgStableV2>,
577 ) -> Result<LedgerHash, String> {
578 self.ledger_manager()
579 .call_sync(LedgerRequest::AccountsSet {
580 snarked_ledger_hash,
581 parent: parent.clone(),
582 accounts,
583 })
584 .map_err(|_| "accounts_set responder dropped".to_owned())
585 .and_then(|res| {
586 if let LedgerResponse::AccountsSet(res) = res {
587 res
588 } else {
589 Err(format_response_error("accounts_set", res))
590 }
591 })
592 }
593}