node/rpc_effectful/
rpc_effectful_effects.rs

1use std::{
2    collections::{BTreeMap, BTreeSet},
3    ffi::c_void,
4    time::Duration,
5};
6
7use super::{super::rpc, RpcEffectfulAction};
8use crate::{
9    block_producer::BlockProducerWonSlot,
10    external_snark_worker::available_job_to_snark_worker_spec,
11    p2p::connection::P2pConnectionResponse,
12    p2p_ready,
13    rpc::{
14        AccountQuery, AccountSlim, ActionStatsQuery, ActionStatsResponse, CurrentMessageProgress,
15        MessagesStats, NodeHeartbeat, ProducedBlockInfo, RootLedgerSyncProgress,
16        RootStagedLedgerSyncProgress, RpcAction, RpcBlockProducerStats, RpcMessageProgressResponse,
17        RpcNodeStatus, RpcNodeStatusLedger, RpcNodeStatusNetworkInfo, RpcNodeStatusResources,
18        RpcNodeStatusTransactionPool, RpcNodeStatusTransitionFrontier,
19        RpcNodeStatusTransitionFrontierBlockSummary, RpcNodeStatusTransitionFrontierSync,
20        RpcRequestExtraData, RpcScanStateSummary, RpcScanStateSummaryBlock,
21        RpcScanStateSummaryBlockTransaction, RpcScanStateSummaryBlockTransactionKind,
22        RpcScanStateSummaryScanStateJob, RpcSnarkPoolJobFull, RpcSnarkPoolJobSnarkWork,
23        RpcSnarkPoolJobSummary, RpcSnarkerJobCommitResponse, RpcSnarkerJobSpecResponse,
24        RpcTransactionInjectResponse, TransactionStatus,
25    },
26    snark_pool::SnarkPoolAction,
27    transition_frontier::sync::{
28        ledger::TransitionFrontierSyncLedgerState, TransitionFrontierSyncState,
29    },
30    Service, Store,
31};
32use ledger::{
33    scan_state::currency::{Balance, Magnitude},
34    Account,
35};
36use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
37use mina_p2p_messages::{rpc_kernel::QueryHeader, v2};
38use mina_signer::CompressedPubKey;
39use openmina_core::{block::ArcBlockWithHash, bug_condition};
40use openmina_node_account::AccountPublicKey;
41use p2p::channels::streaming_rpc::{
42    staged_ledger_parts::calc_total_pieces_to_transfer, P2pStreamingRpcReceiveProgress,
43};
44use redux::ActionWithMeta;
45
46macro_rules! respond_or_log {
47    ($e:expr, $t:expr) => {
48        if let Err(err) = $e {
49            openmina_core::log::warn!($t; "Failed to respond: {err}");
50        }
51    };
52}
53
54pub fn rpc_effects<S: Service>(store: &mut Store<S>, action: ActionWithMeta<RpcEffectfulAction>) {
55    let (action, meta) = action.split();
56
57    match action {
58        RpcEffectfulAction::GlobalStateGet { rpc_id, filter } => {
59            let _ = store
60                .service
61                .respond_state_get(rpc_id, (store.state.get(), filter.as_deref()));
62        }
63        RpcEffectfulAction::StatusGet { rpc_id } => {
64            let status = compute_node_status(store);
65            let _ = store.service.respond_status_get(rpc_id, Some(status));
66        }
67        RpcEffectfulAction::HeartbeatGet { rpc_id } => {
68            let status = compute_node_status(store);
69            let last_produced_block = store
70                .service
71                .stats()
72                .and_then(|stats| stats.block_producer().last_produced_block.take());
73
74            let last_produced_block_info = match make_produced_block_info(last_produced_block) {
75                Ok(data) => data,
76                Err(error) => {
77                    bug_condition!(
78                        "HeartbeatGet: Failed to encode block header, returning None: {error}"
79                    );
80                    None
81                }
82            };
83
84            let heartbeat = NodeHeartbeat {
85                status: status.into(),
86                node_timestamp: meta.time(),
87                peer_id: store.state().p2p.my_id(),
88                last_produced_block_info,
89            };
90            let response = store
91                .service()
92                .with_producer_keypair(move |sk| heartbeat.sign(sk));
93
94            let _ = store.service.respond_heartbeat_get(rpc_id, response);
95        }
96        RpcEffectfulAction::ActionStatsGet { rpc_id, query } => match query {
97            ActionStatsQuery::SinceStart => {
98                let resp = store
99                    .service
100                    .stats()
101                    .map(|s| s.collect_action_stats_since_start())
102                    .map(|stats| ActionStatsResponse::SinceStart { stats });
103                let _ = store.service.respond_action_stats_get(rpc_id, resp);
104            }
105            ActionStatsQuery::ForLatestBlock => {
106                let resp = store
107                    .service
108                    .stats()
109                    .and_then(|s| s.collect_action_stats_for_block_with_id(None))
110                    .map(ActionStatsResponse::ForBlock);
111                let _ = store.service.respond_action_stats_get(rpc_id, resp);
112            }
113            ActionStatsQuery::ForBlockWithId(id) => {
114                let resp = store
115                    .service
116                    .stats()
117                    .and_then(|s| s.collect_action_stats_for_block_with_id(Some(id)))
118                    .map(ActionStatsResponse::ForBlock);
119                let _ = store.service.respond_action_stats_get(rpc_id, resp);
120            }
121        },
122        RpcEffectfulAction::SyncStatsGet { rpc_id, query } => {
123            let resp = store
124                .service
125                .stats()
126                .map(|s| s.collect_sync_stats(query.limit));
127            let _ = store.service.respond_sync_stats_get(rpc_id, resp);
128        }
129        RpcEffectfulAction::BlockProducerStatsGet { rpc_id } => {
130            let mut create_response = || {
131                let state = store.state.get();
132                let best_tip = state.transition_frontier.best_tip()?;
133                let public_key = state.block_producer.config()?.pub_key.clone();
134                let won_slots = &state.block_producer.vrf_evaluator()?.won_slots;
135
136                let stats = store.service.stats()?;
137                let attempts = stats.block_producer().collect_attempts();
138                let future_slot = attempts.last().map_or(0, |v| {
139                    v.won_slot.global_slot.checked_add(1).expect("overflow")
140                });
141
142                let cur_global_slot = state.cur_global_slot();
143                let current_epoch = state.current_epoch();
144                let slots_per_epoch = best_tip.constants().slots_per_epoch.as_u32();
145                let epoch_start = cur_global_slot.map(|slot| {
146                    (slot.checked_div(slots_per_epoch).expect("division by 0"))
147                        .checked_mul(slots_per_epoch)
148                        .expect("overflow")
149                });
150
151                let current_epoch_vrf_stats = current_epoch
152                    .and_then(|epoch| stats.block_producer().vrf_evaluator.get(&epoch).cloned());
153                let vrf_stats = stats.block_producer().vrf_evaluator.clone();
154
155                Some(RpcBlockProducerStats {
156                    current_time: meta.time(),
157                    current_global_slot: cur_global_slot,
158                    current_epoch,
159                    current_epoch_vrf_stats,
160                    vrf_stats,
161                    epoch_start,
162                    epoch_end: epoch_start
163                        .map(|slot| slot.checked_add(slots_per_epoch).expect("overflow")),
164                    public_key: public_key.into(),
165                    attempts,
166                    future_won_slots: won_slots
167                        .range(future_slot..)
168                        .map(|(_, won_slot)| {
169                            let won_slot = BlockProducerWonSlot::from_vrf_won_slot(
170                                won_slot,
171                                best_tip.genesis_timestamp(),
172                            );
173                            (&won_slot).into()
174                        })
175                        .collect(),
176                })
177            };
178            let response = create_response();
179            let _ = store
180                .service
181                .respond_block_producer_stats_get(rpc_id, response);
182        }
183        RpcEffectfulAction::MessageProgressGet { rpc_id } => {
184            // TODO: move to stats
185            let p2p = p2p_ready!(store.state().p2p, meta.time());
186            let messages_stats = p2p
187                .network
188                .scheduler
189                .rpc_outgoing_streams
190                .iter()
191                .filter_map(|(peer_id, streams)| {
192                    let (_, rpc_state) = streams.first_key_value()?;
193                    let QueryHeader { tag: name, .. } = rpc_state.pending.clone()?;
194                    let name = name.to_string();
195                    let buffer = &rpc_state.buffer;
196                    let current_request = if buffer.len() < 8 {
197                        None
198                    } else {
199                        let received_bytes = buffer.len().saturating_sub(8);
200                        let total_bytes = u64::from_le_bytes(
201                            buffer
202                                .get(..8)
203                                .expect("slice with incorrect length")
204                                .try_into()
205                                .expect("cannot fail checked above"),
206                        ) as usize;
207                        Some(CurrentMessageProgress {
208                            name,
209                            received_bytes,
210                            total_bytes,
211                        })
212                    };
213
214                    Some((
215                        *peer_id,
216                        MessagesStats {
217                            current_request,
218                            responses: rpc_state
219                                .total_stats
220                                .iter()
221                                .map(|((name, _), count)| (name.to_string(), *count))
222                                .collect(),
223                        },
224                    ))
225                })
226                .collect();
227
228            let mut response = RpcMessageProgressResponse {
229                messages_stats,
230                staking_ledger_sync: None,
231                next_epoch_ledger_sync: None,
232                root_ledger_sync: None,
233            };
234
235            match &store.state().transition_frontier.sync {
236                TransitionFrontierSyncState::StakingLedgerPending(state) => {
237                    if let TransitionFrontierSyncLedgerState::Snarked(state) = &state.ledger {
238                        response.staking_ledger_sync = state.estimation()
239                    }
240                }
241                TransitionFrontierSyncState::NextEpochLedgerPending(state) => {
242                    if let TransitionFrontierSyncLedgerState::Snarked(state) = &state.ledger {
243                        response.next_epoch_ledger_sync = state.estimation()
244                    }
245                }
246                TransitionFrontierSyncState::RootLedgerPending(state) => match &state.ledger {
247                    TransitionFrontierSyncLedgerState::Snarked(state) => {
248                        response.root_ledger_sync =
249                            state.estimation().map(|data| RootLedgerSyncProgress {
250                                fetched: data.fetched,
251                                estimation: data.estimation,
252                                staged: None,
253                            });
254                    }
255                    TransitionFrontierSyncLedgerState::Staged(state) => {
256                        let unknown_staged_progress = || RootStagedLedgerSyncProgress {
257                            fetched: 0,
258                            total: 1,
259                        };
260                        let staged = match state.fetch_attempts() {
261                            None => state.target_with_parts().map(|(_, parts)| {
262                                let v = parts
263                                    .map(|parts| calc_total_pieces_to_transfer(parts))
264                                    .unwrap_or(0);
265                                RootStagedLedgerSyncProgress {
266                                    fetched: v,
267                                    total: v,
268                                }
269                            }),
270                            Some(attempts) => attempts
271                                .iter()
272                                .find(|(_, s)| s.fetch_pending_rpc_id().is_some())
273                                .map(|(id, _)| id)
274                                .and_then(|peer_id| store.state().p2p.get_ready_peer(peer_id))
275                                .map(|peer| {
276                                    match peer.channels.streaming_rpc.pending_local_rpc_progress() {
277                                        None => unknown_staged_progress(),
278                                        Some(
279                                            P2pStreamingRpcReceiveProgress::StagedLedgerParts(
280                                                progress,
281                                            ),
282                                        ) => {
283                                            let (fetched, total) = progress.progress();
284                                            RootStagedLedgerSyncProgress { fetched, total }
285                                        }
286                                    }
287                                }),
288                        };
289
290                        // We want to answer with a result that will serve as a 100% complete process for the
291                        // frontend while it is still waiting for the staged ledger to complete. Could be cleaner.
292                        response.root_ledger_sync = Some(RootLedgerSyncProgress {
293                            fetched: 1,
294                            estimation: 1,
295                            staged,
296                        });
297                    }
298                    _ => {}
299                },
300                _ => {}
301            }
302
303            let _ = store
304                .service
305                .respond_message_progress_stats_get(rpc_id, response);
306        }
307        RpcEffectfulAction::PeersGet { rpc_id, peers } => {
308            respond_or_log!(
309                store.service().respond_peers_get(rpc_id, peers),
310                meta.time()
311            );
312        }
313        RpcEffectfulAction::P2pConnectionOutgoingError { rpc_id, error } => {
314            let _ = store
315                .service
316                .respond_p2p_connection_outgoing(rpc_id, Err(error));
317
318            store.dispatch(RpcAction::Finish { rpc_id });
319        }
320        RpcEffectfulAction::P2pConnectionOutgoingSuccess { rpc_id } => {
321            let _ = store
322                .service
323                .respond_p2p_connection_outgoing(rpc_id, Ok(()));
324            store.dispatch(RpcAction::Finish { rpc_id });
325        }
326        RpcEffectfulAction::P2pConnectionIncomingRespond { rpc_id, response } => {
327            let error = match &response {
328                P2pConnectionResponse::Accepted(_) => None,
329                P2pConnectionResponse::Rejected(reason) => Some(format!("Rejected({:?})", reason)),
330                P2pConnectionResponse::SignalDecryptionFailed => {
331                    Some("RemoteSignalDecryptionFailed".to_owned())
332                }
333                P2pConnectionResponse::InternalError => Some("RemoteInternalError".to_owned()),
334            };
335            let _ = store
336                .service
337                .respond_p2p_connection_incoming_answer(rpc_id, response);
338            if let Some(error) = error {
339                store.dispatch(RpcAction::P2pConnectionIncomingError { rpc_id, error });
340            }
341        }
342        RpcEffectfulAction::P2pConnectionIncomingError { rpc_id, error } => {
343            let _ = store
344                .service
345                .respond_p2p_connection_incoming(rpc_id, Err(error));
346            store.dispatch(RpcAction::Finish { rpc_id });
347        }
348        RpcEffectfulAction::P2pConnectionIncomingSuccess { rpc_id } => {
349            let _ = store
350                .service
351                .respond_p2p_connection_incoming(rpc_id, Ok(()));
352            store.dispatch(RpcAction::Finish { rpc_id });
353        }
354        RpcEffectfulAction::ScanStateSummaryGetSuccess {
355            rpc_id,
356            mut scan_state,
357        } => {
358            let req = store.state().rpc.requests.get(&rpc_id);
359            let Some(block) = req.and_then(|req| match &req.data {
360                RpcRequestExtraData::FullBlockOpt(opt) => opt.as_ref(),
361                _ => None,
362            }) else {
363                let _ = store.service.respond_scan_state_summary_get(
364                    rpc_id,
365                    Err("target block not found".to_string()),
366                );
367                return;
368            };
369            let coinbases =
370                block
371                    .coinbase_fee_transfers_iter()
372                    .map(|_| RpcScanStateSummaryBlockTransaction {
373                        hash: None,
374                        kind: RpcScanStateSummaryBlockTransactionKind::Coinbase,
375                        status: v2::MinaBaseTransactionStatusStableV2::Applied,
376                    });
377            let block_summary = RpcScanStateSummaryBlock {
378                hash: block.hash().clone(),
379                height: block.height(),
380                global_slot: block.global_slot_since_genesis(),
381                transactions: block
382                    .commands_iter()
383                    .map(|tx| RpcScanStateSummaryBlockTransaction {
384                        hash: tx.data.hash().ok(),
385                        kind: (&tx.data).into(),
386                        status: tx.status.clone(),
387                    })
388                    .chain(coinbases)
389                    .collect(),
390                completed_works: block
391                    .completed_works_iter()
392                    .map(|work| (&work.proofs).into())
393                    .collect(),
394            };
395
396            let snark_pool = &store.state().snark_pool;
397            scan_state.iter_mut().flatten().flatten().for_each(|job| {
398                if let RpcScanStateSummaryScanStateJob::Todo {
399                    job_id,
400                    bundle_job_id,
401                    job: kind,
402                    seq_no,
403                } = job
404                {
405                    let Some(data) = snark_pool.get(bundle_job_id) else {
406                        return;
407                    };
408                    let commitment = data.commitment.clone().map(Box::new);
409                    let snark = data
410                        .snark
411                        .as_ref()
412                        .map(|snark| RpcSnarkPoolJobSnarkWork {
413                            snarker: snark.work.snarker.clone(),
414                            fee: snark.work.fee.clone(),
415                            received_t: snark.received_t,
416                            sender: snark.sender,
417                        })
418                        .map(Box::new);
419
420                    if commitment.is_none() && snark.is_none() {
421                        return;
422                    }
423                    *job = RpcScanStateSummaryScanStateJob::Pending {
424                        job_id: job_id.clone(),
425                        bundle_job_id: bundle_job_id.clone(),
426                        job: Box::new(kind.clone()),
427                        seq_no: *seq_no,
428                        commitment,
429                        snark,
430                    };
431                }
432            });
433            let res = scan_state.map(|scan_state| RpcScanStateSummary {
434                block: block_summary,
435                scan_state,
436            });
437            let _ = store.service.respond_scan_state_summary_get(rpc_id, res);
438        }
439        RpcEffectfulAction::SnarkPoolAvailableJobsGet { rpc_id } => {
440            let resp = store
441                .state()
442                .snark_pool
443                .range(..)
444                .map(|(_, job)| RpcSnarkPoolJobSummary {
445                    time: job.time,
446                    id: job.id.clone(),
447                    commitment: job.commitment.clone(),
448                    snark: job.snark.as_ref().map(|snark| RpcSnarkPoolJobSnarkWork {
449                        snarker: snark.work.snarker.clone(),
450                        fee: snark.work.fee.clone(),
451                        received_t: snark.received_t,
452                        sender: snark.sender,
453                    }),
454                })
455                .collect::<Vec<_>>();
456            let _ = store.service().respond_snark_pool_get(rpc_id, resp);
457        }
458        RpcEffectfulAction::SnarkPoolJobGet { job_id, rpc_id } => {
459            let resp = store.state().snark_pool.range(..).find_map(|(_, job)| {
460                if job.id == job_id {
461                    Some(RpcSnarkPoolJobFull {
462                        time: job.time,
463                        id: job.id.clone(),
464                        job: job.job.clone(),
465                        commitment: job.commitment.clone(),
466                        snark: job.snark.as_ref().map(|snark| RpcSnarkPoolJobSnarkWork {
467                            snarker: snark.work.snarker.clone(),
468                            fee: snark.work.fee.clone(),
469                            received_t: snark.received_t,
470                            sender: snark.sender,
471                        }),
472                    })
473                } else {
474                    None
475                }
476            });
477            let _ = store.service().respond_snark_pool_job_get(rpc_id, resp);
478        }
479        RpcEffectfulAction::SnarkPoolCompletedJobsGet { rpc_id, jobs } => {
480            respond_or_log!(
481                store
482                    .service()
483                    .respond_snark_pool_completed_jobs_get(rpc_id, jobs),
484                meta.time()
485            );
486        }
487        RpcEffectfulAction::SnarkPoolPendingJobsGet { rpc_id, jobs } => {
488            respond_or_log!(
489                store
490                    .service()
491                    .respond_snark_pool_pending_jobs_get(rpc_id, jobs),
492                meta.time()
493            );
494        }
495        RpcEffectfulAction::SnarkerConfigGet { rpc_id, config } => {
496            let _ = store.service().respond_snarker_config_get(rpc_id, config);
497        }
498        RpcEffectfulAction::SnarkerJobCommit { rpc_id, job_id } => {
499            if !store.state().snark_pool.should_create_commitment(&job_id) {
500                let _ = store
501                    .service()
502                    .respond_snarker_job_commit(rpc_id, RpcSnarkerJobCommitResponse::JobNotFound);
503                // TODO(binier): differentiate between job not found and job already taken.
504                return;
505            }
506            if !store.state().external_snark_worker.has_idle() {
507                let _ = store
508                    .service()
509                    .respond_snarker_job_commit(rpc_id, RpcSnarkerJobCommitResponse::SnarkerBusy);
510                return;
511            }
512            if store
513                .service()
514                .respond_snarker_job_commit(rpc_id, RpcSnarkerJobCommitResponse::Ok)
515                .is_err()
516            {
517                return;
518            }
519            store.dispatch(SnarkPoolAction::CommitmentCreate { job_id });
520        }
521        RpcEffectfulAction::SnarkerJobSpec { rpc_id, job_id } => {
522            let Some(job) = store.state().snark_pool.get(&job_id) else {
523                if store
524                    .service()
525                    .respond_snarker_job_spec(rpc_id, RpcSnarkerJobSpecResponse::JobNotFound)
526                    .is_err()
527                {
528                    return;
529                }
530                return;
531            };
532            let input = available_job_to_snark_worker_spec(
533                job.job.clone(),
534                &store.state().transition_frontier,
535            );
536            // TODO(binier): maybe don't require snarker to be enabled here.
537            let Some(config) = store.state.get().config.snarker.as_ref() else {
538                return;
539            };
540            let public_key = config.public_key.clone().into();
541            let fee = config.fee.clone();
542            let input = match input {
543                    Ok(instances) => RpcSnarkerJobSpecResponse::Ok(
544                        mina_p2p_messages::v2::SnarkWorkerWorkerRpcsVersionedGetWorkV2TResponse(Some((
545                            mina_p2p_messages::v2::SnarkWorkerWorkerRpcsVersionedGetWorkV2TResponseA0 {
546                                instances,
547                                fee,
548                            },
549                            public_key,
550                        )))
551                    ),
552                    Err(err) => RpcSnarkerJobSpecResponse::Err(err),
553                };
554
555            // TODO: handle potential errors
556            let _ = store.service().respond_snarker_job_spec(rpc_id, input);
557        }
558        RpcEffectfulAction::SnarkerWorkersGet {
559            rpc_id,
560            snark_worker,
561        } => {
562            // TODO: handle potential errors
563            let _ = store
564                .service()
565                .respond_snarker_workers(rpc_id, vec![snark_worker.into()]);
566        }
567        RpcEffectfulAction::HealthCheck { rpc_id, has_peers } => {
568            respond_or_log!(
569                store.service().respond_health_check(rpc_id, has_peers),
570                meta.time()
571            );
572        }
573        RpcEffectfulAction::ReadinessCheck { rpc_id } => {
574            const THRESH: Duration = Duration::from_secs(60 * 3 * 10);
575            let synced = match store.state().transition_frontier.sync {
576                TransitionFrontierSyncState::Synced { time }
577                    if meta.time().checked_sub(time) <= Some(THRESH) =>
578                {
579                    Ok(())
580                }
581                TransitionFrontierSyncState::Synced { time } => Err(format!(
582                    "Synced {:?} ago, which is more than the threshold {:?}",
583                    meta.time().checked_sub(time),
584                    THRESH
585                )),
586                _ => Err("not synced".to_owned()),
587            };
588            // let synced = store
589            //     .service()
590            //     .stats()
591            //     .and_then(|stats| stats.get_sync_time())
592            //     .ok_or_else(|| String::from("Not synced"))
593            //     .and_then(|t| {
594            //         meta.time().checked_sub(t).ok_or_else(|| {
595            //             format!("Cannot get duration between {t:?} and {:?}", meta.time())
596            //         })
597            //     })
598            //     .and_then(|dur| {
599            //         const THRESH: Duration = Duration::from_secs(60 * 3 * 10);
600            //         if dur <= THRESH {
601            //             Ok(())
602            //         } else {
603            //             Err(format!(
604            //                 "Synced {:?} ago, which is more than the threshold {:?}",
605            //                 dur, THRESH
606            //             ))
607            //         }
608            //     });
609            // openmina_core::log::debug!(meta.time(); summary = "readiness check", result = format!("{synced:?}"));
610            respond_or_log!(
611                store.service().respond_readiness_check(rpc_id, synced),
612                meta.time()
613            );
614        }
615        RpcEffectfulAction::DiscoveryRoutingTable { rpc_id, response } => {
616            respond_or_log!(
617                store
618                    .service()
619                    .respond_discovery_routing_table(rpc_id, response),
620                meta.time()
621            );
622        }
623        RpcEffectfulAction::DiscoveryBoostrapStats { rpc_id, response } => {
624            respond_or_log!(
625                store
626                    .service()
627                    .respond_discovery_bootstrap_stats(rpc_id, response),
628                meta.time()
629            );
630        }
631        RpcEffectfulAction::TransactionPool { rpc_id, response } => {
632            respond_or_log!(
633                store.service().respond_transaction_pool(rpc_id, response),
634                meta.time()
635            )
636        }
637        RpcEffectfulAction::LedgerAccountsGetSuccess {
638            rpc_id,
639            accounts,
640            account_query,
641        } => {
642            // Is this todo still relevant?
643            // TODO(adonagy): maybe something more effective?
644            match account_query {
645                // all the accounts for the FE in Slim form
646                AccountQuery::All => {
647                    let mut accounts: BTreeMap<CompressedPubKey, Account> = accounts
648                        .into_iter()
649                        .map(|acc| (acc.public_key.clone(), acc))
650                        .collect();
651                    let nonces_and_amount = store
652                        .state()
653                        .transaction_pool
654                        .get_pending_amount_and_nonce();
655
656                    nonces_and_amount
657                        .iter()
658                        .for_each(|(account_id, (nonce, amount))| {
659                            if let Some(account) = accounts.get_mut(&account_id.public_key) {
660                                if let Some(nonce) = nonce {
661                                    if nonce >= &account.nonce {
662                                        // increment the last nonce in the pool
663                                        account.nonce = nonce.incr();
664                                    }
665                                }
666                                account.balance = account
667                                    .balance
668                                    .sub_amount(*amount)
669                                    .unwrap_or(Balance::zero());
670                            }
671                        });
672
673                    let accounts = accounts
674                        .into_values()
675                        .map(|v| v.into())
676                        .collect::<Vec<AccountSlim>>();
677
678                    respond_or_log!(
679                        store
680                            .service()
681                            .respond_ledger_slim_accounts(rpc_id, accounts),
682                        meta.time()
683                    )
684                }
685                // for the graphql endpoint
686                AccountQuery::SinglePublicKey(..) | AccountQuery::PubKeyWithTokenId(..) => {
687                    respond_or_log!(
688                        store.service().respond_ledger_accounts(rpc_id, accounts),
689                        meta.time()
690                    )
691                }
692                AccountQuery::MultipleIds(..) => {
693                    respond_or_log!(
694                        store.service().respond_ledger_accounts(rpc_id, accounts),
695                        meta.time()
696                    )
697                }
698            }
699        }
700        RpcEffectfulAction::TransactionInjectSuccess { rpc_id, response } => {
701            respond_or_log!(
702                store.service().respond_transaction_inject(
703                    rpc_id,
704                    RpcTransactionInjectResponse::Success(response)
705                ),
706                meta.time()
707            )
708        }
709        RpcEffectfulAction::TransactionInjectRejected { rpc_id, response } => {
710            respond_or_log!(
711                store.service().respond_transaction_inject(
712                    rpc_id,
713                    RpcTransactionInjectResponse::Rejected(response)
714                ),
715                meta.time()
716            )
717        }
718        RpcEffectfulAction::TransactionInjectFailure { rpc_id, errors } => {
719            respond_or_log!(
720                store.service().respond_transaction_inject(
721                    rpc_id,
722                    RpcTransactionInjectResponse::Failure(errors)
723                ),
724                meta.time()
725            )
726        }
727        RpcEffectfulAction::TransitionFrontierUserCommandsGet { rpc_id, commands } => {
728            respond_or_log!(
729                store
730                    .service()
731                    .respond_transition_frontier_commands(rpc_id, commands),
732                meta.time()
733            )
734        }
735        RpcEffectfulAction::BestChain { rpc_id, best_chain } => {
736            respond_or_log!(
737                store.service().respond_best_chain(rpc_id, best_chain),
738                meta.time()
739            )
740        }
741        RpcEffectfulAction::ConsensusConstantsGet { rpc_id, response } => {
742            respond_or_log!(
743                store
744                    .service()
745                    .respond_consensus_constants(rpc_id, response),
746                meta.time()
747            )
748        }
749        RpcEffectfulAction::TransactionStatusGet { rpc_id, tx } => {
750            // Check if the transaction is in the pool, if it is, return PENDING
751            let tx_hash = tx.hash().ok();
752
753            let in_tx_pool = store
754                .state()
755                .transaction_pool
756                .get_all_transactions()
757                .iter()
758                .any(|tx_with_hash| Some(&tx_with_hash.hash) == tx_hash.as_ref());
759
760            if in_tx_pool {
761                respond_or_log!(
762                    store
763                        .service()
764                        .respond_transaction_status(rpc_id, TransactionStatus::Pending),
765                    meta.time()
766                );
767                return;
768            }
769
770            let in_transition_frontier = if let Some(hash) = tx_hash {
771                store
772                    .state()
773                    .transition_frontier
774                    .contains_transaction(&hash)
775            } else {
776                false
777            };
778
779            // Check whether the transaction is in the transition frontier, if it is, return INCLUDED
780            if in_transition_frontier {
781                respond_or_log!(
782                    store
783                        .service()
784                        .respond_transaction_status(rpc_id, TransactionStatus::Included),
785                    meta.time()
786                )
787            // Otherwise, return UNKNOWN
788            } else {
789                respond_or_log!(
790                    store
791                        .service()
792                        .respond_transaction_status(rpc_id, TransactionStatus::Unknown),
793                    meta.time()
794                )
795            }
796        }
797        RpcEffectfulAction::BlockGet { rpc_id, block } => {
798            respond_or_log!(
799                store.service().respond_block_get(rpc_id, block),
800                meta.time()
801            )
802        }
803
804        RpcEffectfulAction::PooledUserCommands {
805            rpc_id,
806            user_commands,
807        } => {
808            respond_or_log!(
809                store
810                    .service()
811                    .respond_pooled_user_commands(rpc_id, user_commands),
812                meta.time()
813            )
814        }
815
816        RpcEffectfulAction::PooledZkappCommands {
817            rpc_id,
818            zkapp_commands,
819        } => {
820            respond_or_log!(
821                store
822                    .service()
823                    .respond_pooled_zkapp_commands(rpc_id, zkapp_commands),
824                meta.time()
825            )
826        }
827        RpcEffectfulAction::GenesisBlock {
828            rpc_id,
829            genesis_block,
830        } => {
831            respond_or_log!(
832                store.service().respond_genesis_block(rpc_id, genesis_block),
833                meta.time()
834            )
835        }
836
837        RpcEffectfulAction::ConsensusTimeGet {
838            rpc_id,
839            consensus_time,
840        } => {
841            respond_or_log!(
842                store
843                    .service()
844                    .respond_consensus_time_get(rpc_id, consensus_time),
845                meta.time()
846            )
847        }
848        RpcEffectfulAction::LedgerStatusGetSuccess { rpc_id, response } => {
849            respond_or_log!(
850                store.service().respond_ledger_status_get(rpc_id, response),
851                meta.time()
852            )
853        }
854        RpcEffectfulAction::LedgerAccountDelegatorsGetSuccess { rpc_id, response } => {
855            respond_or_log!(
856                store
857                    .service()
858                    .respond_ledger_account_delegators_get(rpc_id, response),
859                meta.time()
860            )
861        }
862    }
863}
864
865fn compute_node_status<S: Service>(store: &mut Store<S>) -> RpcNodeStatus {
866    let state = store.state.get();
867    let chain_id = state.p2p.ready().map(|p2p| p2p.chain_id.to_hex());
868    let block_summary = |b: &ArcBlockWithHash| RpcNodeStatusTransitionFrontierBlockSummary {
869        hash: b.hash().clone(),
870        height: b.height(),
871        global_slot: b.global_slot(),
872    };
873
874    let block_production_attempts = store
875        .service
876        .stats()
877        .map_or_else(Vec::new, |stats| stats.block_producer().collect_attempts());
878
879    let current_block_production_attempt = block_production_attempts.last().cloned();
880
881    let previous_block_production_attempt = block_production_attempts
882        .len()
883        .checked_sub(2)
884        .and_then(|idx| block_production_attempts.get(idx))
885        .cloned();
886
887    let network_info = RpcNodeStatusNetworkInfo {
888        bind_ip: "0.0.0.0".to_string(),
889        external_ip: state
890            .p2p
891            .config()
892            .external_addrs
893            .first()
894            .map(|addr| addr.to_string()),
895        client_port: state.config.client_port,
896        libp2p_port: state.p2p.config().libp2p_port,
897    };
898
899    let block_producer = state
900        .block_producer
901        .config()
902        .map(|config| AccountPublicKey::from(config.pub_key.clone()));
903    let coinbase_receiver = state
904        .block_producer
905        .config()
906        .map(|config| AccountPublicKey::from(config.coinbase_receiver().clone()));
907
908    let status = RpcNodeStatus {
909        chain_id,
910        block_producer,
911        coinbase_receiver,
912        transition_frontier: RpcNodeStatusTransitionFrontier {
913            best_tip: state.transition_frontier.best_tip().map(block_summary),
914            sync: RpcNodeStatusTransitionFrontierSync {
915                time: state.transition_frontier.sync.time(),
916                status: state.transition_frontier.sync.to_string(),
917                phase: state.transition_frontier.sync.sync_phase().to_string(),
918                target: state.transition_frontier.sync.best_tip().map(block_summary),
919            },
920        },
921        ledger: RpcNodeStatusLedger {
922            alive_masks_after_last_commit: state.ledger.alive_masks,
923            pending_writes: state
924                .ledger
925                .write
926                .pending_requests()
927                .map(|(req, time)| (req.kind(), time))
928                .collect(),
929            pending_reads: state
930                .ledger
931                .read
932                .pending_requests()
933                .map(|(id, req, time)| (id, req.kind(), time))
934                .collect(),
935        },
936        peers: rpc::collect_rpc_peers_info(state),
937        snark_pool: state
938            .snark_pool
939            .jobs_iter()
940            .fold(Default::default(), |mut acc, job| {
941                if job.snark.is_some() {
942                    acc.snarks = acc.snarks.saturating_add(1);
943                }
944                acc.total_jobs = acc.total_jobs.saturating_add(1);
945                acc
946            }),
947        transaction_pool: RpcNodeStatusTransactionPool {
948            transactions: state.transaction_pool.size(),
949            transactions_for_propagation: state.transaction_pool.for_propagation_size(),
950            transaction_candidates: state.transaction_pool.candidates.transactions_count(),
951        },
952        current_block_production_attempt,
953        previous_block_production_attempt,
954        resources_status: RpcNodeStatusResources {
955            p2p_malloc_size: {
956                let mut set = BTreeSet::new();
957                let fun = move |ptr: *const c_void| !set.insert(ptr.addr());
958                let mut ops = MallocSizeOfOps::new(None, Some(Box::new(fun)));
959                size_of_val(&state.p2p).saturating_add(state.p2p.size_of(&mut ops))
960            },
961            transition_frontier: state.transition_frontier.resources_usage(),
962            snark_pool: state.snark_pool.resources_usage(),
963        },
964        service_queues: store.service.queues(),
965        network_info,
966    };
967    status
968}
969
970fn make_produced_block_info(
971    block: Option<ArcBlockWithHash>,
972) -> std::io::Result<Option<ProducedBlockInfo>> {
973    use base64::{engine::general_purpose::URL_SAFE, Engine as _};
974    use mina_p2p_messages::binprot::BinProtWrite;
975
976    let Some(block) = block else { return Ok(None) };
977
978    let height = block.height();
979    let global_slot = block.global_slot();
980    let hash = block.hash().to_string();
981    let mut buf = Vec::with_capacity(5 * 1024 * 1024);
982    v2::MinaBlockHeaderStableV2::binprot_write(block.header(), &mut buf)?;
983
984    let base64_encoded_header = URL_SAFE.encode(&buf);
985
986    Ok(Some(ProducedBlockInfo {
987        height,
988        global_slot,
989        hash,
990        base64_encoded_header,
991    }))
992}