1use std::{
2 collections::{BTreeMap, BTreeSet},
3 ffi::c_void,
4 time::Duration,
5};
6
7use super::{super::rpc, RpcEffectfulAction};
8use crate::{
9 block_producer::BlockProducerWonSlot,
10 external_snark_worker::available_job_to_snark_worker_spec,
11 p2p::{
12 channels::streaming_rpc::{
13 staged_ledger_parts::calc_total_pieces_to_transfer, P2pStreamingRpcReceiveProgress,
14 },
15 connection::P2pConnectionResponse,
16 },
17 p2p_ready,
18 rpc::{
19 AccountQuery, AccountSlim, ActionStatsQuery, ActionStatsResponse, CurrentMessageProgress,
20 MessagesStats, NodeHeartbeat, ProducedBlockInfo, RootLedgerSyncProgress,
21 RootStagedLedgerSyncProgress, RpcAction, RpcBlockProducerStats, RpcMessageProgressResponse,
22 RpcNodeStatus, RpcNodeStatusLedger, RpcNodeStatusNetworkInfo, RpcNodeStatusResources,
23 RpcNodeStatusTransactionPool, RpcNodeStatusTransitionFrontier,
24 RpcNodeStatusTransitionFrontierBlockSummary, RpcNodeStatusTransitionFrontierSync,
25 RpcRequestExtraData, RpcScanStateSummary, RpcScanStateSummaryBlock,
26 RpcScanStateSummaryBlockTransaction, RpcScanStateSummaryBlockTransactionKind,
27 RpcScanStateSummaryScanStateJob, RpcSnarkPoolJobFull, RpcSnarkPoolJobSnarkWork,
28 RpcSnarkPoolJobSummary, RpcSnarkerJobCommitResponse, RpcSnarkerJobSpecResponse,
29 RpcTransactionInjectResponse, TransactionStatus,
30 },
31 snark_pool::SnarkPoolAction,
32 transition_frontier::sync::{
33 ledger::TransitionFrontierSyncLedgerState, TransitionFrontierSyncState,
34 },
35 Service, Store,
36};
37use ledger::{
38 scan_state::currency::{Balance, Magnitude},
39 Account,
40};
41use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
42use mina_core::{block::ArcBlockWithHash, bug_condition};
43use mina_node_account::AccountPublicKey;
44use mina_p2p_messages::{rpc_kernel::QueryHeader, v2};
45use mina_signer::CompressedPubKey;
46use redux::ActionWithMeta;
47
/// Evaluates a fallible "respond" expression and, when it yields `Err`,
/// emits a warning (tagged with the timestamp expression `$t`) instead of
/// propagating the failure.
macro_rules! respond_or_log {
    ($e:expr, $t:expr) => {
        match $e {
            Ok(_) => {}
            Err(err) => {
                mina_core::log::warn!($t; "Failed to respond: {err}");
            }
        }
    };
}
55
56pub fn rpc_effects<S: Service>(store: &mut Store<S>, action: ActionWithMeta<RpcEffectfulAction>) {
57 let (action, meta) = action.split();
58
59 match action {
60 RpcEffectfulAction::GlobalStateGet { rpc_id, filter } => {
61 let _ = store
62 .service
63 .respond_state_get(rpc_id, (store.state.get(), filter.as_deref()));
64 }
65 RpcEffectfulAction::StatusGet { rpc_id } => {
66 let status = compute_node_status(store);
67 let _ = store.service.respond_status_get(rpc_id, Some(status));
68 }
69 RpcEffectfulAction::HeartbeatGet { rpc_id } => {
70 let status = compute_node_status(store);
71 let last_produced_block = store
72 .service
73 .stats()
74 .and_then(|stats| stats.block_producer().last_produced_block.take());
75
76 let last_produced_block_info = match make_produced_block_info(last_produced_block) {
77 Ok(data) => data,
78 Err(error) => {
79 bug_condition!(
80 "HeartbeatGet: Failed to encode block header, returning None: {error}"
81 );
82 None
83 }
84 };
85
86 let heartbeat = NodeHeartbeat {
87 status: status.into(),
88 node_timestamp: meta.time(),
89 peer_id: store.state().p2p.my_id(),
90 last_produced_block_info,
91 };
92 let response = store
93 .service()
94 .with_producer_keypair(move |sk| heartbeat.sign(sk));
95
96 let _ = store.service.respond_heartbeat_get(rpc_id, response);
97 }
98 RpcEffectfulAction::ActionStatsGet { rpc_id, query } => match query {
99 ActionStatsQuery::SinceStart => {
100 let resp = store
101 .service
102 .stats()
103 .map(|s| s.collect_action_stats_since_start())
104 .map(|stats| ActionStatsResponse::SinceStart { stats });
105 let _ = store.service.respond_action_stats_get(rpc_id, resp);
106 }
107 ActionStatsQuery::ForLatestBlock => {
108 let resp = store
109 .service
110 .stats()
111 .and_then(|s| s.collect_action_stats_for_block_with_id(None))
112 .map(ActionStatsResponse::ForBlock);
113 let _ = store.service.respond_action_stats_get(rpc_id, resp);
114 }
115 ActionStatsQuery::ForBlockWithId(id) => {
116 let resp = store
117 .service
118 .stats()
119 .and_then(|s| s.collect_action_stats_for_block_with_id(Some(id)))
120 .map(ActionStatsResponse::ForBlock);
121 let _ = store.service.respond_action_stats_get(rpc_id, resp);
122 }
123 },
124 RpcEffectfulAction::SyncStatsGet { rpc_id, query } => {
125 let resp = store
126 .service
127 .stats()
128 .map(|s| s.collect_sync_stats(query.limit));
129 let _ = store.service.respond_sync_stats_get(rpc_id, resp);
130 }
131 RpcEffectfulAction::BlockProducerStatsGet { rpc_id } => {
132 let mut create_response = || {
133 let state = store.state.get();
134 let best_tip = state.transition_frontier.best_tip()?;
135 let public_key = state.block_producer.config()?.pub_key.clone();
136 let won_slots = &state.block_producer.vrf_evaluator()?.won_slots;
137
138 let stats = store.service.stats()?;
139 let attempts = stats.block_producer().collect_attempts();
140 let future_slot = attempts.last().map_or(0, |v| {
141 v.won_slot.global_slot.checked_add(1).expect("overflow")
142 });
143
144 let cur_global_slot = state.cur_global_slot();
145 let current_epoch = state.current_epoch();
146 let slots_per_epoch = best_tip.constants().slots_per_epoch.as_u32();
147 let epoch_start = cur_global_slot.map(|slot| {
148 (slot.checked_div(slots_per_epoch).expect("division by 0"))
149 .checked_mul(slots_per_epoch)
150 .expect("overflow")
151 });
152
153 let current_epoch_vrf_stats = current_epoch
154 .and_then(|epoch| stats.block_producer().vrf_evaluator.get(&epoch).cloned());
155 let vrf_stats = stats.block_producer().vrf_evaluator.clone();
156
157 Some(RpcBlockProducerStats {
158 current_time: meta.time(),
159 current_global_slot: cur_global_slot,
160 current_epoch,
161 current_epoch_vrf_stats,
162 vrf_stats,
163 epoch_start,
164 epoch_end: epoch_start
165 .map(|slot| slot.checked_add(slots_per_epoch).expect("overflow")),
166 public_key: public_key.into(),
167 attempts,
168 future_won_slots: won_slots
169 .range(future_slot..)
170 .map(|(_, won_slot)| {
171 let won_slot = BlockProducerWonSlot::from_vrf_won_slot(
172 won_slot,
173 best_tip.genesis_timestamp(),
174 );
175 (&won_slot).into()
176 })
177 .collect(),
178 })
179 };
180 let response = create_response();
181 let _ = store
182 .service
183 .respond_block_producer_stats_get(rpc_id, response);
184 }
185 RpcEffectfulAction::MessageProgressGet { rpc_id } => {
186 let p2p = p2p_ready!(store.state().p2p, meta.time());
188 let messages_stats = p2p
189 .network
190 .scheduler
191 .rpc_outgoing_streams
192 .iter()
193 .filter_map(|(peer_id, streams)| {
194 let (_, rpc_state) = streams.first_key_value()?;
195 let QueryHeader { tag: name, .. } = rpc_state.pending.clone()?;
196 let name = name.to_string();
197 let buffer = &rpc_state.buffer;
198 let current_request = if buffer.len() < 8 {
199 None
200 } else {
201 let received_bytes = buffer.len().saturating_sub(8);
202 let total_bytes = u64::from_le_bytes(
203 buffer
204 .get(..8)
205 .expect("slice with incorrect length")
206 .try_into()
207 .expect("cannot fail checked above"),
208 ) as usize;
209 Some(CurrentMessageProgress {
210 name,
211 received_bytes,
212 total_bytes,
213 })
214 };
215
216 Some((
217 *peer_id,
218 MessagesStats {
219 current_request,
220 responses: rpc_state
221 .total_stats
222 .iter()
223 .map(|((name, _), count)| (name.to_string(), *count))
224 .collect(),
225 },
226 ))
227 })
228 .collect();
229
230 let mut response = RpcMessageProgressResponse {
231 messages_stats,
232 staking_ledger_sync: None,
233 next_epoch_ledger_sync: None,
234 root_ledger_sync: None,
235 };
236
237 match &store.state().transition_frontier.sync {
238 TransitionFrontierSyncState::StakingLedgerPending(state) => {
239 if let TransitionFrontierSyncLedgerState::Snarked(state) = &state.ledger {
240 response.staking_ledger_sync = state.estimation()
241 }
242 }
243 TransitionFrontierSyncState::NextEpochLedgerPending(state) => {
244 if let TransitionFrontierSyncLedgerState::Snarked(state) = &state.ledger {
245 response.next_epoch_ledger_sync = state.estimation()
246 }
247 }
248 TransitionFrontierSyncState::RootLedgerPending(state) => match &state.ledger {
249 TransitionFrontierSyncLedgerState::Snarked(state) => {
250 response.root_ledger_sync =
251 state.estimation().map(|data| RootLedgerSyncProgress {
252 fetched: data.fetched,
253 estimation: data.estimation,
254 staged: None,
255 });
256 }
257 TransitionFrontierSyncLedgerState::Staged(state) => {
258 let unknown_staged_progress = || RootStagedLedgerSyncProgress {
259 fetched: 0,
260 total: 1,
261 };
262 let staged = match state.fetch_attempts() {
263 None => state.target_with_parts().map(|(_, parts)| {
264 let v = parts
265 .map(|parts| calc_total_pieces_to_transfer(parts))
266 .unwrap_or(0);
267 RootStagedLedgerSyncProgress {
268 fetched: v,
269 total: v,
270 }
271 }),
272 Some(attempts) => attempts
273 .iter()
274 .find(|(_, s)| s.fetch_pending_rpc_id().is_some())
275 .map(|(id, _)| id)
276 .and_then(|peer_id| store.state().p2p.get_ready_peer(peer_id))
277 .map(|peer| {
278 match peer.channels.streaming_rpc.pending_local_rpc_progress() {
279 None => unknown_staged_progress(),
280 Some(
281 P2pStreamingRpcReceiveProgress::StagedLedgerParts(
282 progress,
283 ),
284 ) => {
285 let (fetched, total) = progress.progress();
286 RootStagedLedgerSyncProgress { fetched, total }
287 }
288 }
289 }),
290 };
291
292 response.root_ledger_sync = Some(RootLedgerSyncProgress {
295 fetched: 1,
296 estimation: 1,
297 staged,
298 });
299 }
300 _ => {}
301 },
302 _ => {}
303 }
304
305 let _ = store
306 .service
307 .respond_message_progress_stats_get(rpc_id, response);
308 }
309 RpcEffectfulAction::PeersGet { rpc_id, peers } => {
310 respond_or_log!(
311 store.service().respond_peers_get(rpc_id, peers),
312 meta.time()
313 );
314 }
315 RpcEffectfulAction::P2pConnectionOutgoingError { rpc_id, error } => {
316 let _ = store
317 .service
318 .respond_p2p_connection_outgoing(rpc_id, Err(error));
319
320 store.dispatch(RpcAction::Finish { rpc_id });
321 }
322 RpcEffectfulAction::P2pConnectionOutgoingSuccess { rpc_id } => {
323 let _ = store
324 .service
325 .respond_p2p_connection_outgoing(rpc_id, Ok(()));
326 store.dispatch(RpcAction::Finish { rpc_id });
327 }
328 RpcEffectfulAction::P2pConnectionIncomingRespond { rpc_id, response } => {
329 let error = match &response {
330 P2pConnectionResponse::Accepted(_) => None,
331 P2pConnectionResponse::Rejected(reason) => Some(format!("Rejected({:?})", reason)),
332 P2pConnectionResponse::SignalDecryptionFailed => {
333 Some("RemoteSignalDecryptionFailed".to_owned())
334 }
335 P2pConnectionResponse::InternalError => Some("RemoteInternalError".to_owned()),
336 };
337 let _ = store
338 .service
339 .respond_p2p_connection_incoming_answer(rpc_id, response);
340 if let Some(error) = error {
341 store.dispatch(RpcAction::P2pConnectionIncomingError { rpc_id, error });
342 }
343 }
344 RpcEffectfulAction::P2pConnectionIncomingError { rpc_id, error } => {
345 let _ = store
346 .service
347 .respond_p2p_connection_incoming(rpc_id, Err(error));
348 store.dispatch(RpcAction::Finish { rpc_id });
349 }
350 RpcEffectfulAction::P2pConnectionIncomingSuccess { rpc_id } => {
351 let _ = store
352 .service
353 .respond_p2p_connection_incoming(rpc_id, Ok(()));
354 store.dispatch(RpcAction::Finish { rpc_id });
355 }
356 RpcEffectfulAction::ScanStateSummaryGetSuccess {
357 rpc_id,
358 mut scan_state,
359 } => {
360 let req = store.state().rpc.requests.get(&rpc_id);
361 let Some(block) = req.and_then(|req| match &req.data {
362 RpcRequestExtraData::FullBlockOpt(opt) => opt.as_ref(),
363 _ => None,
364 }) else {
365 let _ = store.service.respond_scan_state_summary_get(
366 rpc_id,
367 Err("target block not found".to_string()),
368 );
369 return;
370 };
371 let coinbases =
372 block
373 .coinbase_fee_transfers_iter()
374 .map(|_| RpcScanStateSummaryBlockTransaction {
375 hash: None,
376 kind: RpcScanStateSummaryBlockTransactionKind::Coinbase,
377 status: v2::MinaBaseTransactionStatusStableV2::Applied,
378 });
379 let block_summary = RpcScanStateSummaryBlock {
380 hash: block.hash().clone(),
381 height: block.height(),
382 global_slot: block.global_slot_since_genesis(),
383 transactions: block
384 .commands_iter()
385 .map(|tx| RpcScanStateSummaryBlockTransaction {
386 hash: tx.data.hash().ok(),
387 kind: (&tx.data).into(),
388 status: tx.status.clone(),
389 })
390 .chain(coinbases)
391 .collect(),
392 completed_works: block
393 .completed_works_iter()
394 .map(|work| (&work.proofs).into())
395 .collect(),
396 };
397
398 let snark_pool = &store.state().snark_pool;
399 scan_state.iter_mut().flatten().flatten().for_each(|job| {
400 if let RpcScanStateSummaryScanStateJob::Todo {
401 job_id,
402 bundle_job_id,
403 job: kind,
404 seq_no,
405 } = job
406 {
407 let Some(data) = snark_pool.get(bundle_job_id) else {
408 return;
409 };
410 let commitment = data.commitment.clone().map(Box::new);
411 let snark = data
412 .snark
413 .as_ref()
414 .map(|snark| RpcSnarkPoolJobSnarkWork {
415 snarker: snark.work.snarker.clone(),
416 fee: snark.work.fee.clone(),
417 received_t: snark.received_t,
418 sender: snark.sender,
419 })
420 .map(Box::new);
421
422 if commitment.is_none() && snark.is_none() {
423 return;
424 }
425 *job = RpcScanStateSummaryScanStateJob::Pending {
426 job_id: job_id.clone(),
427 bundle_job_id: bundle_job_id.clone(),
428 job: Box::new(kind.clone()),
429 seq_no: *seq_no,
430 commitment,
431 snark,
432 };
433 }
434 });
435 let res = scan_state.map(|scan_state| RpcScanStateSummary {
436 block: block_summary,
437 scan_state,
438 });
439 let _ = store.service.respond_scan_state_summary_get(rpc_id, res);
440 }
441 RpcEffectfulAction::SnarkPoolAvailableJobsGet { rpc_id } => {
442 let resp = store
443 .state()
444 .snark_pool
445 .range(..)
446 .map(|(_, job)| RpcSnarkPoolJobSummary {
447 time: job.time,
448 id: job.id.clone(),
449 commitment: job.commitment.clone(),
450 snark: job.snark.as_ref().map(|snark| RpcSnarkPoolJobSnarkWork {
451 snarker: snark.work.snarker.clone(),
452 fee: snark.work.fee.clone(),
453 received_t: snark.received_t,
454 sender: snark.sender,
455 }),
456 })
457 .collect::<Vec<_>>();
458 let _ = store.service().respond_snark_pool_get(rpc_id, resp);
459 }
460 RpcEffectfulAction::SnarkPoolJobGet { job_id, rpc_id } => {
461 let resp = store.state().snark_pool.range(..).find_map(|(_, job)| {
462 if job.id == job_id {
463 Some(RpcSnarkPoolJobFull {
464 time: job.time,
465 id: job.id.clone(),
466 job: job.job.clone(),
467 commitment: job.commitment.clone(),
468 snark: job.snark.as_ref().map(|snark| RpcSnarkPoolJobSnarkWork {
469 snarker: snark.work.snarker.clone(),
470 fee: snark.work.fee.clone(),
471 received_t: snark.received_t,
472 sender: snark.sender,
473 }),
474 })
475 } else {
476 None
477 }
478 });
479 let _ = store.service().respond_snark_pool_job_get(rpc_id, resp);
480 }
481 RpcEffectfulAction::SnarkPoolCompletedJobsGet { rpc_id, jobs } => {
482 respond_or_log!(
483 store
484 .service()
485 .respond_snark_pool_completed_jobs_get(rpc_id, jobs),
486 meta.time()
487 );
488 }
489 RpcEffectfulAction::SnarkPoolPendingJobsGet { rpc_id, jobs } => {
490 respond_or_log!(
491 store
492 .service()
493 .respond_snark_pool_pending_jobs_get(rpc_id, jobs),
494 meta.time()
495 );
496 }
497 RpcEffectfulAction::SnarkerConfigGet { rpc_id, config } => {
498 let _ = store.service().respond_snarker_config_get(rpc_id, config);
499 }
500 RpcEffectfulAction::SnarkerJobCommit { rpc_id, job_id } => {
501 if !store.state().snark_pool.should_create_commitment(&job_id) {
502 let _ = store
503 .service()
504 .respond_snarker_job_commit(rpc_id, RpcSnarkerJobCommitResponse::JobNotFound);
505 return;
507 }
508 if !store.state().external_snark_worker.has_idle() {
509 let _ = store
510 .service()
511 .respond_snarker_job_commit(rpc_id, RpcSnarkerJobCommitResponse::SnarkerBusy);
512 return;
513 }
514 if store
515 .service()
516 .respond_snarker_job_commit(rpc_id, RpcSnarkerJobCommitResponse::Ok)
517 .is_err()
518 {
519 return;
520 }
521 store.dispatch(SnarkPoolAction::CommitmentCreate { job_id });
522 }
523 RpcEffectfulAction::SnarkerJobSpec { rpc_id, job_id } => {
524 let Some(job) = store.state().snark_pool.get(&job_id) else {
525 if store
526 .service()
527 .respond_snarker_job_spec(rpc_id, RpcSnarkerJobSpecResponse::JobNotFound)
528 .is_err()
529 {
530 return;
531 }
532 return;
533 };
534 let input = available_job_to_snark_worker_spec(
535 job.job.clone(),
536 &store.state().transition_frontier,
537 );
538 let Some(config) = store.state.get().config.snarker.as_ref() else {
540 return;
541 };
542 let public_key = config.public_key.clone().into();
543 let fee = config.fee.clone();
544 let input = match input {
545 Ok(instances) => RpcSnarkerJobSpecResponse::Ok(
546 Box::new(mina_p2p_messages::v2::SnarkWorkerWorkerRpcsVersionedGetWorkV2TResponse(Some((
547 mina_p2p_messages::v2::SnarkWorkerWorkerRpcsVersionedGetWorkV2TResponseA0 {
548 instances,
549 fee,
550 },
551 public_key,
552 )))
553 )),
554 Err(err) => RpcSnarkerJobSpecResponse::Err(err),
555 };
556
557 let _ = store.service().respond_snarker_job_spec(rpc_id, input);
559 }
560 RpcEffectfulAction::SnarkerWorkersGet {
561 rpc_id,
562 snark_worker,
563 } => {
564 let _ = store
566 .service()
567 .respond_snarker_workers(rpc_id, vec![snark_worker.into()]);
568 }
569 RpcEffectfulAction::HealthCheck { rpc_id, has_peers } => {
570 respond_or_log!(
571 store.service().respond_health_check(rpc_id, has_peers),
572 meta.time()
573 );
574 }
575 RpcEffectfulAction::ReadinessCheck { rpc_id } => {
576 const THRESH: Duration = Duration::from_secs(60 * 3 * 10);
577 let synced = match store.state().transition_frontier.sync {
578 TransitionFrontierSyncState::Synced { time }
579 if meta.time().checked_sub(time) <= Some(THRESH) =>
580 {
581 Ok(())
582 }
583 TransitionFrontierSyncState::Synced { time } => Err(format!(
584 "Synced {:?} ago, which is more than the threshold {:?}",
585 meta.time().checked_sub(time),
586 THRESH
587 )),
588 _ => Err("not synced".to_owned()),
589 };
590 respond_or_log!(
613 store.service().respond_readiness_check(rpc_id, synced),
614 meta.time()
615 );
616 }
617 RpcEffectfulAction::DiscoveryRoutingTable { rpc_id, response } => {
618 respond_or_log!(
619 store
620 .service()
621 .respond_discovery_routing_table(rpc_id, response),
622 meta.time()
623 );
624 }
625 RpcEffectfulAction::DiscoveryBoostrapStats { rpc_id, response } => {
626 respond_or_log!(
627 store
628 .service()
629 .respond_discovery_bootstrap_stats(rpc_id, response),
630 meta.time()
631 );
632 }
633 RpcEffectfulAction::TransactionPool { rpc_id, response } => {
634 respond_or_log!(
635 store.service().respond_transaction_pool(rpc_id, response),
636 meta.time()
637 )
638 }
639 RpcEffectfulAction::LedgerAccountsGetSuccess {
640 rpc_id,
641 accounts,
642 account_query,
643 } => {
644 match account_query {
647 AccountQuery::All => {
649 let mut accounts: BTreeMap<CompressedPubKey, Account> = accounts
650 .into_iter()
651 .map(|acc| (acc.public_key.clone(), acc))
652 .collect();
653 let nonces_and_amount = store
654 .state()
655 .transaction_pool
656 .get_pending_amount_and_nonce();
657
658 nonces_and_amount
659 .iter()
660 .for_each(|(account_id, (nonce, amount))| {
661 if let Some(account) = accounts.get_mut(&account_id.public_key) {
662 if let Some(nonce) = nonce {
663 if nonce >= &account.nonce {
664 account.nonce = nonce.incr();
666 }
667 }
668 account.balance = account
669 .balance
670 .sub_amount(*amount)
671 .unwrap_or(Balance::zero());
672 }
673 });
674
675 let accounts = accounts
676 .into_values()
677 .map(|v| v.into())
678 .collect::<Vec<AccountSlim>>();
679
680 respond_or_log!(
681 store
682 .service()
683 .respond_ledger_slim_accounts(rpc_id, accounts),
684 meta.time()
685 )
686 }
687 AccountQuery::SinglePublicKey(..) | AccountQuery::PubKeyWithTokenId(..) => {
689 respond_or_log!(
690 store.service().respond_ledger_accounts(rpc_id, accounts),
691 meta.time()
692 )
693 }
694 AccountQuery::MultipleIds(..) => {
695 respond_or_log!(
696 store.service().respond_ledger_accounts(rpc_id, accounts),
697 meta.time()
698 )
699 }
700 }
701 }
702 RpcEffectfulAction::TransactionInjectSuccess { rpc_id, response } => {
703 respond_or_log!(
704 store.service().respond_transaction_inject(
705 rpc_id,
706 RpcTransactionInjectResponse::Success(response)
707 ),
708 meta.time()
709 )
710 }
711 RpcEffectfulAction::TransactionInjectRejected { rpc_id, response } => {
712 respond_or_log!(
713 store.service().respond_transaction_inject(
714 rpc_id,
715 RpcTransactionInjectResponse::Rejected(response)
716 ),
717 meta.time()
718 )
719 }
720 RpcEffectfulAction::TransactionInjectFailure { rpc_id, errors } => {
721 respond_or_log!(
722 store.service().respond_transaction_inject(
723 rpc_id,
724 RpcTransactionInjectResponse::Failure(errors)
725 ),
726 meta.time()
727 )
728 }
729 RpcEffectfulAction::TransitionFrontierUserCommandsGet { rpc_id, commands } => {
730 respond_or_log!(
731 store
732 .service()
733 .respond_transition_frontier_commands(rpc_id, commands),
734 meta.time()
735 )
736 }
737 RpcEffectfulAction::BestChain { rpc_id, best_chain } => {
738 respond_or_log!(
739 store.service().respond_best_chain(rpc_id, best_chain),
740 meta.time()
741 )
742 }
743 RpcEffectfulAction::ConsensusConstantsGet { rpc_id, response } => {
744 respond_or_log!(
745 store
746 .service()
747 .respond_consensus_constants(rpc_id, response),
748 meta.time()
749 )
750 }
751 RpcEffectfulAction::TransactionStatusGet { rpc_id, tx } => {
752 let tx_hash = tx.hash().ok();
754
755 let in_tx_pool = store
756 .state()
757 .transaction_pool
758 .get_all_transactions()
759 .iter()
760 .any(|tx_with_hash| Some(&tx_with_hash.hash) == tx_hash.as_ref());
761
762 if in_tx_pool {
763 respond_or_log!(
764 store
765 .service()
766 .respond_transaction_status(rpc_id, TransactionStatus::Pending),
767 meta.time()
768 );
769 return;
770 }
771
772 let in_transition_frontier = if let Some(hash) = tx_hash {
773 store
774 .state()
775 .transition_frontier
776 .contains_transaction(&hash)
777 } else {
778 false
779 };
780
781 if in_transition_frontier {
783 respond_or_log!(
784 store
785 .service()
786 .respond_transaction_status(rpc_id, TransactionStatus::Included),
787 meta.time()
788 )
789 } else {
791 respond_or_log!(
792 store
793 .service()
794 .respond_transaction_status(rpc_id, TransactionStatus::Unknown),
795 meta.time()
796 )
797 }
798 }
799 RpcEffectfulAction::BlockGet { rpc_id, block } => {
800 respond_or_log!(
801 store.service().respond_block_get(rpc_id, block),
802 meta.time()
803 )
804 }
805
806 RpcEffectfulAction::PooledUserCommands {
807 rpc_id,
808 user_commands,
809 } => {
810 respond_or_log!(
811 store
812 .service()
813 .respond_pooled_user_commands(rpc_id, user_commands),
814 meta.time()
815 )
816 }
817
818 RpcEffectfulAction::PooledZkappCommands {
819 rpc_id,
820 zkapp_commands,
821 } => {
822 respond_or_log!(
823 store
824 .service()
825 .respond_pooled_zkapp_commands(rpc_id, zkapp_commands),
826 meta.time()
827 )
828 }
829 RpcEffectfulAction::GenesisBlock {
830 rpc_id,
831 genesis_block,
832 } => {
833 respond_or_log!(
834 store.service().respond_genesis_block(rpc_id, genesis_block),
835 meta.time()
836 )
837 }
838
839 RpcEffectfulAction::ConsensusTimeGet {
840 rpc_id,
841 consensus_time,
842 } => {
843 respond_or_log!(
844 store
845 .service()
846 .respond_consensus_time_get(rpc_id, consensus_time),
847 meta.time()
848 )
849 }
850 RpcEffectfulAction::LedgerStatusGetSuccess { rpc_id, response } => {
851 respond_or_log!(
852 store.service().respond_ledger_status_get(rpc_id, response),
853 meta.time()
854 )
855 }
856 RpcEffectfulAction::LedgerAccountDelegatorsGetSuccess { rpc_id, response } => {
857 respond_or_log!(
858 store
859 .service()
860 .respond_ledger_account_delegators_get(rpc_id, response),
861 meta.time()
862 )
863 }
864 }
865}
866
867fn compute_node_status<S: Service>(store: &mut Store<S>) -> RpcNodeStatus {
868 let state = store.state.get();
869 let chain_id = state.p2p.ready().map(|p2p| p2p.chain_id.to_hex());
870 let block_summary = |b: &ArcBlockWithHash| RpcNodeStatusTransitionFrontierBlockSummary {
871 hash: b.hash().clone(),
872 height: b.height(),
873 global_slot: b.global_slot(),
874 };
875
876 let block_production_attempts = store
877 .service
878 .stats()
879 .map_or_else(Vec::new, |stats| stats.block_producer().collect_attempts());
880
881 let current_block_production_attempt = block_production_attempts.last().cloned();
882
883 let previous_block_production_attempt = block_production_attempts
884 .len()
885 .checked_sub(2)
886 .and_then(|idx| block_production_attempts.get(idx))
887 .cloned();
888
889 let network_info = RpcNodeStatusNetworkInfo {
890 bind_ip: "0.0.0.0".to_string(),
891 external_ip: state
892 .p2p
893 .config()
894 .external_addrs
895 .first()
896 .map(|addr| addr.to_string()),
897 client_port: state.config.client_port,
898 libp2p_port: state.p2p.config().libp2p_port,
899 };
900
901 let block_producer = state
902 .block_producer
903 .config()
904 .map(|config| AccountPublicKey::from(config.pub_key.clone()));
905 let coinbase_receiver = state
906 .block_producer
907 .config()
908 .map(|config| AccountPublicKey::from(config.coinbase_receiver().clone()));
909
910 let status = RpcNodeStatus {
911 chain_id,
912 block_producer,
913 coinbase_receiver,
914 transition_frontier: RpcNodeStatusTransitionFrontier {
915 best_tip: state.transition_frontier.best_tip().map(block_summary),
916 sync: RpcNodeStatusTransitionFrontierSync {
917 time: state.transition_frontier.sync.time(),
918 status: state.transition_frontier.sync.to_string(),
919 phase: state.transition_frontier.sync.sync_phase().to_string(),
920 target: state.transition_frontier.sync.best_tip().map(block_summary),
921 },
922 },
923 ledger: RpcNodeStatusLedger {
924 alive_masks_after_last_commit: state.ledger.alive_masks,
925 pending_writes: state
926 .ledger
927 .write
928 .pending_requests()
929 .map(|(req, time)| (req.kind(), time))
930 .collect(),
931 pending_reads: state
932 .ledger
933 .read
934 .pending_requests()
935 .map(|(id, req, time)| (id, req.kind(), time))
936 .collect(),
937 },
938 peers: rpc::collect_rpc_peers_info(state),
939 snark_pool: state
940 .snark_pool
941 .jobs_iter()
942 .fold(Default::default(), |mut acc, job| {
943 if job.snark.is_some() {
944 acc.snarks = acc.snarks.saturating_add(1);
945 }
946 acc.total_jobs = acc.total_jobs.saturating_add(1);
947 acc
948 }),
949 transaction_pool: RpcNodeStatusTransactionPool {
950 transactions: state.transaction_pool.size(),
951 transactions_for_propagation: state.transaction_pool.for_propagation_size(),
952 transaction_candidates: state.transaction_pool.candidates.transactions_count(),
953 },
954 current_block_production_attempt,
955 previous_block_production_attempt,
956 resources_status: RpcNodeStatusResources {
957 p2p_malloc_size: {
958 let mut set = BTreeSet::new();
959 let fun = move |ptr: *const c_void| !set.insert(ptr.addr());
960 let mut ops = MallocSizeOfOps::new(None, Some(Box::new(fun)));
961 size_of_val(&state.p2p).saturating_add(state.p2p.size_of(&mut ops))
962 },
963 transition_frontier: state.transition_frontier.resources_usage(),
964 snark_pool: state.snark_pool.resources_usage(),
965 },
966 service_queues: store.service.queues(),
967 network_info,
968 };
969 status
970}
971
972fn make_produced_block_info(
973 block: Option<ArcBlockWithHash>,
974) -> std::io::Result<Option<ProducedBlockInfo>> {
975 use base64::{engine::general_purpose::URL_SAFE, Engine as _};
976 use mina_p2p_messages::binprot::BinProtWrite;
977
978 let Some(block) = block else { return Ok(None) };
979
980 let height = block.height();
981 let global_slot = block.global_slot();
982 let hash = block.hash().to_string();
983 let mut buf = Vec::with_capacity(5 * 1024 * 1024);
984 v2::MinaBlockHeaderStableV2::binprot_write(block.header(), &mut buf)?;
985
986 let base64_encoded_header = URL_SAFE.encode(&buf);
987
988 Ok(Some(ProducedBlockInfo {
989 height,
990 global_slot,
991 hash,
992 base64_encoded_header,
993 }))
994}