openmina_node_common/service/rpc/
mod.rs

1mod sender;
2pub use sender::RpcSender;
3
4pub mod ledger;
5pub mod state;
6pub mod stats;
7pub mod transaction_pool;
8pub mod transition_frontier;
9
10use node::rpc::{
11    RpcBestChainResponse, RpcBlockProducerStatsGetResponse, RpcConsensusConstantsGetResponse,
12    RpcConsensusTimeGetResponse, RpcDiscoveryBoostrapStatsResponse,
13    RpcDiscoveryRoutingTableResponse, RpcGenesisBlockResponse, RpcGetBlockResponse,
14    RpcHealthCheckResponse, RpcHeartbeatGetResponse, RpcLedgerAccountDelegatorsGetResponse,
15    RpcLedgerAccountsResponse, RpcLedgerSlimAccountsResponse, RpcLedgerStatusGetResponse,
16    RpcMessageProgressResponse, RpcPeersGetResponse, RpcPooledUserCommandsResponse,
17    RpcPooledZkappCommandsResponse, RpcReadinessCheckResponse, RpcRequest,
18    RpcSnarkPoolCompletedJobsResponse, RpcSnarkPoolPendingJobsGetResponse, RpcStateGetError,
19    RpcStatusGetResponse, RpcTransactionInjectResponse, RpcTransactionPoolResponse,
20    RpcTransactionStatusGetResponse, RpcTransitionFrontierUserCommandsResponse,
21};
22use serde::{Deserialize, Serialize};
23
24use node::{
25    core::{
26        channels::{mpsc, oneshot},
27        requests::PendingRequests,
28    },
29    event_source::Event,
30    p2p::connection::P2pConnectionResponse,
31    rpc::RpcSnarkPoolJobGetResponse,
32    State,
33};
34pub use node::{
35    rpc::{
36        ActionStatsResponse, RpcActionStatsGetResponse, RpcId, RpcIdType,
37        RpcP2pConnectionOutgoingResponse, RpcScanStateSummaryGetResponse, RpcSnarkPoolGetResponse,
38        RpcSnarkerJobCommitResponse, RpcSnarkerJobSpecResponse, RpcStateGetResponse,
39        RpcSyncStatsGetResponse, RpcTransactionInjectSuccess,
40    },
41    rpc_effectful::RespondError,
42};
43
44use crate::NodeService;
45
/// Messages delivered to the requester of an incoming p2p connection RPC.
///
/// Both variants travel over the same
/// `mpsc::Sender<RpcP2pConnectionIncomingResponse>` responder.
#[derive(Serialize, Deserialize, Debug)]
pub enum RpcP2pConnectionIncomingResponse {
    /// Sent by `respond_p2p_connection_incoming_answer`, which keeps the
    /// request pending afterwards.
    Answer(P2pConnectionResponse),
    /// Sent by `respond_p2p_connection_incoming`, which removes the request
    /// from the pending set.
    Result(Result<(), String>),
}
51
/// An RPC request paired with the channel on which its response is expected.
pub struct NodeRpcRequest {
    /// The request itself.
    pub req: RpcRequest,
    /// Type-erased response channel. The `respond_*` methods downcast it to
    /// the concrete sender type for the response they carry
    /// (`oneshot::Sender<_>`, or
    /// `mpsc::Sender<RpcP2pConnectionIncomingResponse>` for incoming p2p
    /// connections).
    pub responder: Box<dyn Send + std::any::Any>,
}
56
/// Receiving half of the RPC request channel, as handed out by
/// `RpcService::req_receiver`.
pub type RpcReceiver = mpsc::Receiver<NodeRpcRequest>;
58
/// Owns the RPC request channel and the responders of requests that have not
/// been answered yet.
pub struct RpcService {
    /// Responders of requests awaiting a response, keyed by the id assigned
    /// when the request was registered.
    pending: PendingRequests<RpcIdType, Box<dyn Send + std::any::Any>>,

    /// Sending half of the request channel; cloned for every `RpcSender`.
    req_sender: mpsc::Sender<NodeRpcRequest>,
    /// Receiving half of the request channel.
    req_receiver: mpsc::Receiver<NodeRpcRequest>,
}
65
66impl Default for RpcService {
67    fn default() -> Self {
68        Self::new()
69    }
70}
71
72impl RpcService {
73    pub fn new() -> Self {
74        let (tx, rx) = mpsc::channel(8);
75        Self {
76            pending: Default::default(),
77            req_sender: tx,
78            req_receiver: rx,
79        }
80    }
81
82    /// Channel for sending the rpc request to state machine.
83    pub fn req_sender(&self) -> RpcSender {
84        RpcSender::new(self.req_sender.clone())
85    }
86
87    /// Channel for receiving rpc requests in state machine.
88    pub fn req_receiver(&mut self) -> &mut RpcReceiver {
89        &mut self.req_receiver
90    }
91
92    pub fn process_request(&mut self, req: NodeRpcRequest) -> Event {
93        let rpc_id = self.pending.add(req.responder);
94        let req = req.req;
95        Event::Rpc(rpc_id, Box::new(req))
96    }
97}
98
99impl NodeService {
100    pub fn process_rpc_request(&mut self, req: NodeRpcRequest) {
101        let rpc_id = self.rpc.pending.add(req.responder);
102        let req = req.req;
103        let tx = self.event_sender.clone();
104
105        let _ = tx.send(Event::Rpc(rpc_id, Box::new(req)));
106    }
107}
108
// Generates a `respond_*` method named `$name` that delivers a response of
// type `$ty` for a pending rpc request.
//
// The generated method removes the responder registered under `rpc_id`,
// downcasts it to `oneshot::Sender<$ty>` and sends `response` through it.
// Errors:
// - `RespondError::UnknownRpcId` when nothing is pending under `rpc_id`;
// - `RespondError::UnexpectedResponseType` when the responder is not a
//   `oneshot::Sender<$ty>`;
// - `RespondError::RespondingFailed` when the send itself fails.
macro_rules! rpc_service_impl {
    ($name:ident, $ty:ty) => {
        fn $name(&mut self, rpc_id: RpcId, response: $ty) -> Result<(), RespondError> {
            self.rpc
                .pending
                .remove(rpc_id)
                .ok_or(RespondError::UnknownRpcId)?
                .downcast::<oneshot::Sender<$ty>>()
                .map_err(|_| RespondError::UnexpectedResponseType)?
                .send(response)
                .map_err(|_| RespondError::RespondingFailed)
        }
    };
}
123
// Chooses what has to be serialized in order to apply a jsonpath filter to
// the state.
//
// The listed `$part` field names are tried in order: for the first one that
// `strip_root_field` recognizes as the root field of `$filter`, only
// `$state.$part` is serialized and the filter is re-rooted by prepending `$`
// to the remainder. If no part matches, the whole `$state` is serialized and
// `$filter` is used unchanged.
//
// Expands to an expression of type `(serde_json::Value, String)`. It uses `?`
// on the serialization result, so it must be expanded inside a function whose
// error type can be built from the serialization error.
macro_rules! state_field_filter {
    ($state:expr, $($part:ident)|*, $filter:expr ) => {
        $(
            // One `if let … else` link of the chain per listed part.
            if let Some(filter) = strip_root_field($filter, stringify!($part)) {
                // Inside the string literal `$` is a plain dollar sign (the
                // jsonpath root); only `{filter}` is interpolated.
                (serde_json::to_value(&$state.$part)?, format!("${filter}"))
            } else
        )*
        // Final `else` branch: no part matched.
        {
            (serde_json::to_value($state)?, $filter.to_string())
        }
    };
}
136
/// Strips topmost field name `field` from the jsonpath expression `filter`,
/// returning the remainder of the filter (without the leading `$`). If the
/// `filter` does not start with the specified field, returns [None].
///
/// Both the dot form (`$.field`) and the bracket form (`$['field']`) are
/// recognized. In the dot form the field name must be matched as a whole:
/// if it is followed by a letter, a digit or an underscore, the filter refers
/// to a different, longer field name and [None] is returned.
///
/// ```ignore
/// let filter = strip_root_field("$.field", "field");
/// assert_eq!(filter, Some(""));
///
/// let filter = strip_root_field("$.field.another", "field");
/// assert_eq!(filter, Some(".another"));
///
/// let filter = strip_root_field("$.field_other", "field");
/// assert_eq!(filter, None);
///
/// let filter = strip_root_field("$.field1", "field");
/// assert_eq!(filter, None);
/// ```
fn strip_root_field<'a>(filter: &'a str, field: &str) -> Option<&'a str> {
    // Characters that continue a field name. Digits are included so that
    // `field` is not mistaken for a prefix match of e.g. `field1`.
    let is_field_char = |c: char| c.is_alphanumeric() || c == '_';

    let rest = filter.strip_prefix('$')?;

    // `.field`, followed by anything that cannot be part of the same name.
    let dot_form = rest
        .strip_prefix('.')
        .and_then(|r| r.strip_prefix(field))
        .filter(|r| !r.starts_with(is_field_char));

    // `['field']`; the closing `']` delimits the name, so no boundary check
    // is needed here.
    dot_form.or_else(|| {
        rest.strip_prefix("['")?
            .strip_prefix(field)?
            .strip_prefix("']")
    })
}
169
170fn optimize_filtered_state(
171    state: &State,
172    filter: &str,
173) -> Result<(serde_json::Value, String), serde_json::Error> {
174    let (value, filter) = state_field_filter!(
175        state,
176        config
177            | p2p
178            | snark
179            | transition_frontier
180            | snark_pool
181            | external_snark_worker
182            | block_producer
183            | rpc
184            | watched_accounts,
185        filter
186    );
187    Ok((value, filter))
188}
189
190impl node::rpc_effectful::RpcService for NodeService {
191    fn respond_state_get(
192        &mut self,
193        rpc_id: RpcId,
194        (state, filter): (&State, Option<&str>),
195    ) -> Result<(), RespondError> {
196        let entry = self.rpc.pending.remove(rpc_id);
197        let chan = entry.ok_or(RespondError::UnknownRpcId)?;
198        let chan = chan
199            .downcast::<oneshot::Sender<RpcStateGetResponse>>()
200            .or(Err(RespondError::UnexpectedResponseType))?;
201        let response = if let Some(filter) = filter {
202            let (json_state, filter) = optimize_filtered_state(state, filter)?;
203            match filter.parse::<jsonpath_rust::JsonPathInst>() {
204                Ok(filter) => {
205                    let values = filter
206                        .find_slice(&json_state, Default::default())
207                        .into_iter()
208                        .map(|p| (*p).clone())
209                        .collect::<Vec<_>>();
210                    Ok(if values.len() == 1 {
211                        values[0].clone()
212                    } else {
213                        serde_json::Value::Array(values)
214                    })
215                }
216                Err(err) => Err(RpcStateGetError::FilterError(err)),
217            }
218        } else {
219            Ok(serde_json::to_value(state)?)
220        };
221        chan.send(response)
222            .or(Err(RespondError::RespondingFailed))?;
223        Ok(())
224    }
225    rpc_service_impl!(respond_status_get, RpcStatusGetResponse);
226
227    rpc_service_impl!(respond_heartbeat_get, RpcHeartbeatGetResponse);
228
229    rpc_service_impl!(respond_sync_stats_get, RpcSyncStatsGetResponse);
230    rpc_service_impl!(respond_action_stats_get, RpcActionStatsGetResponse);
231    rpc_service_impl!(
232        respond_block_producer_stats_get,
233        RpcBlockProducerStatsGetResponse
234    );
235    rpc_service_impl!(
236        respond_message_progress_stats_get,
237        RpcMessageProgressResponse
238    );
239    rpc_service_impl!(respond_peers_get, RpcPeersGetResponse);
240    rpc_service_impl!(
241        respond_p2p_connection_outgoing,
242        RpcP2pConnectionOutgoingResponse
243    );
244
245    fn respond_p2p_connection_incoming_answer(
246        &mut self,
247        rpc_id: RpcId,
248        response: P2pConnectionResponse,
249    ) -> Result<(), RespondError> {
250        let entry = self.rpc.pending.get(rpc_id);
251        let chan = entry.ok_or(RespondError::UnknownRpcId)?;
252        let chan = chan
253            .downcast_ref::<mpsc::Sender<RpcP2pConnectionIncomingResponse>>()
254            .ok_or(RespondError::UnexpectedResponseType)?
255            .clone();
256        chan.try_send(RpcP2pConnectionIncomingResponse::Answer(response))
257            .or(Err(RespondError::RespondingFailed))?;
258        Ok(())
259    }
260
261    fn respond_p2p_connection_incoming(
262        &mut self,
263        rpc_id: RpcId,
264        response: Result<(), String>,
265    ) -> Result<(), RespondError> {
266        let entry = self.rpc.pending.remove(rpc_id);
267        let chan = entry.ok_or(RespondError::UnknownRpcId)?;
268        let chan = chan
269            .downcast::<mpsc::Sender<RpcP2pConnectionIncomingResponse>>()
270            .or(Err(RespondError::UnexpectedResponseType))?;
271        chan.try_send(RpcP2pConnectionIncomingResponse::Result(response))
272            .or(Err(RespondError::RespondingFailed))?;
273        Ok(())
274    }
275
276    rpc_service_impl!(
277        respond_scan_state_summary_get,
278        RpcScanStateSummaryGetResponse
279    );
280    rpc_service_impl!(respond_snark_pool_get, RpcSnarkPoolGetResponse);
281    rpc_service_impl!(respond_snark_pool_job_get, RpcSnarkPoolJobGetResponse);
282    rpc_service_impl!(
283        respond_snark_pool_completed_jobs_get,
284        RpcSnarkPoolCompletedJobsResponse
285    );
286    rpc_service_impl!(
287        respond_snark_pool_pending_jobs_get,
288        RpcSnarkPoolPendingJobsGetResponse
289    );
290    rpc_service_impl!(respond_snarker_job_commit, RpcSnarkerJobCommitResponse);
291    rpc_service_impl!(
292        respond_snarker_job_spec,
293        node::rpc::RpcSnarkerJobSpecResponse
294    );
295    rpc_service_impl!(
296        respond_snarker_workers,
297        node::rpc::RpcSnarkerWorkersResponse
298    );
299    rpc_service_impl!(
300        respond_snarker_config_get,
301        node::rpc::RpcSnarkerConfigGetResponse
302    );
303    rpc_service_impl!(respond_health_check, RpcHealthCheckResponse);
304    rpc_service_impl!(respond_readiness_check, RpcReadinessCheckResponse);
305    rpc_service_impl!(
306        respond_discovery_routing_table,
307        RpcDiscoveryRoutingTableResponse
308    );
309    rpc_service_impl!(
310        respond_discovery_bootstrap_stats,
311        RpcDiscoveryBoostrapStatsResponse
312    );
313    rpc_service_impl!(respond_transaction_pool, RpcTransactionPoolResponse);
314    rpc_service_impl!(respond_ledger_slim_accounts, RpcLedgerSlimAccountsResponse);
315    rpc_service_impl!(respond_ledger_accounts, RpcLedgerAccountsResponse);
316    rpc_service_impl!(respond_transaction_inject, RpcTransactionInjectResponse);
317    rpc_service_impl!(
318        respond_transition_frontier_commands,
319        RpcTransitionFrontierUserCommandsResponse
320    );
321    rpc_service_impl!(respond_best_chain, RpcBestChainResponse);
322    rpc_service_impl!(
323        respond_consensus_constants,
324        RpcConsensusConstantsGetResponse
325    );
326    rpc_service_impl!(respond_transaction_status, RpcTransactionStatusGetResponse);
327    rpc_service_impl!(respond_block_get, RpcGetBlockResponse);
328    rpc_service_impl!(respond_pooled_user_commands, RpcPooledUserCommandsResponse);
329    rpc_service_impl!(
330        respond_pooled_zkapp_commands,
331        RpcPooledZkappCommandsResponse
332    );
333    rpc_service_impl!(respond_genesis_block, RpcGenesisBlockResponse);
334    rpc_service_impl!(respond_consensus_time_get, RpcConsensusTimeGetResponse);
335    rpc_service_impl!(respond_ledger_status_get, RpcLedgerStatusGetResponse);
336    rpc_service_impl!(
337        respond_ledger_account_delegators_get,
338        RpcLedgerAccountDelegatorsGetResponse
339    );
340}
341
#[cfg(test)]
mod tests {
    use super::strip_root_field;

    /// Checks `strip_root_field` against a table of filters, always
    /// stripping the field name `field`.
    #[test]
    fn strip_root_field_test() {
        // (filter, expected remainder)
        const CASES: &[(&str, Option<&str>)] = &[
            ("$.field", Some("")),
            ("$['field']", Some("")),
            ("$.field.another", Some(".another")),
            ("$['field'].another", Some(".another")),
            ("$.another", None),
            ("$.field_1", None),
            ("$.fields", None),
        ];

        for &(filter, expected) in CASES {
            let actual = strip_root_field(filter, "field");
            assert_eq!(actual, expected)
        }
    }
}