mina_node_common/service/rpc/
mod.rs

1//! # RPC Service Layer
2//!
3//! This module provides the service layer that connects HTTP endpoints to the
4//! node's state machine. It handles the communication between external RPC
5//! requests and the internal event-driven architecture.
6//!
7//! ## Architecture
8//!
9//! The RPC system follows a request-response pattern through channels:
10//!
11//! ```text
12//! HTTP Server -> RpcSender -> RpcService -> Event -> State Machine
13//!                                                         |
14//! HTTP Response <- oneshot channel <- respond_* methods <-+
15//! ```
16//!
17//! 1. HTTP server receives a request and sends it via [`RpcSender`]
18//! 2. [`RpcService`] wraps the request with a unique ID and responder channel
19//! 3. Request becomes an [`Event::Rpc`] dispatched to the state machine
20//! 4. State machine processes the request and calls a `respond_*` method
21//! 5. Response is sent back through the oneshot channel to the HTTP server
22//!
23//! ## Adding a new RPC endpoint
24//!
25//! To add a new RPC endpoint, follow these steps:
26//!
27//! 1. **Define the request type** in [`node::rpc`]:
28//!    Add a variant to `RpcRequest` enum in `node/src/rpc/mod.rs`
29//!
30//! 2. **Define the response type** in [`node::rpc`]:
31//!    Create a type alias like `pub type RpcMyNewResponse = ...;`
32//!
33//! 3. **Add the respond method** to `RpcService` trait in
34//!    `node/src/rpc_effectful/mod.rs`:
35//!    ```ignore
36//!    fn respond_my_new_endpoint(
37//!        &mut self,
38//!        rpc_id: RpcId,
39//!        response: RpcMyNewResponse,
40//!    ) -> Result<(), RespondError>;
41//!    ```
42//!
43//! 4. **Implement the respond method** in this file using the macro:
44//!    ```ignore
45//!    rpc_service_impl!(respond_my_new_endpoint, RpcMyNewResponse);
46//!    ```
47//!
48//! 5. **Handle the request** in `node/src/rpc/rpc_reducer.rs`:
49//!    Add a match arm for your new `RpcRequest` variant
50//!
51//! 6. **Add the HTTP route** in `node/native/src/http_server.rs`:
52//!    Define the route and call `rpc_sender.my_new_endpoint()`
53//!
54//! 7. **Add the sender method** in `sender.rs`:
55//!    Implement the method on [`RpcSender`] that sends the request through the
56//!    channel
57//!
58//! 8. **Document the endpoint** in the website documentation:
59//!    Add the new endpoint to `website/docs/developers/api-and-data/rpc-api.md`
60//!    so node operators can discover and use it
61
62mod sender;
63pub use sender::RpcSender;
64
65pub mod ledger;
66pub mod state;
67pub mod stats;
68pub mod transaction_pool;
69pub mod transition_frontier;
70
71use node::rpc::{
72    RpcBestChainResponse, RpcBlockProducerStatsGetResponse, RpcConsensusConstantsGetResponse,
73    RpcConsensusTimeGetResponse, RpcDiscoveryBoostrapStatsResponse,
74    RpcDiscoveryRoutingTableResponse, RpcGenesisBlockResponse, RpcGetBlockResponse,
75    RpcHealthCheckResponse, RpcHeartbeatGetResponse, RpcLedgerAccountDelegatorsGetResponse,
76    RpcLedgerAccountsResponse, RpcLedgerSlimAccountsResponse, RpcLedgerStatusGetResponse,
77    RpcMessageProgressResponse, RpcPeersGetResponse, RpcPooledUserCommandsResponse,
78    RpcPooledZkappCommandsResponse, RpcReadinessCheckResponse, RpcRequest,
79    RpcSnarkPoolCompletedJobsResponse, RpcSnarkPoolPendingJobsGetResponse, RpcStateGetError,
80    RpcStatusGetResponse, RpcTransactionInjectResponse, RpcTransactionPoolResponse,
81    RpcTransactionStatusGetResponse, RpcTransitionFrontierUserCommandsResponse,
82};
83use serde::{Deserialize, Serialize};
84
85use node::{
86    core::{
87        channels::{mpsc, oneshot},
88        requests::PendingRequests,
89    },
90    event_source::Event,
91    p2p::connection::P2pConnectionResponse,
92    rpc::RpcSnarkPoolJobGetResponse,
93    State,
94};
95pub use node::{
96    rpc::{
97        ActionStatsResponse, RpcActionStatsGetResponse, RpcId, RpcIdType,
98        RpcP2pConnectionOutgoingResponse, RpcScanStateSummaryGetResponse, RpcSnarkPoolGetResponse,
99        RpcSnarkerJobCommitResponse, RpcSnarkerJobSpecResponse, RpcStateGetResponse,
100        RpcSyncStatsGetResponse, RpcTransactionInjectSuccess,
101    },
102    rpc_effectful::RespondError,
103};
104
105use crate::NodeService;
106
107#[derive(Serialize, Deserialize, Debug)]
108pub enum RpcP2pConnectionIncomingResponse {
109    Answer(P2pConnectionResponse),
110    Result(Result<(), String>),
111}
112
113pub struct NodeRpcRequest {
114    pub req: RpcRequest,
115    pub responder: Box<dyn Send + std::any::Any>,
116}
117
118pub type RpcReceiver = mpsc::Receiver<NodeRpcRequest>;
119
120pub struct RpcService {
121    pending: PendingRequests<RpcIdType, Box<dyn Send + std::any::Any>>,
122
123    req_sender: mpsc::Sender<NodeRpcRequest>,
124    req_receiver: mpsc::Receiver<NodeRpcRequest>,
125}
126
127impl Default for RpcService {
128    fn default() -> Self {
129        Self::new()
130    }
131}
132
133impl RpcService {
134    pub fn new() -> Self {
135        let (tx, rx) = mpsc::channel(8);
136        Self {
137            pending: Default::default(),
138            req_sender: tx,
139            req_receiver: rx,
140        }
141    }
142
143    /// Channel for sending the rpc request to state machine.
144    pub fn req_sender(&self) -> RpcSender {
145        RpcSender::new(self.req_sender.clone())
146    }
147
148    /// Channel for receiving rpc requests in state machine.
149    pub fn req_receiver(&mut self) -> &mut RpcReceiver {
150        &mut self.req_receiver
151    }
152
153    pub fn process_request(&mut self, req: NodeRpcRequest) -> Event {
154        let rpc_id = self.pending.add(req.responder);
155        let req = req.req;
156        Event::Rpc(rpc_id, Box::new(req))
157    }
158}
159
160impl NodeService {
161    pub fn process_rpc_request(&mut self, req: NodeRpcRequest) {
162        let rpc_id = self.rpc.pending.add(req.responder);
163        let req = req.req;
164        let tx = self.event_sender.clone();
165
166        let _ = tx.send(Event::Rpc(rpc_id, Box::new(req)));
167    }
168}
169
/// Generates a `respond_*` method named `$name` that answers a pending
/// request with a value of type `$ty`.
///
/// The generated method removes the responder registered under `rpc_id`,
/// downcasts it to `oneshot::Sender<$ty>` and sends `response` through it.
/// It fails with `UnknownRpcId` when no responder is pending for the id,
/// `UnexpectedResponseType` when the responder has a different type, and
/// `RespondingFailed` when the send itself fails.
macro_rules! rpc_service_impl {
    ($name:ident, $ty:ty) => {
        fn $name(&mut self, rpc_id: RpcId, response: $ty) -> Result<(), RespondError> {
            let responder = self
                .rpc
                .pending
                .remove(rpc_id)
                .ok_or(RespondError::UnknownRpcId)?
                .downcast::<oneshot::Sender<$ty>>()
                .or(Err(RespondError::UnexpectedResponseType))?;
            responder
                .send(response)
                .or(Err(RespondError::RespondingFailed))?;
            Ok(())
        }
    };
}
184
/// Picks the smallest part of the state that has to be serialized for a
/// jsonpath filter.
///
/// Usage: `state_field_filter!(state, field_a | field_b | ..., filter)`.
///
/// The listed fields are tried in order. For the first one that
/// `strip_root_field` recognises as the root field of the filter, the macro
/// evaluates to that field serialized to JSON together with the filter
/// rewritten to be relative to it (`$` followed by the remainder). When no
/// field matches, it evaluates to the whole state serialized and the filter
/// unchanged. Serialization errors are propagated with `?`, so the macro may
/// only be used inside a function returning a compatible `Result`.
macro_rules! state_field_filter {
    ($root:expr, $($field:ident)|*, $path:expr ) => {
        $(
            if let Some(filter) = strip_root_field($path, stringify!($field)) {
                (serde_json::to_value(&$root.$field)?, format!("${filter}"))
            } else
        )*
        {
            (serde_json::to_value($root)?, $path.to_string())
        }
    };
}
197
/// Strips topmost field name `field` from the jsonpath expression `filter`,
/// returning modified filter. If the `filter` does not start with the specified
/// field, returns [None].
///
/// Both the dot form (`$.field`) and the bracket form (`$['field']`) are
/// recognised. In the dot form the match must end at an identifier boundary:
/// if `field` is followed by a letter, a digit or `_`, the filter addresses a
/// different, longer field name and [None] is returned.
///
/// ```ignore
/// use mina_node_native::rpc::strip_root_field;
///
/// let filter = strip_root_field("$.field", "field");
/// assert_eq!(filter, Some(""));
///
/// let filter = strip_root_field("$.field.another", "field");
/// assert_eq!(filter, Some(".another"));
///
/// let filter = strip_root_field("$.field_other", "field");
/// assert_eq!(filter, None);
///
/// let filter = strip_root_field("$.field1", "field");
/// assert_eq!(filter, None);
/// ```
fn strip_root_field<'a>(filter: &'a str, field: &str) -> Option<&'a str> {
    // Characters that would extend `field` into a longer identifier. Digits
    // are included so that `$.field1` is not mistaken for `$.field`.
    let is_field_char = |c: char| c.is_alphanumeric() || c == '_';

    // Every supported filter starts at the document root.
    let path = filter.strip_prefix('$')?;

    // Dot form: `.field` followed by anything that does not continue the name.
    let dotted = path
        .strip_prefix('.')
        .and_then(|p| p.strip_prefix(field))
        .filter(|rest| !rest.starts_with(is_field_char));

    // Bracket form: `['field']`; the closing quote delimits the name exactly.
    dotted.or_else(|| {
        path.strip_prefix("['")
            .and_then(|p| p.strip_prefix(field))
            .and_then(|p| p.strip_prefix("']"))
    })
}
230
231fn optimize_filtered_state(
232    state: &State,
233    filter: &str,
234) -> Result<(serde_json::Value, String), serde_json::Error> {
235    let (value, filter) = state_field_filter!(
236        state,
237        config
238            | p2p
239            | snark
240            | transition_frontier
241            | snark_pool
242            | external_snark_worker
243            | block_producer
244            | rpc
245            | watched_accounts,
246        filter
247    );
248    Ok((value, filter))
249}
250
251impl node::rpc_effectful::RpcService for NodeService {
252    fn respond_state_get(
253        &mut self,
254        rpc_id: RpcId,
255        (state, filter): (&State, Option<&str>),
256    ) -> Result<(), RespondError> {
257        let entry = self.rpc.pending.remove(rpc_id);
258        let chan = entry.ok_or(RespondError::UnknownRpcId)?;
259        let chan = chan
260            .downcast::<oneshot::Sender<RpcStateGetResponse>>()
261            .or(Err(RespondError::UnexpectedResponseType))?;
262        let response = if let Some(filter) = filter {
263            let (json_state, filter) = optimize_filtered_state(state, filter)?;
264            match filter.parse::<jsonpath_rust::JsonPathInst>() {
265                Ok(filter) => {
266                    let values = filter
267                        .find_slice(&json_state, Default::default())
268                        .into_iter()
269                        .map(|p| (*p).clone())
270                        .collect::<Vec<_>>();
271                    Ok(if values.len() == 1 {
272                        values[0].clone()
273                    } else {
274                        serde_json::Value::Array(values)
275                    })
276                }
277                Err(err) => Err(RpcStateGetError::FilterError(err)),
278            }
279        } else {
280            Ok(serde_json::to_value(state)?)
281        };
282        chan.send(response)
283            .or(Err(RespondError::RespondingFailed))?;
284        Ok(())
285    }
286    rpc_service_impl!(respond_status_get, RpcStatusGetResponse);
287
288    rpc_service_impl!(respond_heartbeat_get, RpcHeartbeatGetResponse);
289
290    rpc_service_impl!(respond_sync_stats_get, RpcSyncStatsGetResponse);
291    rpc_service_impl!(respond_action_stats_get, RpcActionStatsGetResponse);
292    rpc_service_impl!(
293        respond_block_producer_stats_get,
294        RpcBlockProducerStatsGetResponse
295    );
296    rpc_service_impl!(
297        respond_message_progress_stats_get,
298        RpcMessageProgressResponse
299    );
300    rpc_service_impl!(respond_peers_get, RpcPeersGetResponse);
301    rpc_service_impl!(
302        respond_p2p_connection_outgoing,
303        RpcP2pConnectionOutgoingResponse
304    );
305
306    fn respond_p2p_connection_incoming_answer(
307        &mut self,
308        rpc_id: RpcId,
309        response: P2pConnectionResponse,
310    ) -> Result<(), RespondError> {
311        let entry = self.rpc.pending.get(rpc_id);
312        let chan = entry.ok_or(RespondError::UnknownRpcId)?;
313        let chan = chan
314            .downcast_ref::<mpsc::Sender<RpcP2pConnectionIncomingResponse>>()
315            .ok_or(RespondError::UnexpectedResponseType)?
316            .clone();
317        chan.try_send(RpcP2pConnectionIncomingResponse::Answer(response))
318            .or(Err(RespondError::RespondingFailed))?;
319        Ok(())
320    }
321
322    fn respond_p2p_connection_incoming(
323        &mut self,
324        rpc_id: RpcId,
325        response: Result<(), String>,
326    ) -> Result<(), RespondError> {
327        let entry = self.rpc.pending.remove(rpc_id);
328        let chan = entry.ok_or(RespondError::UnknownRpcId)?;
329        let chan = chan
330            .downcast::<mpsc::Sender<RpcP2pConnectionIncomingResponse>>()
331            .or(Err(RespondError::UnexpectedResponseType))?;
332        chan.try_send(RpcP2pConnectionIncomingResponse::Result(response))
333            .or(Err(RespondError::RespondingFailed))?;
334        Ok(())
335    }
336
337    rpc_service_impl!(
338        respond_scan_state_summary_get,
339        RpcScanStateSummaryGetResponse
340    );
341    rpc_service_impl!(respond_snark_pool_get, RpcSnarkPoolGetResponse);
342    rpc_service_impl!(respond_snark_pool_job_get, RpcSnarkPoolJobGetResponse);
343    rpc_service_impl!(
344        respond_snark_pool_completed_jobs_get,
345        RpcSnarkPoolCompletedJobsResponse
346    );
347    rpc_service_impl!(
348        respond_snark_pool_pending_jobs_get,
349        RpcSnarkPoolPendingJobsGetResponse
350    );
351    rpc_service_impl!(respond_snarker_job_commit, RpcSnarkerJobCommitResponse);
352    rpc_service_impl!(
353        respond_snarker_job_spec,
354        node::rpc::RpcSnarkerJobSpecResponse
355    );
356    rpc_service_impl!(
357        respond_snarker_workers,
358        node::rpc::RpcSnarkerWorkersResponse
359    );
360    rpc_service_impl!(
361        respond_snarker_config_get,
362        node::rpc::RpcSnarkerConfigGetResponse
363    );
364    rpc_service_impl!(respond_health_check, RpcHealthCheckResponse);
365    rpc_service_impl!(respond_readiness_check, RpcReadinessCheckResponse);
366    rpc_service_impl!(
367        respond_discovery_routing_table,
368        RpcDiscoveryRoutingTableResponse
369    );
370    rpc_service_impl!(
371        respond_discovery_bootstrap_stats,
372        RpcDiscoveryBoostrapStatsResponse
373    );
374    rpc_service_impl!(respond_transaction_pool, RpcTransactionPoolResponse);
375    rpc_service_impl!(respond_ledger_slim_accounts, RpcLedgerSlimAccountsResponse);
376    rpc_service_impl!(respond_ledger_accounts, RpcLedgerAccountsResponse);
377    rpc_service_impl!(respond_transaction_inject, RpcTransactionInjectResponse);
378    rpc_service_impl!(
379        respond_transition_frontier_commands,
380        RpcTransitionFrontierUserCommandsResponse
381    );
382    rpc_service_impl!(respond_best_chain, RpcBestChainResponse);
383    rpc_service_impl!(
384        respond_consensus_constants,
385        RpcConsensusConstantsGetResponse
386    );
387    rpc_service_impl!(respond_transaction_status, RpcTransactionStatusGetResponse);
388    rpc_service_impl!(respond_block_get, RpcGetBlockResponse);
389    rpc_service_impl!(respond_pooled_user_commands, RpcPooledUserCommandsResponse);
390    rpc_service_impl!(
391        respond_pooled_zkapp_commands,
392        RpcPooledZkappCommandsResponse
393    );
394    rpc_service_impl!(respond_genesis_block, RpcGenesisBlockResponse);
395    rpc_service_impl!(respond_consensus_time_get, RpcConsensusTimeGetResponse);
396    rpc_service_impl!(respond_ledger_status_get, RpcLedgerStatusGetResponse);
397    rpc_service_impl!(
398        respond_ledger_account_delegators_get,
399        RpcLedgerAccountDelegatorsGetResponse
400    );
401}
402
#[cfg(test)]
mod tests {
    use super::strip_root_field;

    /// Table-driven check of `strip_root_field` against the field name
    /// `"field"`: dot and bracket forms, nested paths, and look-alike names
    /// that must not match.
    #[test]
    fn strip_root_field_test() {
        const FIELD: &str = "field";
        let cases: [(&str, Option<&str>); 7] = [
            ("$.field", Some("")),
            ("$['field']", Some("")),
            ("$.field.another", Some(".another")),
            ("$['field'].another", Some(".another")),
            ("$.another", None),
            ("$.field_1", None),
            ("$.fields", None),
        ];
        for (filter, expected) in cases {
            assert_eq!(strip_root_field(filter, FIELD), expected);
        }
    }
}