mina_node/rpc_effectful/
rpc_effectful_effects.rs

1use std::{
2    collections::{BTreeMap, BTreeSet},
3    ffi::c_void,
4    time::Duration,
5};
6
7use super::{super::rpc, RpcEffectfulAction};
8use crate::{
9    block_producer::BlockProducerWonSlot,
10    external_snark_worker::available_job_to_snark_worker_spec,
11    p2p::{
12        channels::streaming_rpc::{
13            staged_ledger_parts::calc_total_pieces_to_transfer, P2pStreamingRpcReceiveProgress,
14        },
15        connection::P2pConnectionResponse,
16    },
17    p2p_ready,
18    rpc::{
19        AccountQuery, AccountSlim, ActionStatsQuery, ActionStatsResponse, CurrentMessageProgress,
20        MessagesStats, NodeHeartbeat, ProducedBlockInfo, RootLedgerSyncProgress,
21        RootStagedLedgerSyncProgress, RpcAction, RpcBlockProducerStats, RpcMessageProgressResponse,
22        RpcNodeStatus, RpcNodeStatusLedger, RpcNodeStatusNetworkInfo, RpcNodeStatusResources,
23        RpcNodeStatusTransactionPool, RpcNodeStatusTransitionFrontier,
24        RpcNodeStatusTransitionFrontierBlockSummary, RpcNodeStatusTransitionFrontierSync,
25        RpcRequestExtraData, RpcScanStateSummary, RpcScanStateSummaryBlock,
26        RpcScanStateSummaryBlockTransaction, RpcScanStateSummaryBlockTransactionKind,
27        RpcScanStateSummaryScanStateJob, RpcSnarkPoolJobFull, RpcSnarkPoolJobSnarkWork,
28        RpcSnarkPoolJobSummary, RpcSnarkerJobCommitResponse, RpcSnarkerJobSpecResponse,
29        RpcTransactionInjectResponse, TransactionStatus,
30    },
31    snark_pool::SnarkPoolAction,
32    transition_frontier::sync::{
33        ledger::TransitionFrontierSyncLedgerState, TransitionFrontierSyncState,
34    },
35    Service, Store,
36};
37use ledger::{
38    scan_state::currency::{Balance, Magnitude},
39    Account,
40};
41use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
42use mina_core::{block::ArcBlockWithHash, bug_condition};
43use mina_node_account::AccountPublicKey;
44use mina_p2p_messages::{rpc_kernel::QueryHeader, v2};
45use mina_signer::CompressedPubKey;
46use redux::ActionWithMeta;
47
/// Evaluates a fallible "respond" expression and, when it yields `Err`,
/// reports the error through `mina_core::log::warn!` instead of propagating it.
///
/// * `$response` - expression producing a `Result`; an `Ok` payload is discarded.
/// * `$time` - value placed before the `;` in the `warn!` invocation
///   (the call sites in this file pass `meta.time()`).
macro_rules! respond_or_log {
    ($response:expr, $time:expr) => {
        match $response {
            Err(err) => {
                mina_core::log::warn!($time; "Failed to respond: {err}");
            }
            // Successful responses need no further handling.
            Ok(_) => {}
        }
    };
}
55
56pub fn rpc_effects<S: Service>(store: &mut Store<S>, action: ActionWithMeta<RpcEffectfulAction>) {
57    let (action, meta) = action.split();
58
59    match action {
60        RpcEffectfulAction::GlobalStateGet { rpc_id, filter } => {
61            let _ = store
62                .service
63                .respond_state_get(rpc_id, (store.state.get(), filter.as_deref()));
64        }
65        RpcEffectfulAction::StatusGet { rpc_id } => {
66            let status = compute_node_status(store);
67            let _ = store.service.respond_status_get(rpc_id, Some(status));
68        }
69        RpcEffectfulAction::HeartbeatGet { rpc_id } => {
70            let status = compute_node_status(store);
71            let last_produced_block = store
72                .service
73                .stats()
74                .and_then(|stats| stats.block_producer().last_produced_block.take());
75
76            let last_produced_block_info = match make_produced_block_info(last_produced_block) {
77                Ok(data) => data,
78                Err(error) => {
79                    bug_condition!(
80                        "HeartbeatGet: Failed to encode block header, returning None: {error}"
81                    );
82                    None
83                }
84            };
85
86            let heartbeat = NodeHeartbeat {
87                status: status.into(),
88                node_timestamp: meta.time(),
89                peer_id: store.state().p2p.my_id(),
90                last_produced_block_info,
91            };
92            let response = store
93                .service()
94                .with_producer_keypair(move |sk| heartbeat.sign(sk));
95
96            let _ = store.service.respond_heartbeat_get(rpc_id, response);
97        }
98        RpcEffectfulAction::ActionStatsGet { rpc_id, query } => match query {
99            ActionStatsQuery::SinceStart => {
100                let resp = store
101                    .service
102                    .stats()
103                    .map(|s| s.collect_action_stats_since_start())
104                    .map(|stats| ActionStatsResponse::SinceStart { stats });
105                let _ = store.service.respond_action_stats_get(rpc_id, resp);
106            }
107            ActionStatsQuery::ForLatestBlock => {
108                let resp = store
109                    .service
110                    .stats()
111                    .and_then(|s| s.collect_action_stats_for_block_with_id(None))
112                    .map(ActionStatsResponse::ForBlock);
113                let _ = store.service.respond_action_stats_get(rpc_id, resp);
114            }
115            ActionStatsQuery::ForBlockWithId(id) => {
116                let resp = store
117                    .service
118                    .stats()
119                    .and_then(|s| s.collect_action_stats_for_block_with_id(Some(id)))
120                    .map(ActionStatsResponse::ForBlock);
121                let _ = store.service.respond_action_stats_get(rpc_id, resp);
122            }
123        },
124        RpcEffectfulAction::SyncStatsGet { rpc_id, query } => {
125            let resp = store
126                .service
127                .stats()
128                .map(|s| s.collect_sync_stats(query.limit));
129            let _ = store.service.respond_sync_stats_get(rpc_id, resp);
130        }
131        RpcEffectfulAction::BlockProducerStatsGet { rpc_id } => {
132            let mut create_response = || {
133                let state = store.state.get();
134                let best_tip = state.transition_frontier.best_tip()?;
135                let public_key = state.block_producer.config()?.pub_key.clone();
136                let won_slots = &state.block_producer.vrf_evaluator()?.won_slots;
137
138                let stats = store.service.stats()?;
139                let attempts = stats.block_producer().collect_attempts();
140                let future_slot = attempts.last().map_or(0, |v| {
141                    v.won_slot.global_slot.checked_add(1).expect("overflow")
142                });
143
144                let cur_global_slot = state.cur_global_slot();
145                let current_epoch = state.current_epoch();
146                let slots_per_epoch = best_tip.constants().slots_per_epoch.as_u32();
147                let epoch_start = cur_global_slot.map(|slot| {
148                    (slot.checked_div(slots_per_epoch).expect("division by 0"))
149                        .checked_mul(slots_per_epoch)
150                        .expect("overflow")
151                });
152
153                let current_epoch_vrf_stats = current_epoch
154                    .and_then(|epoch| stats.block_producer().vrf_evaluator.get(&epoch).cloned());
155                let vrf_stats = stats.block_producer().vrf_evaluator.clone();
156
157                Some(RpcBlockProducerStats {
158                    current_time: meta.time(),
159                    current_global_slot: cur_global_slot,
160                    current_epoch,
161                    current_epoch_vrf_stats,
162                    vrf_stats,
163                    epoch_start,
164                    epoch_end: epoch_start
165                        .map(|slot| slot.checked_add(slots_per_epoch).expect("overflow")),
166                    public_key: public_key.into(),
167                    attempts,
168                    future_won_slots: won_slots
169                        .range(future_slot..)
170                        .map(|(_, won_slot)| {
171                            let won_slot = BlockProducerWonSlot::from_vrf_won_slot(
172                                won_slot,
173                                best_tip.genesis_timestamp(),
174                            );
175                            (&won_slot).into()
176                        })
177                        .collect(),
178                })
179            };
180            let response = create_response();
181            let _ = store
182                .service
183                .respond_block_producer_stats_get(rpc_id, response);
184        }
185        RpcEffectfulAction::MessageProgressGet { rpc_id } => {
186            // TODO: move to stats
187            let p2p = p2p_ready!(store.state().p2p, meta.time());
188            let messages_stats = p2p
189                .network
190                .scheduler
191                .rpc_outgoing_streams
192                .iter()
193                .filter_map(|(peer_id, streams)| {
194                    let (_, rpc_state) = streams.first_key_value()?;
195                    let QueryHeader { tag: name, .. } = rpc_state.pending.clone()?;
196                    let name = name.to_string();
197                    let buffer = &rpc_state.buffer;
198                    let current_request = if buffer.len() < 8 {
199                        None
200                    } else {
201                        let received_bytes = buffer.len().saturating_sub(8);
202                        let total_bytes = u64::from_le_bytes(
203                            buffer
204                                .get(..8)
205                                .expect("slice with incorrect length")
206                                .try_into()
207                                .expect("cannot fail checked above"),
208                        ) as usize;
209                        Some(CurrentMessageProgress {
210                            name,
211                            received_bytes,
212                            total_bytes,
213                        })
214                    };
215
216                    Some((
217                        *peer_id,
218                        MessagesStats {
219                            current_request,
220                            responses: rpc_state
221                                .total_stats
222                                .iter()
223                                .map(|((name, _), count)| (name.to_string(), *count))
224                                .collect(),
225                        },
226                    ))
227                })
228                .collect();
229
230            let mut response = RpcMessageProgressResponse {
231                messages_stats,
232                staking_ledger_sync: None,
233                next_epoch_ledger_sync: None,
234                root_ledger_sync: None,
235            };
236
237            match &store.state().transition_frontier.sync {
238                TransitionFrontierSyncState::StakingLedgerPending(state) => {
239                    if let TransitionFrontierSyncLedgerState::Snarked(state) = &state.ledger {
240                        response.staking_ledger_sync = state.estimation()
241                    }
242                }
243                TransitionFrontierSyncState::NextEpochLedgerPending(state) => {
244                    if let TransitionFrontierSyncLedgerState::Snarked(state) = &state.ledger {
245                        response.next_epoch_ledger_sync = state.estimation()
246                    }
247                }
248                TransitionFrontierSyncState::RootLedgerPending(state) => match &state.ledger {
249                    TransitionFrontierSyncLedgerState::Snarked(state) => {
250                        response.root_ledger_sync =
251                            state.estimation().map(|data| RootLedgerSyncProgress {
252                                fetched: data.fetched,
253                                estimation: data.estimation,
254                                staged: None,
255                            });
256                    }
257                    TransitionFrontierSyncLedgerState::Staged(state) => {
258                        let unknown_staged_progress = || RootStagedLedgerSyncProgress {
259                            fetched: 0,
260                            total: 1,
261                        };
262                        let staged = match state.fetch_attempts() {
263                            None => state.target_with_parts().map(|(_, parts)| {
264                                let v = parts
265                                    .map(|parts| calc_total_pieces_to_transfer(parts))
266                                    .unwrap_or(0);
267                                RootStagedLedgerSyncProgress {
268                                    fetched: v,
269                                    total: v,
270                                }
271                            }),
272                            Some(attempts) => attempts
273                                .iter()
274                                .find(|(_, s)| s.fetch_pending_rpc_id().is_some())
275                                .map(|(id, _)| id)
276                                .and_then(|peer_id| store.state().p2p.get_ready_peer(peer_id))
277                                .map(|peer| {
278                                    match peer.channels.streaming_rpc.pending_local_rpc_progress() {
279                                        None => unknown_staged_progress(),
280                                        Some(
281                                            P2pStreamingRpcReceiveProgress::StagedLedgerParts(
282                                                progress,
283                                            ),
284                                        ) => {
285                                            let (fetched, total) = progress.progress();
286                                            RootStagedLedgerSyncProgress { fetched, total }
287                                        }
288                                    }
289                                }),
290                        };
291
292                        // We want to answer with a result that will serve as a 100% complete process for the
293                        // frontend while it is still waiting for the staged ledger to complete. Could be cleaner.
294                        response.root_ledger_sync = Some(RootLedgerSyncProgress {
295                            fetched: 1,
296                            estimation: 1,
297                            staged,
298                        });
299                    }
300                    _ => {}
301                },
302                _ => {}
303            }
304
305            let _ = store
306                .service
307                .respond_message_progress_stats_get(rpc_id, response);
308        }
309        RpcEffectfulAction::PeersGet { rpc_id, peers } => {
310            respond_or_log!(
311                store.service().respond_peers_get(rpc_id, peers),
312                meta.time()
313            );
314        }
315        RpcEffectfulAction::P2pConnectionOutgoingError { rpc_id, error } => {
316            let _ = store
317                .service
318                .respond_p2p_connection_outgoing(rpc_id, Err(error));
319
320            store.dispatch(RpcAction::Finish { rpc_id });
321        }
322        RpcEffectfulAction::P2pConnectionOutgoingSuccess { rpc_id } => {
323            let _ = store
324                .service
325                .respond_p2p_connection_outgoing(rpc_id, Ok(()));
326            store.dispatch(RpcAction::Finish { rpc_id });
327        }
328        RpcEffectfulAction::P2pConnectionIncomingRespond { rpc_id, response } => {
329            let error = match &response {
330                P2pConnectionResponse::Accepted(_) => None,
331                P2pConnectionResponse::Rejected(reason) => Some(format!("Rejected({:?})", reason)),
332                P2pConnectionResponse::SignalDecryptionFailed => {
333                    Some("RemoteSignalDecryptionFailed".to_owned())
334                }
335                P2pConnectionResponse::InternalError => Some("RemoteInternalError".to_owned()),
336            };
337            let _ = store
338                .service
339                .respond_p2p_connection_incoming_answer(rpc_id, response);
340            if let Some(error) = error {
341                store.dispatch(RpcAction::P2pConnectionIncomingError { rpc_id, error });
342            }
343        }
344        RpcEffectfulAction::P2pConnectionIncomingError { rpc_id, error } => {
345            let _ = store
346                .service
347                .respond_p2p_connection_incoming(rpc_id, Err(error));
348            store.dispatch(RpcAction::Finish { rpc_id });
349        }
350        RpcEffectfulAction::P2pConnectionIncomingSuccess { rpc_id } => {
351            let _ = store
352                .service
353                .respond_p2p_connection_incoming(rpc_id, Ok(()));
354            store.dispatch(RpcAction::Finish { rpc_id });
355        }
356        RpcEffectfulAction::ScanStateSummaryGetSuccess {
357            rpc_id,
358            mut scan_state,
359        } => {
360            let req = store.state().rpc.requests.get(&rpc_id);
361            let Some(block) = req.and_then(|req| match &req.data {
362                RpcRequestExtraData::FullBlockOpt(opt) => opt.as_ref(),
363                _ => None,
364            }) else {
365                let _ = store.service.respond_scan_state_summary_get(
366                    rpc_id,
367                    Err("target block not found".to_string()),
368                );
369                return;
370            };
371            let coinbases =
372                block
373                    .coinbase_fee_transfers_iter()
374                    .map(|_| RpcScanStateSummaryBlockTransaction {
375                        hash: None,
376                        kind: RpcScanStateSummaryBlockTransactionKind::Coinbase,
377                        status: v2::MinaBaseTransactionStatusStableV2::Applied,
378                    });
379            let block_summary = RpcScanStateSummaryBlock {
380                hash: block.hash().clone(),
381                height: block.height(),
382                global_slot: block.global_slot_since_genesis(),
383                transactions: block
384                    .commands_iter()
385                    .map(|tx| RpcScanStateSummaryBlockTransaction {
386                        hash: tx.data.hash().ok(),
387                        kind: (&tx.data).into(),
388                        status: tx.status.clone(),
389                    })
390                    .chain(coinbases)
391                    .collect(),
392                completed_works: block
393                    .completed_works_iter()
394                    .map(|work| (&work.proofs).into())
395                    .collect(),
396            };
397
398            let snark_pool = &store.state().snark_pool;
399            scan_state.iter_mut().flatten().flatten().for_each(|job| {
400                if let RpcScanStateSummaryScanStateJob::Todo {
401                    job_id,
402                    bundle_job_id,
403                    job: kind,
404                    seq_no,
405                } = job
406                {
407                    let Some(data) = snark_pool.get(bundle_job_id) else {
408                        return;
409                    };
410                    let commitment = data.commitment.clone().map(Box::new);
411                    let snark = data
412                        .snark
413                        .as_ref()
414                        .map(|snark| RpcSnarkPoolJobSnarkWork {
415                            snarker: snark.work.snarker.clone(),
416                            fee: snark.work.fee.clone(),
417                            received_t: snark.received_t,
418                            sender: snark.sender,
419                        })
420                        .map(Box::new);
421
422                    if commitment.is_none() && snark.is_none() {
423                        return;
424                    }
425                    *job = RpcScanStateSummaryScanStateJob::Pending {
426                        job_id: job_id.clone(),
427                        bundle_job_id: bundle_job_id.clone(),
428                        job: Box::new(kind.clone()),
429                        seq_no: *seq_no,
430                        commitment,
431                        snark,
432                    };
433                }
434            });
435            let res = scan_state.map(|scan_state| RpcScanStateSummary {
436                block: block_summary,
437                scan_state,
438            });
439            let _ = store.service.respond_scan_state_summary_get(rpc_id, res);
440        }
441        RpcEffectfulAction::SnarkPoolAvailableJobsGet { rpc_id } => {
442            let resp = store
443                .state()
444                .snark_pool
445                .range(..)
446                .map(|(_, job)| RpcSnarkPoolJobSummary {
447                    time: job.time,
448                    id: job.id.clone(),
449                    commitment: job.commitment.clone(),
450                    snark: job.snark.as_ref().map(|snark| RpcSnarkPoolJobSnarkWork {
451                        snarker: snark.work.snarker.clone(),
452                        fee: snark.work.fee.clone(),
453                        received_t: snark.received_t,
454                        sender: snark.sender,
455                    }),
456                })
457                .collect::<Vec<_>>();
458            let _ = store.service().respond_snark_pool_get(rpc_id, resp);
459        }
460        RpcEffectfulAction::SnarkPoolJobGet { job_id, rpc_id } => {
461            let resp = store.state().snark_pool.range(..).find_map(|(_, job)| {
462                if job.id == job_id {
463                    Some(RpcSnarkPoolJobFull {
464                        time: job.time,
465                        id: job.id.clone(),
466                        job: job.job.clone(),
467                        commitment: job.commitment.clone(),
468                        snark: job.snark.as_ref().map(|snark| RpcSnarkPoolJobSnarkWork {
469                            snarker: snark.work.snarker.clone(),
470                            fee: snark.work.fee.clone(),
471                            received_t: snark.received_t,
472                            sender: snark.sender,
473                        }),
474                    })
475                } else {
476                    None
477                }
478            });
479            let _ = store.service().respond_snark_pool_job_get(rpc_id, resp);
480        }
481        RpcEffectfulAction::SnarkPoolCompletedJobsGet { rpc_id, jobs } => {
482            respond_or_log!(
483                store
484                    .service()
485                    .respond_snark_pool_completed_jobs_get(rpc_id, jobs),
486                meta.time()
487            );
488        }
489        RpcEffectfulAction::SnarkPoolPendingJobsGet { rpc_id, jobs } => {
490            respond_or_log!(
491                store
492                    .service()
493                    .respond_snark_pool_pending_jobs_get(rpc_id, jobs),
494                meta.time()
495            );
496        }
497        RpcEffectfulAction::SnarkerConfigGet { rpc_id, config } => {
498            let _ = store.service().respond_snarker_config_get(rpc_id, config);
499        }
500        RpcEffectfulAction::SnarkerJobCommit { rpc_id, job_id } => {
501            if !store.state().snark_pool.should_create_commitment(&job_id) {
502                let _ = store
503                    .service()
504                    .respond_snarker_job_commit(rpc_id, RpcSnarkerJobCommitResponse::JobNotFound);
505                // TODO(binier): differentiate between job not found and job already taken.
506                return;
507            }
508            if !store.state().external_snark_worker.has_idle() {
509                let _ = store
510                    .service()
511                    .respond_snarker_job_commit(rpc_id, RpcSnarkerJobCommitResponse::SnarkerBusy);
512                return;
513            }
514            if store
515                .service()
516                .respond_snarker_job_commit(rpc_id, RpcSnarkerJobCommitResponse::Ok)
517                .is_err()
518            {
519                return;
520            }
521            store.dispatch(SnarkPoolAction::CommitmentCreate { job_id });
522        }
523        RpcEffectfulAction::SnarkerJobSpec { rpc_id, job_id } => {
524            let Some(job) = store.state().snark_pool.get(&job_id) else {
525                if store
526                    .service()
527                    .respond_snarker_job_spec(rpc_id, RpcSnarkerJobSpecResponse::JobNotFound)
528                    .is_err()
529                {
530                    return;
531                }
532                return;
533            };
534            let input = available_job_to_snark_worker_spec(
535                job.job.clone(),
536                &store.state().transition_frontier,
537            );
538            // TODO(binier): maybe don't require snarker to be enabled here.
539            let Some(config) = store.state.get().config.snarker.as_ref() else {
540                return;
541            };
542            let public_key = config.public_key.clone().into();
543            let fee = config.fee.clone();
544            let input = match input {
545                    Ok(instances) => RpcSnarkerJobSpecResponse::Ok(
546                        Box::new(mina_p2p_messages::v2::SnarkWorkerWorkerRpcsVersionedGetWorkV2TResponse(Some((
547                            mina_p2p_messages::v2::SnarkWorkerWorkerRpcsVersionedGetWorkV2TResponseA0 {
548                                instances,
549                                fee,
550                            },
551                            public_key,
552                        )))
553                    )),
554                    Err(err) => RpcSnarkerJobSpecResponse::Err(err),
555                };
556
557            // TODO: handle potential errors
558            let _ = store.service().respond_snarker_job_spec(rpc_id, input);
559        }
560        RpcEffectfulAction::SnarkerWorkersGet {
561            rpc_id,
562            snark_worker,
563        } => {
564            // TODO: handle potential errors
565            let _ = store
566                .service()
567                .respond_snarker_workers(rpc_id, vec![snark_worker.into()]);
568        }
569        RpcEffectfulAction::HealthCheck { rpc_id, has_peers } => {
570            respond_or_log!(
571                store.service().respond_health_check(rpc_id, has_peers),
572                meta.time()
573            );
574        }
575        RpcEffectfulAction::ReadinessCheck { rpc_id } => {
576            const THRESH: Duration = Duration::from_secs(60 * 3 * 10);
577            let synced = match store.state().transition_frontier.sync {
578                TransitionFrontierSyncState::Synced { time }
579                    if meta.time().checked_sub(time) <= Some(THRESH) =>
580                {
581                    Ok(())
582                }
583                TransitionFrontierSyncState::Synced { time } => Err(format!(
584                    "Synced {:?} ago, which is more than the threshold {:?}",
585                    meta.time().checked_sub(time),
586                    THRESH
587                )),
588                _ => Err("not synced".to_owned()),
589            };
590            // let synced = store
591            //     .service()
592            //     .stats()
593            //     .and_then(|stats| stats.get_sync_time())
594            //     .ok_or_else(|| String::from("Not synced"))
595            //     .and_then(|t| {
596            //         meta.time().checked_sub(t).ok_or_else(|| {
597            //             format!("Cannot get duration between {t:?} and {:?}", meta.time())
598            //         })
599            //     })
600            //     .and_then(|dur| {
601            //         const THRESH: Duration = Duration::from_secs(60 * 3 * 10);
602            //         if dur <= THRESH {
603            //             Ok(())
604            //         } else {
605            //             Err(format!(
606            //                 "Synced {:?} ago, which is more than the threshold {:?}",
607            //                 dur, THRESH
608            //             ))
609            //         }
610            //     });
611            // mina_core::log::debug!(meta.time(); summary = "readiness check", result = format!("{synced:?}"));
612            respond_or_log!(
613                store.service().respond_readiness_check(rpc_id, synced),
614                meta.time()
615            );
616        }
617        RpcEffectfulAction::DiscoveryRoutingTable { rpc_id, response } => {
618            respond_or_log!(
619                store
620                    .service()
621                    .respond_discovery_routing_table(rpc_id, response),
622                meta.time()
623            );
624        }
625        RpcEffectfulAction::DiscoveryBoostrapStats { rpc_id, response } => {
626            respond_or_log!(
627                store
628                    .service()
629                    .respond_discovery_bootstrap_stats(rpc_id, response),
630                meta.time()
631            );
632        }
633        RpcEffectfulAction::TransactionPool { rpc_id, response } => {
634            respond_or_log!(
635                store.service().respond_transaction_pool(rpc_id, response),
636                meta.time()
637            )
638        }
639        RpcEffectfulAction::LedgerAccountsGetSuccess {
640            rpc_id,
641            accounts,
642            account_query,
643        } => {
644            // Is this todo still relevant?
645            // TODO(adonagy): maybe something more effective?
646            match account_query {
647                // all the accounts for the FE in Slim form
648                AccountQuery::All => {
649                    let mut accounts: BTreeMap<CompressedPubKey, Account> = accounts
650                        .into_iter()
651                        .map(|acc| (acc.public_key.clone(), acc))
652                        .collect();
653                    let nonces_and_amount = store
654                        .state()
655                        .transaction_pool
656                        .get_pending_amount_and_nonce();
657
658                    nonces_and_amount
659                        .iter()
660                        .for_each(|(account_id, (nonce, amount))| {
661                            if let Some(account) = accounts.get_mut(&account_id.public_key) {
662                                if let Some(nonce) = nonce {
663                                    if nonce >= &account.nonce {
664                                        // increment the last nonce in the pool
665                                        account.nonce = nonce.incr();
666                                    }
667                                }
668                                account.balance = account
669                                    .balance
670                                    .sub_amount(*amount)
671                                    .unwrap_or(Balance::zero());
672                            }
673                        });
674
675                    let accounts = accounts
676                        .into_values()
677                        .map(|v| v.into())
678                        .collect::<Vec<AccountSlim>>();
679
680                    respond_or_log!(
681                        store
682                            .service()
683                            .respond_ledger_slim_accounts(rpc_id, accounts),
684                        meta.time()
685                    )
686                }
687                // for the graphql endpoint
688                AccountQuery::SinglePublicKey(..) | AccountQuery::PubKeyWithTokenId(..) => {
689                    respond_or_log!(
690                        store.service().respond_ledger_accounts(rpc_id, accounts),
691                        meta.time()
692                    )
693                }
694                AccountQuery::MultipleIds(..) => {
695                    respond_or_log!(
696                        store.service().respond_ledger_accounts(rpc_id, accounts),
697                        meta.time()
698                    )
699                }
700            }
701        }
702        RpcEffectfulAction::TransactionInjectSuccess { rpc_id, response } => {
703            respond_or_log!(
704                store.service().respond_transaction_inject(
705                    rpc_id,
706                    RpcTransactionInjectResponse::Success(response)
707                ),
708                meta.time()
709            )
710        }
711        RpcEffectfulAction::TransactionInjectRejected { rpc_id, response } => {
712            respond_or_log!(
713                store.service().respond_transaction_inject(
714                    rpc_id,
715                    RpcTransactionInjectResponse::Rejected(response)
716                ),
717                meta.time()
718            )
719        }
720        RpcEffectfulAction::TransactionInjectFailure { rpc_id, errors } => {
721            respond_or_log!(
722                store.service().respond_transaction_inject(
723                    rpc_id,
724                    RpcTransactionInjectResponse::Failure(errors)
725                ),
726                meta.time()
727            )
728        }
729        RpcEffectfulAction::TransitionFrontierUserCommandsGet { rpc_id, commands } => {
730            respond_or_log!(
731                store
732                    .service()
733                    .respond_transition_frontier_commands(rpc_id, commands),
734                meta.time()
735            )
736        }
737        RpcEffectfulAction::BestChain { rpc_id, best_chain } => {
738            respond_or_log!(
739                store.service().respond_best_chain(rpc_id, best_chain),
740                meta.time()
741            )
742        }
743        RpcEffectfulAction::ConsensusConstantsGet { rpc_id, response } => {
744            respond_or_log!(
745                store
746                    .service()
747                    .respond_consensus_constants(rpc_id, response),
748                meta.time()
749            )
750        }
751        RpcEffectfulAction::TransactionStatusGet { rpc_id, tx } => {
752            // Check if the transaction is in the pool, if it is, return PENDING
753            let tx_hash = tx.hash().ok();
754
755            let in_tx_pool = store
756                .state()
757                .transaction_pool
758                .get_all_transactions()
759                .iter()
760                .any(|tx_with_hash| Some(&tx_with_hash.hash) == tx_hash.as_ref());
761
762            if in_tx_pool {
763                respond_or_log!(
764                    store
765                        .service()
766                        .respond_transaction_status(rpc_id, TransactionStatus::Pending),
767                    meta.time()
768                );
769                return;
770            }
771
772            let in_transition_frontier = if let Some(hash) = tx_hash {
773                store
774                    .state()
775                    .transition_frontier
776                    .contains_transaction(&hash)
777            } else {
778                false
779            };
780
781            // Check whether the transaction is in the transition frontier, if it is, return INCLUDED
782            if in_transition_frontier {
783                respond_or_log!(
784                    store
785                        .service()
786                        .respond_transaction_status(rpc_id, TransactionStatus::Included),
787                    meta.time()
788                )
789            // Otherwise, return UNKNOWN
790            } else {
791                respond_or_log!(
792                    store
793                        .service()
794                        .respond_transaction_status(rpc_id, TransactionStatus::Unknown),
795                    meta.time()
796                )
797            }
798        }
799        RpcEffectfulAction::BlockGet { rpc_id, block } => {
800            respond_or_log!(
801                store.service().respond_block_get(rpc_id, block),
802                meta.time()
803            )
804        }
805
806        RpcEffectfulAction::PooledUserCommands {
807            rpc_id,
808            user_commands,
809        } => {
810            respond_or_log!(
811                store
812                    .service()
813                    .respond_pooled_user_commands(rpc_id, user_commands),
814                meta.time()
815            )
816        }
817
818        RpcEffectfulAction::PooledZkappCommands {
819            rpc_id,
820            zkapp_commands,
821        } => {
822            respond_or_log!(
823                store
824                    .service()
825                    .respond_pooled_zkapp_commands(rpc_id, zkapp_commands),
826                meta.time()
827            )
828        }
829        RpcEffectfulAction::GenesisBlock {
830            rpc_id,
831            genesis_block,
832        } => {
833            respond_or_log!(
834                store.service().respond_genesis_block(rpc_id, genesis_block),
835                meta.time()
836            )
837        }
838
839        RpcEffectfulAction::ConsensusTimeGet {
840            rpc_id,
841            consensus_time,
842        } => {
843            respond_or_log!(
844                store
845                    .service()
846                    .respond_consensus_time_get(rpc_id, consensus_time),
847                meta.time()
848            )
849        }
850        RpcEffectfulAction::LedgerStatusGetSuccess { rpc_id, response } => {
851            respond_or_log!(
852                store.service().respond_ledger_status_get(rpc_id, response),
853                meta.time()
854            )
855        }
856        RpcEffectfulAction::LedgerAccountDelegatorsGetSuccess { rpc_id, response } => {
857            respond_or_log!(
858                store
859                    .service()
860                    .respond_ledger_account_delegators_get(rpc_id, response),
861                meta.time()
862            )
863        }
864    }
865}
866
867fn compute_node_status<S: Service>(store: &mut Store<S>) -> RpcNodeStatus {
868    let state = store.state.get();
869    let chain_id = state.p2p.ready().map(|p2p| p2p.chain_id.to_hex());
870    let block_summary = |b: &ArcBlockWithHash| RpcNodeStatusTransitionFrontierBlockSummary {
871        hash: b.hash().clone(),
872        height: b.height(),
873        global_slot: b.global_slot(),
874    };
875
876    let block_production_attempts = store
877        .service
878        .stats()
879        .map_or_else(Vec::new, |stats| stats.block_producer().collect_attempts());
880
881    let current_block_production_attempt = block_production_attempts.last().cloned();
882
883    let previous_block_production_attempt = block_production_attempts
884        .len()
885        .checked_sub(2)
886        .and_then(|idx| block_production_attempts.get(idx))
887        .cloned();
888
889    let network_info = RpcNodeStatusNetworkInfo {
890        bind_ip: "0.0.0.0".to_string(),
891        external_ip: state
892            .p2p
893            .config()
894            .external_addrs
895            .first()
896            .map(|addr| addr.to_string()),
897        client_port: state.config.client_port,
898        libp2p_port: state.p2p.config().libp2p_port,
899    };
900
901    let block_producer = state
902        .block_producer
903        .config()
904        .map(|config| AccountPublicKey::from(config.pub_key.clone()));
905    let coinbase_receiver = state
906        .block_producer
907        .config()
908        .map(|config| AccountPublicKey::from(config.coinbase_receiver().clone()));
909
910    let status = RpcNodeStatus {
911        chain_id,
912        block_producer,
913        coinbase_receiver,
914        transition_frontier: RpcNodeStatusTransitionFrontier {
915            best_tip: state.transition_frontier.best_tip().map(block_summary),
916            sync: RpcNodeStatusTransitionFrontierSync {
917                time: state.transition_frontier.sync.time(),
918                status: state.transition_frontier.sync.to_string(),
919                phase: state.transition_frontier.sync.sync_phase().to_string(),
920                target: state.transition_frontier.sync.best_tip().map(block_summary),
921            },
922        },
923        ledger: RpcNodeStatusLedger {
924            alive_masks_after_last_commit: state.ledger.alive_masks,
925            pending_writes: state
926                .ledger
927                .write
928                .pending_requests()
929                .map(|(req, time)| (req.kind(), time))
930                .collect(),
931            pending_reads: state
932                .ledger
933                .read
934                .pending_requests()
935                .map(|(id, req, time)| (id, req.kind(), time))
936                .collect(),
937        },
938        peers: rpc::collect_rpc_peers_info(state),
939        snark_pool: state
940            .snark_pool
941            .jobs_iter()
942            .fold(Default::default(), |mut acc, job| {
943                if job.snark.is_some() {
944                    acc.snarks = acc.snarks.saturating_add(1);
945                }
946                acc.total_jobs = acc.total_jobs.saturating_add(1);
947                acc
948            }),
949        transaction_pool: RpcNodeStatusTransactionPool {
950            transactions: state.transaction_pool.size(),
951            transactions_for_propagation: state.transaction_pool.for_propagation_size(),
952            transaction_candidates: state.transaction_pool.candidates.transactions_count(),
953        },
954        current_block_production_attempt,
955        previous_block_production_attempt,
956        resources_status: RpcNodeStatusResources {
957            p2p_malloc_size: {
958                let mut set = BTreeSet::new();
959                let fun = move |ptr: *const c_void| !set.insert(ptr.addr());
960                let mut ops = MallocSizeOfOps::new(None, Some(Box::new(fun)));
961                size_of_val(&state.p2p).saturating_add(state.p2p.size_of(&mut ops))
962            },
963            transition_frontier: state.transition_frontier.resources_usage(),
964            snark_pool: state.snark_pool.resources_usage(),
965        },
966        service_queues: store.service.queues(),
967        network_info,
968    };
969    status
970}
971
972fn make_produced_block_info(
973    block: Option<ArcBlockWithHash>,
974) -> std::io::Result<Option<ProducedBlockInfo>> {
975    use base64::{engine::general_purpose::URL_SAFE, Engine as _};
976    use mina_p2p_messages::binprot::BinProtWrite;
977
978    let Some(block) = block else { return Ok(None) };
979
980    let height = block.height();
981    let global_slot = block.global_slot();
982    let hash = block.hash().to_string();
983    let mut buf = Vec::with_capacity(5 * 1024 * 1024);
984    v2::MinaBlockHeaderStableV2::binprot_write(block.header(), &mut buf)?;
985
986    let base64_encoded_header = URL_SAFE.encode(&buf);
987
988    Ok(Some(ProducedBlockInfo {
989        height,
990        global_slot,
991        hash,
992        base64_encoded_header,
993    }))
994}