mina_node_common/service/rpc/
mod.rs1mod sender;
63pub use sender::RpcSender;
64
65pub mod ledger;
66pub mod state;
67pub mod stats;
68pub mod transaction_pool;
69pub mod transition_frontier;
70
71use node::rpc::{
72 RpcBestChainResponse, RpcBlockProducerStatsGetResponse, RpcConsensusConstantsGetResponse,
73 RpcConsensusTimeGetResponse, RpcDiscoveryBoostrapStatsResponse,
74 RpcDiscoveryRoutingTableResponse, RpcGenesisBlockResponse, RpcGetBlockResponse,
75 RpcHealthCheckResponse, RpcHeartbeatGetResponse, RpcLedgerAccountDelegatorsGetResponse,
76 RpcLedgerAccountsResponse, RpcLedgerSlimAccountsResponse, RpcLedgerStatusGetResponse,
77 RpcMessageProgressResponse, RpcPeersGetResponse, RpcPooledUserCommandsResponse,
78 RpcPooledZkappCommandsResponse, RpcReadinessCheckResponse, RpcRequest,
79 RpcSnarkPoolCompletedJobsResponse, RpcSnarkPoolPendingJobsGetResponse, RpcStateGetError,
80 RpcStatusGetResponse, RpcTransactionInjectResponse, RpcTransactionPoolResponse,
81 RpcTransactionStatusGetResponse, RpcTransitionFrontierUserCommandsResponse,
82};
83use serde::{Deserialize, Serialize};
84
85use node::{
86 core::{
87 channels::{mpsc, oneshot},
88 requests::PendingRequests,
89 },
90 event_source::Event,
91 p2p::connection::P2pConnectionResponse,
92 rpc::RpcSnarkPoolJobGetResponse,
93 State,
94};
95pub use node::{
96 rpc::{
97 ActionStatsResponse, RpcActionStatsGetResponse, RpcId, RpcIdType,
98 RpcP2pConnectionOutgoingResponse, RpcScanStateSummaryGetResponse, RpcSnarkPoolGetResponse,
99 RpcSnarkerJobCommitResponse, RpcSnarkerJobSpecResponse, RpcStateGetResponse,
100 RpcSyncStatsGetResponse, RpcTransactionInjectSuccess,
101 },
102 rpc_effectful::RespondError,
103};
104
105use crate::NodeService;
106
107#[derive(Serialize, Deserialize, Debug)]
108pub enum RpcP2pConnectionIncomingResponse {
109 Answer(P2pConnectionResponse),
110 Result(Result<(), String>),
111}
112
113pub struct NodeRpcRequest {
114 pub req: RpcRequest,
115 pub responder: Box<dyn Send + std::any::Any>,
116}
117
118pub type RpcReceiver = mpsc::Receiver<NodeRpcRequest>;
119
120pub struct RpcService {
121 pending: PendingRequests<RpcIdType, Box<dyn Send + std::any::Any>>,
122
123 req_sender: mpsc::Sender<NodeRpcRequest>,
124 req_receiver: mpsc::Receiver<NodeRpcRequest>,
125}
126
127impl Default for RpcService {
128 fn default() -> Self {
129 Self::new()
130 }
131}
132
133impl RpcService {
134 pub fn new() -> Self {
135 let (tx, rx) = mpsc::channel(8);
136 Self {
137 pending: Default::default(),
138 req_sender: tx,
139 req_receiver: rx,
140 }
141 }
142
143 pub fn req_sender(&self) -> RpcSender {
145 RpcSender::new(self.req_sender.clone())
146 }
147
148 pub fn req_receiver(&mut self) -> &mut RpcReceiver {
150 &mut self.req_receiver
151 }
152
153 pub fn process_request(&mut self, req: NodeRpcRequest) -> Event {
154 let rpc_id = self.pending.add(req.responder);
155 let req = req.req;
156 Event::Rpc(rpc_id, Box::new(req))
157 }
158}
159
160impl NodeService {
161 pub fn process_rpc_request(&mut self, req: NodeRpcRequest) {
162 let rpc_id = self.rpc.pending.add(req.responder);
163 let req = req.req;
164 let tx = self.event_sender.clone();
165
166 let _ = tx.send(Event::Rpc(rpc_id, Box::new(req)));
167 }
168}
169
170macro_rules! rpc_service_impl {
171 ($name:ident, $ty:ty) => {
172 fn $name(&mut self, rpc_id: RpcId, response: $ty) -> Result<(), RespondError> {
173 let entry = self.rpc.pending.remove(rpc_id);
174 let chan = entry.ok_or(RespondError::UnknownRpcId)?;
175 let chan = chan
176 .downcast::<oneshot::Sender<$ty>>()
177 .map_err(|_| RespondError::UnexpectedResponseType)?;
178 chan.send(response)
179 .map_err(|_| RespondError::RespondingFailed)?;
180 Ok(())
181 }
182 };
183}
184
185macro_rules! state_field_filter {
186 ($state:expr, $($part:ident)|*, $filter:expr ) => {
187 $(
188 if let Some(filter) = strip_root_field($filter, stringify!($part)) {
189 (serde_json::to_value(&$state.$part)?, format!("${filter}"))
190 } else
191 )*
192 {
193 (serde_json::to_value($state)?, $filter.to_string())
194 }
195 };
196}
197
198fn strip_root_field<'a>(filter: &'a str, field: &str) -> Option<&'a str> {
215 let strip_root = |f: &'a str| f.strip_prefix('$');
216 let field_char = |c: char| c.is_alphabetic() || c == '_';
217 let strip_dot_field = |f: &'a str| {
218 f.strip_prefix('.').and_then(|f| {
219 f.strip_prefix(field)
220 .and_then(|f| (!f.starts_with(field_char)).then_some(f))
221 })
222 };
223 let strip_index_field = |f: &'a str| {
224 f.strip_prefix("['")
225 .and_then(|f| f.strip_prefix(field))
226 .and_then(|f| f.strip_prefix("']"))
227 };
228 strip_root(filter).and_then(|f| strip_dot_field(f).or_else(|| strip_index_field(f)))
229}
230
231fn optimize_filtered_state(
232 state: &State,
233 filter: &str,
234) -> Result<(serde_json::Value, String), serde_json::Error> {
235 let (value, filter) = state_field_filter!(
236 state,
237 config
238 | p2p
239 | snark
240 | transition_frontier
241 | snark_pool
242 | external_snark_worker
243 | block_producer
244 | rpc
245 | watched_accounts,
246 filter
247 );
248 Ok((value, filter))
249}
250
251impl node::rpc_effectful::RpcService for NodeService {
252 fn respond_state_get(
253 &mut self,
254 rpc_id: RpcId,
255 (state, filter): (&State, Option<&str>),
256 ) -> Result<(), RespondError> {
257 let entry = self.rpc.pending.remove(rpc_id);
258 let chan = entry.ok_or(RespondError::UnknownRpcId)?;
259 let chan = chan
260 .downcast::<oneshot::Sender<RpcStateGetResponse>>()
261 .or(Err(RespondError::UnexpectedResponseType))?;
262 let response = if let Some(filter) = filter {
263 let (json_state, filter) = optimize_filtered_state(state, filter)?;
264 match filter.parse::<jsonpath_rust::JsonPathInst>() {
265 Ok(filter) => {
266 let values = filter
267 .find_slice(&json_state, Default::default())
268 .into_iter()
269 .map(|p| (*p).clone())
270 .collect::<Vec<_>>();
271 Ok(if values.len() == 1 {
272 values[0].clone()
273 } else {
274 serde_json::Value::Array(values)
275 })
276 }
277 Err(err) => Err(RpcStateGetError::FilterError(err)),
278 }
279 } else {
280 Ok(serde_json::to_value(state)?)
281 };
282 chan.send(response)
283 .or(Err(RespondError::RespondingFailed))?;
284 Ok(())
285 }
286 rpc_service_impl!(respond_status_get, RpcStatusGetResponse);
287
288 rpc_service_impl!(respond_heartbeat_get, RpcHeartbeatGetResponse);
289
290 rpc_service_impl!(respond_sync_stats_get, RpcSyncStatsGetResponse);
291 rpc_service_impl!(respond_action_stats_get, RpcActionStatsGetResponse);
292 rpc_service_impl!(
293 respond_block_producer_stats_get,
294 RpcBlockProducerStatsGetResponse
295 );
296 rpc_service_impl!(
297 respond_message_progress_stats_get,
298 RpcMessageProgressResponse
299 );
300 rpc_service_impl!(respond_peers_get, RpcPeersGetResponse);
301 rpc_service_impl!(
302 respond_p2p_connection_outgoing,
303 RpcP2pConnectionOutgoingResponse
304 );
305
306 fn respond_p2p_connection_incoming_answer(
307 &mut self,
308 rpc_id: RpcId,
309 response: P2pConnectionResponse,
310 ) -> Result<(), RespondError> {
311 let entry = self.rpc.pending.get(rpc_id);
312 let chan = entry.ok_or(RespondError::UnknownRpcId)?;
313 let chan = chan
314 .downcast_ref::<mpsc::Sender<RpcP2pConnectionIncomingResponse>>()
315 .ok_or(RespondError::UnexpectedResponseType)?
316 .clone();
317 chan.try_send(RpcP2pConnectionIncomingResponse::Answer(response))
318 .or(Err(RespondError::RespondingFailed))?;
319 Ok(())
320 }
321
322 fn respond_p2p_connection_incoming(
323 &mut self,
324 rpc_id: RpcId,
325 response: Result<(), String>,
326 ) -> Result<(), RespondError> {
327 let entry = self.rpc.pending.remove(rpc_id);
328 let chan = entry.ok_or(RespondError::UnknownRpcId)?;
329 let chan = chan
330 .downcast::<mpsc::Sender<RpcP2pConnectionIncomingResponse>>()
331 .or(Err(RespondError::UnexpectedResponseType))?;
332 chan.try_send(RpcP2pConnectionIncomingResponse::Result(response))
333 .or(Err(RespondError::RespondingFailed))?;
334 Ok(())
335 }
336
337 rpc_service_impl!(
338 respond_scan_state_summary_get,
339 RpcScanStateSummaryGetResponse
340 );
341 rpc_service_impl!(respond_snark_pool_get, RpcSnarkPoolGetResponse);
342 rpc_service_impl!(respond_snark_pool_job_get, RpcSnarkPoolJobGetResponse);
343 rpc_service_impl!(
344 respond_snark_pool_completed_jobs_get,
345 RpcSnarkPoolCompletedJobsResponse
346 );
347 rpc_service_impl!(
348 respond_snark_pool_pending_jobs_get,
349 RpcSnarkPoolPendingJobsGetResponse
350 );
351 rpc_service_impl!(respond_snarker_job_commit, RpcSnarkerJobCommitResponse);
352 rpc_service_impl!(
353 respond_snarker_job_spec,
354 node::rpc::RpcSnarkerJobSpecResponse
355 );
356 rpc_service_impl!(
357 respond_snarker_workers,
358 node::rpc::RpcSnarkerWorkersResponse
359 );
360 rpc_service_impl!(
361 respond_snarker_config_get,
362 node::rpc::RpcSnarkerConfigGetResponse
363 );
364 rpc_service_impl!(respond_health_check, RpcHealthCheckResponse);
365 rpc_service_impl!(respond_readiness_check, RpcReadinessCheckResponse);
366 rpc_service_impl!(
367 respond_discovery_routing_table,
368 RpcDiscoveryRoutingTableResponse
369 );
370 rpc_service_impl!(
371 respond_discovery_bootstrap_stats,
372 RpcDiscoveryBoostrapStatsResponse
373 );
374 rpc_service_impl!(respond_transaction_pool, RpcTransactionPoolResponse);
375 rpc_service_impl!(respond_ledger_slim_accounts, RpcLedgerSlimAccountsResponse);
376 rpc_service_impl!(respond_ledger_accounts, RpcLedgerAccountsResponse);
377 rpc_service_impl!(respond_transaction_inject, RpcTransactionInjectResponse);
378 rpc_service_impl!(
379 respond_transition_frontier_commands,
380 RpcTransitionFrontierUserCommandsResponse
381 );
382 rpc_service_impl!(respond_best_chain, RpcBestChainResponse);
383 rpc_service_impl!(
384 respond_consensus_constants,
385 RpcConsensusConstantsGetResponse
386 );
387 rpc_service_impl!(respond_transaction_status, RpcTransactionStatusGetResponse);
388 rpc_service_impl!(respond_block_get, RpcGetBlockResponse);
389 rpc_service_impl!(respond_pooled_user_commands, RpcPooledUserCommandsResponse);
390 rpc_service_impl!(
391 respond_pooled_zkapp_commands,
392 RpcPooledZkappCommandsResponse
393 );
394 rpc_service_impl!(respond_genesis_block, RpcGenesisBlockResponse);
395 rpc_service_impl!(respond_consensus_time_get, RpcConsensusTimeGetResponse);
396 rpc_service_impl!(respond_ledger_status_get, RpcLedgerStatusGetResponse);
397 rpc_service_impl!(
398 respond_ledger_account_delegators_get,
399 RpcLedgerAccountDelegatorsGetResponse
400 );
401}
402
403#[cfg(test)]
404mod tests {
405 use super::strip_root_field;
406
407 #[test]
408 fn strip_root_field_test() {
409 for (filter, expected) in [
410 ("$.field", Some("")),
411 ("$['field']", Some("")),
412 ("$.field.another", Some(".another")),
413 ("$['field'].another", Some(".another")),
414 ("$.another", None),
415 ("$.field_1", None),
416 ("$.fields", None),
417 ] {
418 let actual = strip_root_field(filter, "field");
419 assert_eq!(actual, expected)
420 }
421 }
422}