openmina_node_common/service/rpc/
sender.rs

1#[cfg(target_family = "wasm")]
2use gloo_utils::format::JsValueSerdeExt;
3use serde::Serialize;
4#[cfg(target_family = "wasm")]
5use wasm_bindgen::prelude::*;
6
7use node::{
8    core::channels::{mpsc, oneshot},
9    p2p::connection::outgoing::P2pConnectionOutgoingInitOpts,
10    rpc::*,
11};
12
13use super::{
14    ledger::Ledger, state::State, stats::Stats, transaction_pool::TransactionPool,
15    transition_frontier::TransitionFrontier, NodeRpcRequest,
16};
17
18#[derive(Clone)]
19#[cfg_attr(target_family = "wasm", wasm_bindgen)]
20pub struct RpcSender {
21    tx: mpsc::Sender<NodeRpcRequest>,
22}
23
24impl RpcSender {
25    pub fn new(tx: mpsc::Sender<NodeRpcRequest>) -> Self {
26        Self { tx }
27    }
28
29    pub async fn oneshot_request<T>(&self, req: RpcRequest) -> Option<T>
30    where
31        T: 'static + Send + Serialize,
32    {
33        let (tx, rx) = oneshot::channel::<T>();
34        let responder = Box::new(tx);
35        let sender = self.tx.clone();
36        let _ = sender.send(NodeRpcRequest { req, responder }).await;
37
38        rx.await.ok()
39    }
40
41    pub async fn multishot_request<T>(
42        &self,
43        expected_messages: usize,
44        req: RpcRequest,
45    ) -> mpsc::Receiver<T>
46    where
47        T: 'static + Send + Serialize,
48    {
49        let (tx, rx) = mpsc::channel::<T>(expected_messages);
50        let responder = Box::new(tx);
51        let sender = self.tx.clone();
52        let _ = sender.send(NodeRpcRequest { req, responder }).await;
53
54        rx
55    }
56}
57
58impl RpcSender {
59    pub async fn peer_connect(
60        &self,
61        opts: P2pConnectionOutgoingInitOpts,
62    ) -> Result<String, String> {
63        let peer_id = opts.peer_id().to_string();
64        let req = RpcRequest::P2pConnectionOutgoing(opts);
65        self.oneshot_request::<RpcP2pConnectionOutgoingResponse>(req)
66            .await
67            .ok_or_else(|| "state machine shut down".to_owned())??;
68
69        Ok(peer_id)
70    }
71}
72
73#[cfg_attr(target_family = "wasm", wasm_bindgen)]
74impl RpcSender {
75    pub fn state(&self) -> State {
76        State::new(self.clone())
77    }
78
79    pub fn stats(&self) -> Stats {
80        Stats::new(self.clone())
81    }
82
83    pub fn transaction_pool(&self) -> TransactionPool {
84        TransactionPool::new(self.clone())
85    }
86
87    pub fn transition_frontier(&self) -> TransitionFrontier {
88        TransitionFrontier::new(self.clone())
89    }
90
91    pub fn ledger(&self) -> Ledger {
92        Ledger::new(self.clone())
93    }
94}
95
96#[cfg(target_family = "wasm")]
97#[cfg_attr(target_family = "wasm", wasm_bindgen)]
98impl RpcSender {
99    pub async fn status(&self) -> JsValue {
100        let res = self
101            .oneshot_request::<RpcStatusGetResponse>(RpcRequest::StatusGet)
102            .await
103            .flatten();
104        JsValue::from_serde(&res).unwrap_or_default()
105    }
106
107    pub async fn make_heartbeat(&self) -> JsValue {
108        let res = self
109            .oneshot_request::<RpcHeartbeatGetResponse>(RpcRequest::HeartbeatGet)
110            .await
111            .flatten();
112        JsValue::from_serde(&res).unwrap_or_default()
113    }
114}