openmina_node_common/service/rpc/
mod.rs1mod sender;
2pub use sender::RpcSender;
3
4pub mod ledger;
5pub mod state;
6pub mod stats;
7pub mod transaction_pool;
8pub mod transition_frontier;
9
10use node::rpc::{
11 RpcBestChainResponse, RpcBlockProducerStatsGetResponse, RpcConsensusConstantsGetResponse,
12 RpcConsensusTimeGetResponse, RpcDiscoveryBoostrapStatsResponse,
13 RpcDiscoveryRoutingTableResponse, RpcGenesisBlockResponse, RpcGetBlockResponse,
14 RpcHealthCheckResponse, RpcHeartbeatGetResponse, RpcLedgerAccountDelegatorsGetResponse,
15 RpcLedgerAccountsResponse, RpcLedgerSlimAccountsResponse, RpcLedgerStatusGetResponse,
16 RpcMessageProgressResponse, RpcPeersGetResponse, RpcPooledUserCommandsResponse,
17 RpcPooledZkappCommandsResponse, RpcReadinessCheckResponse, RpcRequest,
18 RpcSnarkPoolCompletedJobsResponse, RpcSnarkPoolPendingJobsGetResponse, RpcStateGetError,
19 RpcStatusGetResponse, RpcTransactionInjectResponse, RpcTransactionPoolResponse,
20 RpcTransactionStatusGetResponse, RpcTransitionFrontierUserCommandsResponse,
21};
22use serde::{Deserialize, Serialize};
23
24use node::{
25 core::{
26 channels::{mpsc, oneshot},
27 requests::PendingRequests,
28 },
29 event_source::Event,
30 p2p::connection::P2pConnectionResponse,
31 rpc::RpcSnarkPoolJobGetResponse,
32 State,
33};
34pub use node::{
35 rpc::{
36 ActionStatsResponse, RpcActionStatsGetResponse, RpcId, RpcIdType,
37 RpcP2pConnectionOutgoingResponse, RpcScanStateSummaryGetResponse, RpcSnarkPoolGetResponse,
38 RpcSnarkerJobCommitResponse, RpcSnarkerJobSpecResponse, RpcStateGetResponse,
39 RpcSyncStatsGetResponse, RpcTransactionInjectSuccess,
40 },
41 rpc_effectful::RespondError,
42};
43
44use crate::NodeService;
45
46#[derive(Serialize, Deserialize, Debug)]
47pub enum RpcP2pConnectionIncomingResponse {
48 Answer(P2pConnectionResponse),
49 Result(Result<(), String>),
50}
51
52pub struct NodeRpcRequest {
53 pub req: RpcRequest,
54 pub responder: Box<dyn Send + std::any::Any>,
55}
56
57pub type RpcReceiver = mpsc::Receiver<NodeRpcRequest>;
58
59pub struct RpcService {
60 pending: PendingRequests<RpcIdType, Box<dyn Send + std::any::Any>>,
61
62 req_sender: mpsc::Sender<NodeRpcRequest>,
63 req_receiver: mpsc::Receiver<NodeRpcRequest>,
64}
65
66impl Default for RpcService {
67 fn default() -> Self {
68 Self::new()
69 }
70}
71
72impl RpcService {
73 pub fn new() -> Self {
74 let (tx, rx) = mpsc::channel(8);
75 Self {
76 pending: Default::default(),
77 req_sender: tx,
78 req_receiver: rx,
79 }
80 }
81
82 pub fn req_sender(&self) -> RpcSender {
84 RpcSender::new(self.req_sender.clone())
85 }
86
87 pub fn req_receiver(&mut self) -> &mut RpcReceiver {
89 &mut self.req_receiver
90 }
91
92 pub fn process_request(&mut self, req: NodeRpcRequest) -> Event {
93 let rpc_id = self.pending.add(req.responder);
94 let req = req.req;
95 Event::Rpc(rpc_id, Box::new(req))
96 }
97}
98
99impl NodeService {
100 pub fn process_rpc_request(&mut self, req: NodeRpcRequest) {
101 let rpc_id = self.rpc.pending.add(req.responder);
102 let req = req.req;
103 let tx = self.event_sender.clone();
104
105 let _ = tx.send(Event::Rpc(rpc_id, Box::new(req)));
106 }
107}
108
/// Generates a `respond_*` method named `$name` that answers a pending RPC
/// with a value of type `$ty`.
///
/// The generated method removes the pending entry for `rpc_id`, downcasts the
/// stored responder to `oneshot::Sender<$ty>` and sends `response` through it.
/// It fails with `UnknownRpcId` when nothing is pending for the id, with
/// `UnexpectedResponseType` when the downcast fails, and with
/// `RespondingFailed` when the send itself fails.
macro_rules! rpc_service_impl {
    ($name:ident, $ty:ty) => {
        fn $name(&mut self, rpc_id: RpcId, response: $ty) -> Result<(), RespondError> {
            self.rpc
                .pending
                .remove(rpc_id)
                .ok_or(RespondError::UnknownRpcId)?
                .downcast::<oneshot::Sender<$ty>>()
                .or(Err(RespondError::UnexpectedResponseType))?
                .send(response)
                .or(Err(RespondError::RespondingFailed))
        }
    };
}
123
/// Expands to an `if let … else if let … else` chain yielding a
/// `(serde_json::Value, String)` pair.
///
/// For the first listed `$part` for which `strip_root_field($filter, "<part>")`
/// succeeds, only `$state.$part` is serialized and the returned filter is the
/// stripped remainder prefixed with `$`. If no part matches, the whole `$state`
/// is serialized and `$filter` is returned unchanged (as an owned `String`).
///
/// Serialization uses `?`, so the macro must be expanded inside a function
/// whose error type accepts `serde_json::Error`.
macro_rules! state_field_filter {
    ($state:expr, $($part:ident)|*, $filter:expr ) => {
        $(
            if let Some(filter) = strip_root_field($filter, stringify!($part)) {
                let value = serde_json::to_value(&$state.$part)?;
                (value, format!("${filter}"))
            } else
        )*
        {
            let value = serde_json::to_value($state)?;
            (value, $filter.to_string())
        }
    };
}
136
/// Strips the leading root selector for `field` from the JSONPath `filter`.
///
/// Two spellings of the root field access are recognised:
///
/// * dot form: `$.field…`
/// * single-quoted bracket form: `$['field']…`
///
/// On a match the remainder of the filter after the field selector is
/// returned (possibly empty). `None` is returned when `filter` does not start
/// with `$`, or when the first selector is not exactly `field`.
///
/// In dot form the character following `field` must not be an identifier
/// character (letter, digit or `_`), so that `$.fields`, `$.field_1` and
/// `$.field1` are not mistaken for an access to `field`.
fn strip_root_field<'a>(filter: &'a str, field: &str) -> Option<&'a str> {
    // Digits are identifier characters too: `field1` is a different field.
    let is_ident_char = |c: char| c.is_alphanumeric() || c == '_';

    let after_root = filter.strip_prefix('$')?;

    let dot_form = after_root
        .strip_prefix('.')
        .and_then(|f| f.strip_prefix(field))
        .filter(|f| !f.starts_with(is_ident_char));

    dot_form.or_else(|| {
        after_root
            .strip_prefix("['")
            .and_then(|f| f.strip_prefix(field))
            .and_then(|f| f.strip_prefix("']"))
    })
}
169
170fn optimize_filtered_state(
171 state: &State,
172 filter: &str,
173) -> Result<(serde_json::Value, String), serde_json::Error> {
174 let (value, filter) = state_field_filter!(
175 state,
176 config
177 | p2p
178 | snark
179 | transition_frontier
180 | snark_pool
181 | external_snark_worker
182 | block_producer
183 | rpc
184 | watched_accounts,
185 filter
186 );
187 Ok((value, filter))
188}
189
190impl node::rpc_effectful::RpcService for NodeService {
191 fn respond_state_get(
192 &mut self,
193 rpc_id: RpcId,
194 (state, filter): (&State, Option<&str>),
195 ) -> Result<(), RespondError> {
196 let entry = self.rpc.pending.remove(rpc_id);
197 let chan = entry.ok_or(RespondError::UnknownRpcId)?;
198 let chan = chan
199 .downcast::<oneshot::Sender<RpcStateGetResponse>>()
200 .or(Err(RespondError::UnexpectedResponseType))?;
201 let response = if let Some(filter) = filter {
202 let (json_state, filter) = optimize_filtered_state(state, filter)?;
203 match filter.parse::<jsonpath_rust::JsonPathInst>() {
204 Ok(filter) => {
205 let values = filter
206 .find_slice(&json_state, Default::default())
207 .into_iter()
208 .map(|p| (*p).clone())
209 .collect::<Vec<_>>();
210 Ok(if values.len() == 1 {
211 values[0].clone()
212 } else {
213 serde_json::Value::Array(values)
214 })
215 }
216 Err(err) => Err(RpcStateGetError::FilterError(err)),
217 }
218 } else {
219 Ok(serde_json::to_value(state)?)
220 };
221 chan.send(response)
222 .or(Err(RespondError::RespondingFailed))?;
223 Ok(())
224 }
225 rpc_service_impl!(respond_status_get, RpcStatusGetResponse);
226
227 rpc_service_impl!(respond_heartbeat_get, RpcHeartbeatGetResponse);
228
229 rpc_service_impl!(respond_sync_stats_get, RpcSyncStatsGetResponse);
230 rpc_service_impl!(respond_action_stats_get, RpcActionStatsGetResponse);
231 rpc_service_impl!(
232 respond_block_producer_stats_get,
233 RpcBlockProducerStatsGetResponse
234 );
235 rpc_service_impl!(
236 respond_message_progress_stats_get,
237 RpcMessageProgressResponse
238 );
239 rpc_service_impl!(respond_peers_get, RpcPeersGetResponse);
240 rpc_service_impl!(
241 respond_p2p_connection_outgoing,
242 RpcP2pConnectionOutgoingResponse
243 );
244
245 fn respond_p2p_connection_incoming_answer(
246 &mut self,
247 rpc_id: RpcId,
248 response: P2pConnectionResponse,
249 ) -> Result<(), RespondError> {
250 let entry = self.rpc.pending.get(rpc_id);
251 let chan = entry.ok_or(RespondError::UnknownRpcId)?;
252 let chan = chan
253 .downcast_ref::<mpsc::Sender<RpcP2pConnectionIncomingResponse>>()
254 .ok_or(RespondError::UnexpectedResponseType)?
255 .clone();
256 chan.try_send(RpcP2pConnectionIncomingResponse::Answer(response))
257 .or(Err(RespondError::RespondingFailed))?;
258 Ok(())
259 }
260
261 fn respond_p2p_connection_incoming(
262 &mut self,
263 rpc_id: RpcId,
264 response: Result<(), String>,
265 ) -> Result<(), RespondError> {
266 let entry = self.rpc.pending.remove(rpc_id);
267 let chan = entry.ok_or(RespondError::UnknownRpcId)?;
268 let chan = chan
269 .downcast::<mpsc::Sender<RpcP2pConnectionIncomingResponse>>()
270 .or(Err(RespondError::UnexpectedResponseType))?;
271 chan.try_send(RpcP2pConnectionIncomingResponse::Result(response))
272 .or(Err(RespondError::RespondingFailed))?;
273 Ok(())
274 }
275
276 rpc_service_impl!(
277 respond_scan_state_summary_get,
278 RpcScanStateSummaryGetResponse
279 );
280 rpc_service_impl!(respond_snark_pool_get, RpcSnarkPoolGetResponse);
281 rpc_service_impl!(respond_snark_pool_job_get, RpcSnarkPoolJobGetResponse);
282 rpc_service_impl!(
283 respond_snark_pool_completed_jobs_get,
284 RpcSnarkPoolCompletedJobsResponse
285 );
286 rpc_service_impl!(
287 respond_snark_pool_pending_jobs_get,
288 RpcSnarkPoolPendingJobsGetResponse
289 );
290 rpc_service_impl!(respond_snarker_job_commit, RpcSnarkerJobCommitResponse);
291 rpc_service_impl!(
292 respond_snarker_job_spec,
293 node::rpc::RpcSnarkerJobSpecResponse
294 );
295 rpc_service_impl!(
296 respond_snarker_workers,
297 node::rpc::RpcSnarkerWorkersResponse
298 );
299 rpc_service_impl!(
300 respond_snarker_config_get,
301 node::rpc::RpcSnarkerConfigGetResponse
302 );
303 rpc_service_impl!(respond_health_check, RpcHealthCheckResponse);
304 rpc_service_impl!(respond_readiness_check, RpcReadinessCheckResponse);
305 rpc_service_impl!(
306 respond_discovery_routing_table,
307 RpcDiscoveryRoutingTableResponse
308 );
309 rpc_service_impl!(
310 respond_discovery_bootstrap_stats,
311 RpcDiscoveryBoostrapStatsResponse
312 );
313 rpc_service_impl!(respond_transaction_pool, RpcTransactionPoolResponse);
314 rpc_service_impl!(respond_ledger_slim_accounts, RpcLedgerSlimAccountsResponse);
315 rpc_service_impl!(respond_ledger_accounts, RpcLedgerAccountsResponse);
316 rpc_service_impl!(respond_transaction_inject, RpcTransactionInjectResponse);
317 rpc_service_impl!(
318 respond_transition_frontier_commands,
319 RpcTransitionFrontierUserCommandsResponse
320 );
321 rpc_service_impl!(respond_best_chain, RpcBestChainResponse);
322 rpc_service_impl!(
323 respond_consensus_constants,
324 RpcConsensusConstantsGetResponse
325 );
326 rpc_service_impl!(respond_transaction_status, RpcTransactionStatusGetResponse);
327 rpc_service_impl!(respond_block_get, RpcGetBlockResponse);
328 rpc_service_impl!(respond_pooled_user_commands, RpcPooledUserCommandsResponse);
329 rpc_service_impl!(
330 respond_pooled_zkapp_commands,
331 RpcPooledZkappCommandsResponse
332 );
333 rpc_service_impl!(respond_genesis_block, RpcGenesisBlockResponse);
334 rpc_service_impl!(respond_consensus_time_get, RpcConsensusTimeGetResponse);
335 rpc_service_impl!(respond_ledger_status_get, RpcLedgerStatusGetResponse);
336 rpc_service_impl!(
337 respond_ledger_account_delegators_get,
338 RpcLedgerAccountDelegatorsGetResponse
339 );
340}
341
#[cfg(test)]
mod tests {
    use super::strip_root_field;

    /// Table-driven check of `strip_root_field` against the field name
    /// `"field"`: each entry pairs a filter with the expected remainder.
    #[test]
    fn strip_root_field_test() {
        let cases = [
            // Exact matches, dot and bracket form.
            ("$.field", Some("")),
            ("$['field']", Some("")),
            // Matches with a trailing path.
            ("$.field.another", Some(".another")),
            ("$['field'].another", Some(".another")),
            ("$.field[0]", Some("[0]")),
            ("$['field'][0]", Some("[0]")),
            // Different or merely longer field names.
            ("$.another", None),
            ("$.field_1", None),
            ("$.fields", None),
            ("$['fields']", None),
            // Filters that do not start with a root field access.
            ("field", None),
            ("$", None),
            ("$..field", None),
        ];
        for (filter, expected) in cases {
            let actual = strip_root_field(filter, "field");
            // Name the offending entry so a failure is attributable.
            assert_eq!(actual, expected, "unexpected result for filter {filter:?}");
        }
    }
}