1use std::{
2 collections::{BTreeMap, BTreeSet},
3 ffi::c_void,
4 time::Duration,
5};
6
7use super::{super::rpc, RpcEffectfulAction};
8use crate::{
9 block_producer::BlockProducerWonSlot,
10 external_snark_worker::available_job_to_snark_worker_spec,
11 p2p::connection::P2pConnectionResponse,
12 p2p_ready,
13 rpc::{
14 AccountQuery, AccountSlim, ActionStatsQuery, ActionStatsResponse, CurrentMessageProgress,
15 MessagesStats, NodeHeartbeat, ProducedBlockInfo, RootLedgerSyncProgress,
16 RootStagedLedgerSyncProgress, RpcAction, RpcBlockProducerStats, RpcMessageProgressResponse,
17 RpcNodeStatus, RpcNodeStatusLedger, RpcNodeStatusNetworkInfo, RpcNodeStatusResources,
18 RpcNodeStatusTransactionPool, RpcNodeStatusTransitionFrontier,
19 RpcNodeStatusTransitionFrontierBlockSummary, RpcNodeStatusTransitionFrontierSync,
20 RpcRequestExtraData, RpcScanStateSummary, RpcScanStateSummaryBlock,
21 RpcScanStateSummaryBlockTransaction, RpcScanStateSummaryBlockTransactionKind,
22 RpcScanStateSummaryScanStateJob, RpcSnarkPoolJobFull, RpcSnarkPoolJobSnarkWork,
23 RpcSnarkPoolJobSummary, RpcSnarkerJobCommitResponse, RpcSnarkerJobSpecResponse,
24 RpcTransactionInjectResponse, TransactionStatus,
25 },
26 snark_pool::SnarkPoolAction,
27 transition_frontier::sync::{
28 ledger::TransitionFrontierSyncLedgerState, TransitionFrontierSyncState,
29 },
30 Service, Store,
31};
32use ledger::{
33 scan_state::currency::{Balance, Magnitude},
34 Account,
35};
36use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
37use mina_p2p_messages::{rpc_kernel::QueryHeader, v2};
38use mina_signer::CompressedPubKey;
39use openmina_core::{block::ArcBlockWithHash, bug_condition};
40use openmina_node_account::AccountPublicKey;
41use p2p::channels::streaming_rpc::{
42 staged_ledger_parts::calc_total_pieces_to_transfer, P2pStreamingRpcReceiveProgress,
43};
44use redux::ActionWithMeta;
45
/// Evaluates a fallible "respond" expression and, if it yields an error, emits a
/// warning stamped with the given time instead of propagating the failure.
///
/// * first argument: the expression producing the `Result` of the respond call;
/// * second argument: the timestamp handed to the logger.
macro_rules! respond_or_log {
    ($response:expr, $time:expr) => {
        match $response {
            Ok(_) => {}
            Err(err) => {
                openmina_core::log::warn!($time; "Failed to respond: {err}");
            }
        }
    };
}
53
54pub fn rpc_effects<S: Service>(store: &mut Store<S>, action: ActionWithMeta<RpcEffectfulAction>) {
55 let (action, meta) = action.split();
56
57 match action {
58 RpcEffectfulAction::GlobalStateGet { rpc_id, filter } => {
59 let _ = store
60 .service
61 .respond_state_get(rpc_id, (store.state.get(), filter.as_deref()));
62 }
63 RpcEffectfulAction::StatusGet { rpc_id } => {
64 let status = compute_node_status(store);
65 let _ = store.service.respond_status_get(rpc_id, Some(status));
66 }
67 RpcEffectfulAction::HeartbeatGet { rpc_id } => {
68 let status = compute_node_status(store);
69 let last_produced_block = store
70 .service
71 .stats()
72 .and_then(|stats| stats.block_producer().last_produced_block.take());
73
74 let last_produced_block_info = match make_produced_block_info(last_produced_block) {
75 Ok(data) => data,
76 Err(error) => {
77 bug_condition!(
78 "HeartbeatGet: Failed to encode block header, returning None: {error}"
79 );
80 None
81 }
82 };
83
84 let heartbeat = NodeHeartbeat {
85 status: status.into(),
86 node_timestamp: meta.time(),
87 peer_id: store.state().p2p.my_id(),
88 last_produced_block_info,
89 };
90 let response = store
91 .service()
92 .with_producer_keypair(move |sk| heartbeat.sign(sk));
93
94 let _ = store.service.respond_heartbeat_get(rpc_id, response);
95 }
96 RpcEffectfulAction::ActionStatsGet { rpc_id, query } => match query {
97 ActionStatsQuery::SinceStart => {
98 let resp = store
99 .service
100 .stats()
101 .map(|s| s.collect_action_stats_since_start())
102 .map(|stats| ActionStatsResponse::SinceStart { stats });
103 let _ = store.service.respond_action_stats_get(rpc_id, resp);
104 }
105 ActionStatsQuery::ForLatestBlock => {
106 let resp = store
107 .service
108 .stats()
109 .and_then(|s| s.collect_action_stats_for_block_with_id(None))
110 .map(ActionStatsResponse::ForBlock);
111 let _ = store.service.respond_action_stats_get(rpc_id, resp);
112 }
113 ActionStatsQuery::ForBlockWithId(id) => {
114 let resp = store
115 .service
116 .stats()
117 .and_then(|s| s.collect_action_stats_for_block_with_id(Some(id)))
118 .map(ActionStatsResponse::ForBlock);
119 let _ = store.service.respond_action_stats_get(rpc_id, resp);
120 }
121 },
122 RpcEffectfulAction::SyncStatsGet { rpc_id, query } => {
123 let resp = store
124 .service
125 .stats()
126 .map(|s| s.collect_sync_stats(query.limit));
127 let _ = store.service.respond_sync_stats_get(rpc_id, resp);
128 }
129 RpcEffectfulAction::BlockProducerStatsGet { rpc_id } => {
130 let mut create_response = || {
131 let state = store.state.get();
132 let best_tip = state.transition_frontier.best_tip()?;
133 let public_key = state.block_producer.config()?.pub_key.clone();
134 let won_slots = &state.block_producer.vrf_evaluator()?.won_slots;
135
136 let stats = store.service.stats()?;
137 let attempts = stats.block_producer().collect_attempts();
138 let future_slot = attempts.last().map_or(0, |v| {
139 v.won_slot.global_slot.checked_add(1).expect("overflow")
140 });
141
142 let cur_global_slot = state.cur_global_slot();
143 let current_epoch = state.current_epoch();
144 let slots_per_epoch = best_tip.constants().slots_per_epoch.as_u32();
145 let epoch_start = cur_global_slot.map(|slot| {
146 (slot.checked_div(slots_per_epoch).expect("division by 0"))
147 .checked_mul(slots_per_epoch)
148 .expect("overflow")
149 });
150
151 let current_epoch_vrf_stats = current_epoch
152 .and_then(|epoch| stats.block_producer().vrf_evaluator.get(&epoch).cloned());
153 let vrf_stats = stats.block_producer().vrf_evaluator.clone();
154
155 Some(RpcBlockProducerStats {
156 current_time: meta.time(),
157 current_global_slot: cur_global_slot,
158 current_epoch,
159 current_epoch_vrf_stats,
160 vrf_stats,
161 epoch_start,
162 epoch_end: epoch_start
163 .map(|slot| slot.checked_add(slots_per_epoch).expect("overflow")),
164 public_key: public_key.into(),
165 attempts,
166 future_won_slots: won_slots
167 .range(future_slot..)
168 .map(|(_, won_slot)| {
169 let won_slot = BlockProducerWonSlot::from_vrf_won_slot(
170 won_slot,
171 best_tip.genesis_timestamp(),
172 );
173 (&won_slot).into()
174 })
175 .collect(),
176 })
177 };
178 let response = create_response();
179 let _ = store
180 .service
181 .respond_block_producer_stats_get(rpc_id, response);
182 }
183 RpcEffectfulAction::MessageProgressGet { rpc_id } => {
184 let p2p = p2p_ready!(store.state().p2p, meta.time());
186 let messages_stats = p2p
187 .network
188 .scheduler
189 .rpc_outgoing_streams
190 .iter()
191 .filter_map(|(peer_id, streams)| {
192 let (_, rpc_state) = streams.first_key_value()?;
193 let QueryHeader { tag: name, .. } = rpc_state.pending.clone()?;
194 let name = name.to_string();
195 let buffer = &rpc_state.buffer;
196 let current_request = if buffer.len() < 8 {
197 None
198 } else {
199 let received_bytes = buffer.len().saturating_sub(8);
200 let total_bytes = u64::from_le_bytes(
201 buffer
202 .get(..8)
203 .expect("slice with incorrect length")
204 .try_into()
205 .expect("cannot fail checked above"),
206 ) as usize;
207 Some(CurrentMessageProgress {
208 name,
209 received_bytes,
210 total_bytes,
211 })
212 };
213
214 Some((
215 *peer_id,
216 MessagesStats {
217 current_request,
218 responses: rpc_state
219 .total_stats
220 .iter()
221 .map(|((name, _), count)| (name.to_string(), *count))
222 .collect(),
223 },
224 ))
225 })
226 .collect();
227
228 let mut response = RpcMessageProgressResponse {
229 messages_stats,
230 staking_ledger_sync: None,
231 next_epoch_ledger_sync: None,
232 root_ledger_sync: None,
233 };
234
235 match &store.state().transition_frontier.sync {
236 TransitionFrontierSyncState::StakingLedgerPending(state) => {
237 if let TransitionFrontierSyncLedgerState::Snarked(state) = &state.ledger {
238 response.staking_ledger_sync = state.estimation()
239 }
240 }
241 TransitionFrontierSyncState::NextEpochLedgerPending(state) => {
242 if let TransitionFrontierSyncLedgerState::Snarked(state) = &state.ledger {
243 response.next_epoch_ledger_sync = state.estimation()
244 }
245 }
246 TransitionFrontierSyncState::RootLedgerPending(state) => match &state.ledger {
247 TransitionFrontierSyncLedgerState::Snarked(state) => {
248 response.root_ledger_sync =
249 state.estimation().map(|data| RootLedgerSyncProgress {
250 fetched: data.fetched,
251 estimation: data.estimation,
252 staged: None,
253 });
254 }
255 TransitionFrontierSyncLedgerState::Staged(state) => {
256 let unknown_staged_progress = || RootStagedLedgerSyncProgress {
257 fetched: 0,
258 total: 1,
259 };
260 let staged = match state.fetch_attempts() {
261 None => state.target_with_parts().map(|(_, parts)| {
262 let v = parts
263 .map(|parts| calc_total_pieces_to_transfer(parts))
264 .unwrap_or(0);
265 RootStagedLedgerSyncProgress {
266 fetched: v,
267 total: v,
268 }
269 }),
270 Some(attempts) => attempts
271 .iter()
272 .find(|(_, s)| s.fetch_pending_rpc_id().is_some())
273 .map(|(id, _)| id)
274 .and_then(|peer_id| store.state().p2p.get_ready_peer(peer_id))
275 .map(|peer| {
276 match peer.channels.streaming_rpc.pending_local_rpc_progress() {
277 None => unknown_staged_progress(),
278 Some(
279 P2pStreamingRpcReceiveProgress::StagedLedgerParts(
280 progress,
281 ),
282 ) => {
283 let (fetched, total) = progress.progress();
284 RootStagedLedgerSyncProgress { fetched, total }
285 }
286 }
287 }),
288 };
289
290 response.root_ledger_sync = Some(RootLedgerSyncProgress {
293 fetched: 1,
294 estimation: 1,
295 staged,
296 });
297 }
298 _ => {}
299 },
300 _ => {}
301 }
302
303 let _ = store
304 .service
305 .respond_message_progress_stats_get(rpc_id, response);
306 }
307 RpcEffectfulAction::PeersGet { rpc_id, peers } => {
308 respond_or_log!(
309 store.service().respond_peers_get(rpc_id, peers),
310 meta.time()
311 );
312 }
313 RpcEffectfulAction::P2pConnectionOutgoingError { rpc_id, error } => {
314 let _ = store
315 .service
316 .respond_p2p_connection_outgoing(rpc_id, Err(error));
317
318 store.dispatch(RpcAction::Finish { rpc_id });
319 }
320 RpcEffectfulAction::P2pConnectionOutgoingSuccess { rpc_id } => {
321 let _ = store
322 .service
323 .respond_p2p_connection_outgoing(rpc_id, Ok(()));
324 store.dispatch(RpcAction::Finish { rpc_id });
325 }
326 RpcEffectfulAction::P2pConnectionIncomingRespond { rpc_id, response } => {
327 let error = match &response {
328 P2pConnectionResponse::Accepted(_) => None,
329 P2pConnectionResponse::Rejected(reason) => Some(format!("Rejected({:?})", reason)),
330 P2pConnectionResponse::SignalDecryptionFailed => {
331 Some("RemoteSignalDecryptionFailed".to_owned())
332 }
333 P2pConnectionResponse::InternalError => Some("RemoteInternalError".to_owned()),
334 };
335 let _ = store
336 .service
337 .respond_p2p_connection_incoming_answer(rpc_id, response);
338 if let Some(error) = error {
339 store.dispatch(RpcAction::P2pConnectionIncomingError { rpc_id, error });
340 }
341 }
342 RpcEffectfulAction::P2pConnectionIncomingError { rpc_id, error } => {
343 let _ = store
344 .service
345 .respond_p2p_connection_incoming(rpc_id, Err(error));
346 store.dispatch(RpcAction::Finish { rpc_id });
347 }
348 RpcEffectfulAction::P2pConnectionIncomingSuccess { rpc_id } => {
349 let _ = store
350 .service
351 .respond_p2p_connection_incoming(rpc_id, Ok(()));
352 store.dispatch(RpcAction::Finish { rpc_id });
353 }
354 RpcEffectfulAction::ScanStateSummaryGetSuccess {
355 rpc_id,
356 mut scan_state,
357 } => {
358 let req = store.state().rpc.requests.get(&rpc_id);
359 let Some(block) = req.and_then(|req| match &req.data {
360 RpcRequestExtraData::FullBlockOpt(opt) => opt.as_ref(),
361 _ => None,
362 }) else {
363 let _ = store.service.respond_scan_state_summary_get(
364 rpc_id,
365 Err("target block not found".to_string()),
366 );
367 return;
368 };
369 let coinbases =
370 block
371 .coinbase_fee_transfers_iter()
372 .map(|_| RpcScanStateSummaryBlockTransaction {
373 hash: None,
374 kind: RpcScanStateSummaryBlockTransactionKind::Coinbase,
375 status: v2::MinaBaseTransactionStatusStableV2::Applied,
376 });
377 let block_summary = RpcScanStateSummaryBlock {
378 hash: block.hash().clone(),
379 height: block.height(),
380 global_slot: block.global_slot_since_genesis(),
381 transactions: block
382 .commands_iter()
383 .map(|tx| RpcScanStateSummaryBlockTransaction {
384 hash: tx.data.hash().ok(),
385 kind: (&tx.data).into(),
386 status: tx.status.clone(),
387 })
388 .chain(coinbases)
389 .collect(),
390 completed_works: block
391 .completed_works_iter()
392 .map(|work| (&work.proofs).into())
393 .collect(),
394 };
395
396 let snark_pool = &store.state().snark_pool;
397 scan_state.iter_mut().flatten().flatten().for_each(|job| {
398 if let RpcScanStateSummaryScanStateJob::Todo {
399 job_id,
400 bundle_job_id,
401 job: kind,
402 seq_no,
403 } = job
404 {
405 let Some(data) = snark_pool.get(bundle_job_id) else {
406 return;
407 };
408 let commitment = data.commitment.clone().map(Box::new);
409 let snark = data
410 .snark
411 .as_ref()
412 .map(|snark| RpcSnarkPoolJobSnarkWork {
413 snarker: snark.work.snarker.clone(),
414 fee: snark.work.fee.clone(),
415 received_t: snark.received_t,
416 sender: snark.sender,
417 })
418 .map(Box::new);
419
420 if commitment.is_none() && snark.is_none() {
421 return;
422 }
423 *job = RpcScanStateSummaryScanStateJob::Pending {
424 job_id: job_id.clone(),
425 bundle_job_id: bundle_job_id.clone(),
426 job: Box::new(kind.clone()),
427 seq_no: *seq_no,
428 commitment,
429 snark,
430 };
431 }
432 });
433 let res = scan_state.map(|scan_state| RpcScanStateSummary {
434 block: block_summary,
435 scan_state,
436 });
437 let _ = store.service.respond_scan_state_summary_get(rpc_id, res);
438 }
439 RpcEffectfulAction::SnarkPoolAvailableJobsGet { rpc_id } => {
440 let resp = store
441 .state()
442 .snark_pool
443 .range(..)
444 .map(|(_, job)| RpcSnarkPoolJobSummary {
445 time: job.time,
446 id: job.id.clone(),
447 commitment: job.commitment.clone(),
448 snark: job.snark.as_ref().map(|snark| RpcSnarkPoolJobSnarkWork {
449 snarker: snark.work.snarker.clone(),
450 fee: snark.work.fee.clone(),
451 received_t: snark.received_t,
452 sender: snark.sender,
453 }),
454 })
455 .collect::<Vec<_>>();
456 let _ = store.service().respond_snark_pool_get(rpc_id, resp);
457 }
458 RpcEffectfulAction::SnarkPoolJobGet { job_id, rpc_id } => {
459 let resp = store.state().snark_pool.range(..).find_map(|(_, job)| {
460 if job.id == job_id {
461 Some(RpcSnarkPoolJobFull {
462 time: job.time,
463 id: job.id.clone(),
464 job: job.job.clone(),
465 commitment: job.commitment.clone(),
466 snark: job.snark.as_ref().map(|snark| RpcSnarkPoolJobSnarkWork {
467 snarker: snark.work.snarker.clone(),
468 fee: snark.work.fee.clone(),
469 received_t: snark.received_t,
470 sender: snark.sender,
471 }),
472 })
473 } else {
474 None
475 }
476 });
477 let _ = store.service().respond_snark_pool_job_get(rpc_id, resp);
478 }
479 RpcEffectfulAction::SnarkPoolCompletedJobsGet { rpc_id, jobs } => {
480 respond_or_log!(
481 store
482 .service()
483 .respond_snark_pool_completed_jobs_get(rpc_id, jobs),
484 meta.time()
485 );
486 }
487 RpcEffectfulAction::SnarkPoolPendingJobsGet { rpc_id, jobs } => {
488 respond_or_log!(
489 store
490 .service()
491 .respond_snark_pool_pending_jobs_get(rpc_id, jobs),
492 meta.time()
493 );
494 }
495 RpcEffectfulAction::SnarkerConfigGet { rpc_id, config } => {
496 let _ = store.service().respond_snarker_config_get(rpc_id, config);
497 }
498 RpcEffectfulAction::SnarkerJobCommit { rpc_id, job_id } => {
499 if !store.state().snark_pool.should_create_commitment(&job_id) {
500 let _ = store
501 .service()
502 .respond_snarker_job_commit(rpc_id, RpcSnarkerJobCommitResponse::JobNotFound);
503 return;
505 }
506 if !store.state().external_snark_worker.has_idle() {
507 let _ = store
508 .service()
509 .respond_snarker_job_commit(rpc_id, RpcSnarkerJobCommitResponse::SnarkerBusy);
510 return;
511 }
512 if store
513 .service()
514 .respond_snarker_job_commit(rpc_id, RpcSnarkerJobCommitResponse::Ok)
515 .is_err()
516 {
517 return;
518 }
519 store.dispatch(SnarkPoolAction::CommitmentCreate { job_id });
520 }
521 RpcEffectfulAction::SnarkerJobSpec { rpc_id, job_id } => {
522 let Some(job) = store.state().snark_pool.get(&job_id) else {
523 if store
524 .service()
525 .respond_snarker_job_spec(rpc_id, RpcSnarkerJobSpecResponse::JobNotFound)
526 .is_err()
527 {
528 return;
529 }
530 return;
531 };
532 let input = available_job_to_snark_worker_spec(
533 job.job.clone(),
534 &store.state().transition_frontier,
535 );
536 let Some(config) = store.state.get().config.snarker.as_ref() else {
538 return;
539 };
540 let public_key = config.public_key.clone().into();
541 let fee = config.fee.clone();
542 let input = match input {
543 Ok(instances) => RpcSnarkerJobSpecResponse::Ok(
544 mina_p2p_messages::v2::SnarkWorkerWorkerRpcsVersionedGetWorkV2TResponse(Some((
545 mina_p2p_messages::v2::SnarkWorkerWorkerRpcsVersionedGetWorkV2TResponseA0 {
546 instances,
547 fee,
548 },
549 public_key,
550 )))
551 ),
552 Err(err) => RpcSnarkerJobSpecResponse::Err(err),
553 };
554
555 let _ = store.service().respond_snarker_job_spec(rpc_id, input);
557 }
558 RpcEffectfulAction::SnarkerWorkersGet {
559 rpc_id,
560 snark_worker,
561 } => {
562 let _ = store
564 .service()
565 .respond_snarker_workers(rpc_id, vec![snark_worker.into()]);
566 }
567 RpcEffectfulAction::HealthCheck { rpc_id, has_peers } => {
568 respond_or_log!(
569 store.service().respond_health_check(rpc_id, has_peers),
570 meta.time()
571 );
572 }
573 RpcEffectfulAction::ReadinessCheck { rpc_id } => {
574 const THRESH: Duration = Duration::from_secs(60 * 3 * 10);
575 let synced = match store.state().transition_frontier.sync {
576 TransitionFrontierSyncState::Synced { time }
577 if meta.time().checked_sub(time) <= Some(THRESH) =>
578 {
579 Ok(())
580 }
581 TransitionFrontierSyncState::Synced { time } => Err(format!(
582 "Synced {:?} ago, which is more than the threshold {:?}",
583 meta.time().checked_sub(time),
584 THRESH
585 )),
586 _ => Err("not synced".to_owned()),
587 };
588 respond_or_log!(
611 store.service().respond_readiness_check(rpc_id, synced),
612 meta.time()
613 );
614 }
615 RpcEffectfulAction::DiscoveryRoutingTable { rpc_id, response } => {
616 respond_or_log!(
617 store
618 .service()
619 .respond_discovery_routing_table(rpc_id, response),
620 meta.time()
621 );
622 }
623 RpcEffectfulAction::DiscoveryBoostrapStats { rpc_id, response } => {
624 respond_or_log!(
625 store
626 .service()
627 .respond_discovery_bootstrap_stats(rpc_id, response),
628 meta.time()
629 );
630 }
631 RpcEffectfulAction::TransactionPool { rpc_id, response } => {
632 respond_or_log!(
633 store.service().respond_transaction_pool(rpc_id, response),
634 meta.time()
635 )
636 }
637 RpcEffectfulAction::LedgerAccountsGetSuccess {
638 rpc_id,
639 accounts,
640 account_query,
641 } => {
642 match account_query {
645 AccountQuery::All => {
647 let mut accounts: BTreeMap<CompressedPubKey, Account> = accounts
648 .into_iter()
649 .map(|acc| (acc.public_key.clone(), acc))
650 .collect();
651 let nonces_and_amount = store
652 .state()
653 .transaction_pool
654 .get_pending_amount_and_nonce();
655
656 nonces_and_amount
657 .iter()
658 .for_each(|(account_id, (nonce, amount))| {
659 if let Some(account) = accounts.get_mut(&account_id.public_key) {
660 if let Some(nonce) = nonce {
661 if nonce >= &account.nonce {
662 account.nonce = nonce.incr();
664 }
665 }
666 account.balance = account
667 .balance
668 .sub_amount(*amount)
669 .unwrap_or(Balance::zero());
670 }
671 });
672
673 let accounts = accounts
674 .into_values()
675 .map(|v| v.into())
676 .collect::<Vec<AccountSlim>>();
677
678 respond_or_log!(
679 store
680 .service()
681 .respond_ledger_slim_accounts(rpc_id, accounts),
682 meta.time()
683 )
684 }
685 AccountQuery::SinglePublicKey(..) | AccountQuery::PubKeyWithTokenId(..) => {
687 respond_or_log!(
688 store.service().respond_ledger_accounts(rpc_id, accounts),
689 meta.time()
690 )
691 }
692 AccountQuery::MultipleIds(..) => {
693 respond_or_log!(
694 store.service().respond_ledger_accounts(rpc_id, accounts),
695 meta.time()
696 )
697 }
698 }
699 }
700 RpcEffectfulAction::TransactionInjectSuccess { rpc_id, response } => {
701 respond_or_log!(
702 store.service().respond_transaction_inject(
703 rpc_id,
704 RpcTransactionInjectResponse::Success(response)
705 ),
706 meta.time()
707 )
708 }
709 RpcEffectfulAction::TransactionInjectRejected { rpc_id, response } => {
710 respond_or_log!(
711 store.service().respond_transaction_inject(
712 rpc_id,
713 RpcTransactionInjectResponse::Rejected(response)
714 ),
715 meta.time()
716 )
717 }
718 RpcEffectfulAction::TransactionInjectFailure { rpc_id, errors } => {
719 respond_or_log!(
720 store.service().respond_transaction_inject(
721 rpc_id,
722 RpcTransactionInjectResponse::Failure(errors)
723 ),
724 meta.time()
725 )
726 }
727 RpcEffectfulAction::TransitionFrontierUserCommandsGet { rpc_id, commands } => {
728 respond_or_log!(
729 store
730 .service()
731 .respond_transition_frontier_commands(rpc_id, commands),
732 meta.time()
733 )
734 }
735 RpcEffectfulAction::BestChain { rpc_id, best_chain } => {
736 respond_or_log!(
737 store.service().respond_best_chain(rpc_id, best_chain),
738 meta.time()
739 )
740 }
741 RpcEffectfulAction::ConsensusConstantsGet { rpc_id, response } => {
742 respond_or_log!(
743 store
744 .service()
745 .respond_consensus_constants(rpc_id, response),
746 meta.time()
747 )
748 }
749 RpcEffectfulAction::TransactionStatusGet { rpc_id, tx } => {
750 let tx_hash = tx.hash().ok();
752
753 let in_tx_pool = store
754 .state()
755 .transaction_pool
756 .get_all_transactions()
757 .iter()
758 .any(|tx_with_hash| Some(&tx_with_hash.hash) == tx_hash.as_ref());
759
760 if in_tx_pool {
761 respond_or_log!(
762 store
763 .service()
764 .respond_transaction_status(rpc_id, TransactionStatus::Pending),
765 meta.time()
766 );
767 return;
768 }
769
770 let in_transition_frontier = if let Some(hash) = tx_hash {
771 store
772 .state()
773 .transition_frontier
774 .contains_transaction(&hash)
775 } else {
776 false
777 };
778
779 if in_transition_frontier {
781 respond_or_log!(
782 store
783 .service()
784 .respond_transaction_status(rpc_id, TransactionStatus::Included),
785 meta.time()
786 )
787 } else {
789 respond_or_log!(
790 store
791 .service()
792 .respond_transaction_status(rpc_id, TransactionStatus::Unknown),
793 meta.time()
794 )
795 }
796 }
797 RpcEffectfulAction::BlockGet { rpc_id, block } => {
798 respond_or_log!(
799 store.service().respond_block_get(rpc_id, block),
800 meta.time()
801 )
802 }
803
804 RpcEffectfulAction::PooledUserCommands {
805 rpc_id,
806 user_commands,
807 } => {
808 respond_or_log!(
809 store
810 .service()
811 .respond_pooled_user_commands(rpc_id, user_commands),
812 meta.time()
813 )
814 }
815
816 RpcEffectfulAction::PooledZkappCommands {
817 rpc_id,
818 zkapp_commands,
819 } => {
820 respond_or_log!(
821 store
822 .service()
823 .respond_pooled_zkapp_commands(rpc_id, zkapp_commands),
824 meta.time()
825 )
826 }
827 RpcEffectfulAction::GenesisBlock {
828 rpc_id,
829 genesis_block,
830 } => {
831 respond_or_log!(
832 store.service().respond_genesis_block(rpc_id, genesis_block),
833 meta.time()
834 )
835 }
836
837 RpcEffectfulAction::ConsensusTimeGet {
838 rpc_id,
839 consensus_time,
840 } => {
841 respond_or_log!(
842 store
843 .service()
844 .respond_consensus_time_get(rpc_id, consensus_time),
845 meta.time()
846 )
847 }
848 RpcEffectfulAction::LedgerStatusGetSuccess { rpc_id, response } => {
849 respond_or_log!(
850 store.service().respond_ledger_status_get(rpc_id, response),
851 meta.time()
852 )
853 }
854 RpcEffectfulAction::LedgerAccountDelegatorsGetSuccess { rpc_id, response } => {
855 respond_or_log!(
856 store
857 .service()
858 .respond_ledger_account_delegators_get(rpc_id, response),
859 meta.time()
860 )
861 }
862 }
863}
864
865fn compute_node_status<S: Service>(store: &mut Store<S>) -> RpcNodeStatus {
866 let state = store.state.get();
867 let chain_id = state.p2p.ready().map(|p2p| p2p.chain_id.to_hex());
868 let block_summary = |b: &ArcBlockWithHash| RpcNodeStatusTransitionFrontierBlockSummary {
869 hash: b.hash().clone(),
870 height: b.height(),
871 global_slot: b.global_slot(),
872 };
873
874 let block_production_attempts = store
875 .service
876 .stats()
877 .map_or_else(Vec::new, |stats| stats.block_producer().collect_attempts());
878
879 let current_block_production_attempt = block_production_attempts.last().cloned();
880
881 let previous_block_production_attempt = block_production_attempts
882 .len()
883 .checked_sub(2)
884 .and_then(|idx| block_production_attempts.get(idx))
885 .cloned();
886
887 let network_info = RpcNodeStatusNetworkInfo {
888 bind_ip: "0.0.0.0".to_string(),
889 external_ip: state
890 .p2p
891 .config()
892 .external_addrs
893 .first()
894 .map(|addr| addr.to_string()),
895 client_port: state.config.client_port,
896 libp2p_port: state.p2p.config().libp2p_port,
897 };
898
899 let block_producer = state
900 .block_producer
901 .config()
902 .map(|config| AccountPublicKey::from(config.pub_key.clone()));
903 let coinbase_receiver = state
904 .block_producer
905 .config()
906 .map(|config| AccountPublicKey::from(config.coinbase_receiver().clone()));
907
908 let status = RpcNodeStatus {
909 chain_id,
910 block_producer,
911 coinbase_receiver,
912 transition_frontier: RpcNodeStatusTransitionFrontier {
913 best_tip: state.transition_frontier.best_tip().map(block_summary),
914 sync: RpcNodeStatusTransitionFrontierSync {
915 time: state.transition_frontier.sync.time(),
916 status: state.transition_frontier.sync.to_string(),
917 phase: state.transition_frontier.sync.sync_phase().to_string(),
918 target: state.transition_frontier.sync.best_tip().map(block_summary),
919 },
920 },
921 ledger: RpcNodeStatusLedger {
922 alive_masks_after_last_commit: state.ledger.alive_masks,
923 pending_writes: state
924 .ledger
925 .write
926 .pending_requests()
927 .map(|(req, time)| (req.kind(), time))
928 .collect(),
929 pending_reads: state
930 .ledger
931 .read
932 .pending_requests()
933 .map(|(id, req, time)| (id, req.kind(), time))
934 .collect(),
935 },
936 peers: rpc::collect_rpc_peers_info(state),
937 snark_pool: state
938 .snark_pool
939 .jobs_iter()
940 .fold(Default::default(), |mut acc, job| {
941 if job.snark.is_some() {
942 acc.snarks = acc.snarks.saturating_add(1);
943 }
944 acc.total_jobs = acc.total_jobs.saturating_add(1);
945 acc
946 }),
947 transaction_pool: RpcNodeStatusTransactionPool {
948 transactions: state.transaction_pool.size(),
949 transactions_for_propagation: state.transaction_pool.for_propagation_size(),
950 transaction_candidates: state.transaction_pool.candidates.transactions_count(),
951 },
952 current_block_production_attempt,
953 previous_block_production_attempt,
954 resources_status: RpcNodeStatusResources {
955 p2p_malloc_size: {
956 let mut set = BTreeSet::new();
957 let fun = move |ptr: *const c_void| !set.insert(ptr.addr());
958 let mut ops = MallocSizeOfOps::new(None, Some(Box::new(fun)));
959 size_of_val(&state.p2p).saturating_add(state.p2p.size_of(&mut ops))
960 },
961 transition_frontier: state.transition_frontier.resources_usage(),
962 snark_pool: state.snark_pool.resources_usage(),
963 },
964 service_queues: store.service.queues(),
965 network_info,
966 };
967 status
968}
969
970fn make_produced_block_info(
971 block: Option<ArcBlockWithHash>,
972) -> std::io::Result<Option<ProducedBlockInfo>> {
973 use base64::{engine::general_purpose::URL_SAFE, Engine as _};
974 use mina_p2p_messages::binprot::BinProtWrite;
975
976 let Some(block) = block else { return Ok(None) };
977
978 let height = block.height();
979 let global_slot = block.global_slot();
980 let hash = block.hash().to_string();
981 let mut buf = Vec::with_capacity(5 * 1024 * 1024);
982 v2::MinaBlockHeaderStableV2::binprot_write(block.header(), &mut buf)?;
983
984 let base64_encoded_header = URL_SAFE.encode(&buf);
985
986 Ok(Some(ProducedBlockInfo {
987 height,
988 global_slot,
989 hash,
990 base64_encoded_header,
991 }))
992}