openmina_bootstrap_sandbox/
client.rs1use binprot::BinProtRead;
2use libp2p::{futures::StreamExt, swarm::SwarmEvent, PeerId, Swarm};
3use libp2p_rpc_behaviour::{Event as RpcEvent, Received, StreamId};
4use mina_p2p_messages::{
5 rpc::GetBestTipV2,
6 rpc_kernel::{self, QueryHeader, ResponseHeader, ResponsePayload, RpcMethod},
7};
8
9use super::behaviour::{Behaviour, Event};
10
11use thiserror::Error;
12
13pub struct Client {
14 swarm: Swarm<Behaviour>,
15 peer: Option<PeerId>,
16 stream: Option<StreamId>,
17 id: u64,
18}
19
20#[derive(Debug, Error)]
21pub enum ClientError {
22 #[error("{0}")]
23 Binprot(#[from] binprot::Error),
24 #[error("{0:?}")]
25 InternalError(rpc_kernel::Error),
26 #[error("libp2p stop working")]
27 Libp2p,
28}
29
30impl Client {
31 pub fn new(swarm: Swarm<Behaviour>) -> Self {
32 Client {
33 swarm,
34 peer: None,
35 stream: None,
36 id: 1,
37 }
38 }
39
40 pub async fn rpc<M>(&mut self, query: M::Query) -> Result<M::Response, ClientError>
41 where
42 M: RpcMethod,
43 {
44 let mut query = Some(query);
45 if let (Some(peer_id), Some(stream_id)) = (self.peer, self.stream) {
46 if let Some(query) = query.take() {
47 self.swarm
48 .behaviour_mut()
49 .rpc
50 .query::<M>(peer_id, stream_id, self.id, query)?;
51 self.id += 1;
52 }
53 }
54
55 loop {
56 match self.swarm.next().await.ok_or(ClientError::Libp2p)? {
57 SwarmEvent::Behaviour(Event::Rpc((peer_id, RpcEvent::ConnectionEstablished))) => {
58 log::info!("new connection {peer_id}");
59
60 self.peer = Some(peer_id);
61 self.swarm.behaviour_mut().rpc.open(peer_id, 0);
62 }
63 SwarmEvent::Behaviour(Event::Rpc((peer_id, RpcEvent::ConnectionClosed))) => {
64 log::info!("connection closed {peer_id}");
65 if self.peer == Some(peer_id) {
66 self.peer = None;
67 }
69 }
70 SwarmEvent::Behaviour(Event::Rpc((
71 peer_id,
72 RpcEvent::Stream {
73 stream_id,
74 received,
75 },
76 ))) => match received {
77 Received::HandshakeDone => {
78 log::info!("new stream {peer_id} {stream_id:?}");
79 if self.stream.is_none() {
80 self.stream = Some(stream_id);
81 }
82
83 if let (Some(peer_id), Some(stream_id)) = (self.peer, self.stream) {
84 if let Some(query) = query.take() {
85 self.swarm
86 .behaviour_mut()
87 .rpc
88 .query::<M>(peer_id, stream_id, self.id, query)?;
89 self.id += 1;
90 }
91 }
92 }
93 Received::Menu(menu) => {
94 log::info!("menu: {menu:?}");
95 }
96 Received::Query {
97 header: QueryHeader { tag, version, id },
98 bytes,
99 } => {
100 if tag.to_string_lossy() == "get_best_tip" && version == 2 {
101 let _ = bytes;
102 self.swarm
103 .behaviour_mut()
104 .rpc
105 .respond::<GetBestTipV2>(peer_id, stream_id, id, Ok(None))
106 .unwrap();
107 } else {
108 log::warn!("unhandled query: {tag} {version}");
109 }
110 }
111 Received::Response {
112 header: ResponseHeader { id },
113 bytes,
114 } => {
115 if id + 1 == self.id {
116 let mut bytes = bytes.as_slice();
117 let response =
118 ResponsePayload::<M::Response>::binprot_read(&mut bytes)?
119 .0
120 .map_err(ClientError::InternalError)?
121 .0;
122 return Ok(response);
123 }
124 }
125 },
126 _ => {}
127 }
128 }
129 }
130}