openmina_node_common/service/rpc/
sender.rs1#[cfg(target_family = "wasm")]
2use gloo_utils::format::JsValueSerdeExt;
3use serde::Serialize;
4#[cfg(target_family = "wasm")]
5use wasm_bindgen::prelude::*;
6
7use node::{
8 core::channels::{mpsc, oneshot},
9 p2p::connection::outgoing::P2pConnectionOutgoingInitOpts,
10 rpc::*,
11};
12
13use super::{
14 ledger::Ledger, state::State, stats::Stats, transaction_pool::TransactionPool,
15 transition_frontier::TransitionFrontier, NodeRpcRequest,
16};
17
/// Cloneable handle used to submit [`NodeRpcRequest`]s over an mpsc channel.
///
/// When compiled for a wasm target the type is additionally annotated with
/// `wasm_bindgen`.
#[derive(Clone)]
#[cfg_attr(target_family = "wasm", wasm_bindgen)]
pub struct RpcSender {
    /// Sending half of the channel that carries requests.
    tx: mpsc::Sender<NodeRpcRequest>,
}
23
24impl RpcSender {
25 pub fn new(tx: mpsc::Sender<NodeRpcRequest>) -> Self {
26 Self { tx }
27 }
28
29 pub async fn oneshot_request<T>(&self, req: RpcRequest) -> Option<T>
30 where
31 T: 'static + Send + Serialize,
32 {
33 let (tx, rx) = oneshot::channel::<T>();
34 let responder = Box::new(tx);
35 let sender = self.tx.clone();
36 let _ = sender.send(NodeRpcRequest { req, responder }).await;
37
38 rx.await.ok()
39 }
40
41 pub async fn multishot_request<T>(
42 &self,
43 expected_messages: usize,
44 req: RpcRequest,
45 ) -> mpsc::Receiver<T>
46 where
47 T: 'static + Send + Serialize,
48 {
49 let (tx, rx) = mpsc::channel::<T>(expected_messages);
50 let responder = Box::new(tx);
51 let sender = self.tx.clone();
52 let _ = sender.send(NodeRpcRequest { req, responder }).await;
53
54 rx
55 }
56}
57
58impl RpcSender {
59 pub async fn peer_connect(
60 &self,
61 opts: P2pConnectionOutgoingInitOpts,
62 ) -> Result<String, String> {
63 let peer_id = opts.peer_id().to_string();
64 let req = RpcRequest::P2pConnectionOutgoing(opts);
65 self.oneshot_request::<RpcP2pConnectionOutgoingResponse>(req)
66 .await
67 .ok_or_else(|| "state machine shut down".to_owned())??;
68
69 Ok(peer_id)
70 }
71}
72
73#[cfg_attr(target_family = "wasm", wasm_bindgen)]
74impl RpcSender {
75 pub fn state(&self) -> State {
76 State::new(self.clone())
77 }
78
79 pub fn stats(&self) -> Stats {
80 Stats::new(self.clone())
81 }
82
83 pub fn transaction_pool(&self) -> TransactionPool {
84 TransactionPool::new(self.clone())
85 }
86
87 pub fn transition_frontier(&self) -> TransitionFrontier {
88 TransitionFrontier::new(self.clone())
89 }
90
91 pub fn ledger(&self) -> Ledger {
92 Ledger::new(self.clone())
93 }
94}
95
#[cfg(target_family = "wasm")]
#[cfg_attr(target_family = "wasm", wasm_bindgen)]
impl RpcSender {
    /// Sends `RpcRequest::StatusGet` and returns the reply serialized as a
    /// `JsValue`.
    ///
    /// A missing reply and an empty reply are flattened into one `None`
    /// before serialization. If serialization fails, `JsValue::default()` is
    /// returned.
    pub async fn status(&self) -> JsValue {
        let request = RpcRequest::StatusGet;
        let reply = self.oneshot_request::<RpcStatusGetResponse>(request).await;
        let status = reply.flatten();
        JsValue::from_serde(&status).unwrap_or_default()
    }

    /// Sends `RpcRequest::HeartbeatGet` and returns the reply serialized as a
    /// `JsValue`.
    ///
    /// A missing reply and an empty reply are flattened into one `None`
    /// before serialization. If serialization fails, `JsValue::default()` is
    /// returned.
    pub async fn make_heartbeat(&self) -> JsValue {
        let request = RpcRequest::HeartbeatGet;
        let reply = self
            .oneshot_request::<RpcHeartbeatGetResponse>(request)
            .await;
        let heartbeat = reply.flatten();
        JsValue::from_serde(&heartbeat).unwrap_or_default()
    }
}
114}