openmina_bootstrap_sandbox/
client.rs

1use binprot::BinProtRead;
2use libp2p::{futures::StreamExt, swarm::SwarmEvent, PeerId, Swarm};
3use libp2p_rpc_behaviour::{Event as RpcEvent, Received, StreamId};
4use mina_p2p_messages::{
5    rpc::GetBestTipV2,
6    rpc_kernel::{self, QueryHeader, ResponseHeader, ResponsePayload, RpcMethod},
7};
8
9use super::behaviour::{Behaviour, Event};
10
11use thiserror::Error;
12
13pub struct Client {
14    swarm: Swarm<Behaviour>,
15    peer: Option<PeerId>,
16    stream: Option<StreamId>,
17    id: u64,
18}
19
20#[derive(Debug, Error)]
21pub enum ClientError {
22    #[error("{0}")]
23    Binprot(#[from] binprot::Error),
24    #[error("{0:?}")]
25    InternalError(rpc_kernel::Error),
26    #[error("libp2p stop working")]
27    Libp2p,
28}
29
30impl Client {
31    pub fn new(swarm: Swarm<Behaviour>) -> Self {
32        Client {
33            swarm,
34            peer: None,
35            stream: None,
36            id: 1,
37        }
38    }
39
40    pub async fn rpc<M>(&mut self, query: M::Query) -> Result<M::Response, ClientError>
41    where
42        M: RpcMethod,
43    {
44        let mut query = Some(query);
45        if let (Some(peer_id), Some(stream_id)) = (self.peer, self.stream) {
46            if let Some(query) = query.take() {
47                self.swarm
48                    .behaviour_mut()
49                    .rpc
50                    .query::<M>(peer_id, stream_id, self.id, query)?;
51                self.id += 1;
52            }
53        }
54
55        loop {
56            match self.swarm.next().await.ok_or(ClientError::Libp2p)? {
57                SwarmEvent::Behaviour(Event::Rpc((peer_id, RpcEvent::ConnectionEstablished))) => {
58                    log::info!("new connection {peer_id}");
59
60                    self.peer = Some(peer_id);
61                    self.swarm.behaviour_mut().rpc.open(peer_id, 0);
62                }
63                SwarmEvent::Behaviour(Event::Rpc((peer_id, RpcEvent::ConnectionClosed))) => {
64                    log::info!("connection closed {peer_id}");
65                    if self.peer == Some(peer_id) {
66                        self.peer = None;
67                        // TODO: resend
68                    }
69                }
70                SwarmEvent::Behaviour(Event::Rpc((
71                    peer_id,
72                    RpcEvent::Stream {
73                        stream_id,
74                        received,
75                    },
76                ))) => match received {
77                    Received::HandshakeDone => {
78                        log::info!("new stream {peer_id} {stream_id:?}");
79                        if self.stream.is_none() {
80                            self.stream = Some(stream_id);
81                        }
82
83                        if let (Some(peer_id), Some(stream_id)) = (self.peer, self.stream) {
84                            if let Some(query) = query.take() {
85                                self.swarm
86                                    .behaviour_mut()
87                                    .rpc
88                                    .query::<M>(peer_id, stream_id, self.id, query)?;
89                                self.id += 1;
90                            }
91                        }
92                    }
93                    Received::Menu(menu) => {
94                        log::info!("menu: {menu:?}");
95                    }
96                    Received::Query {
97                        header: QueryHeader { tag, version, id },
98                        bytes,
99                    } => {
100                        if tag.to_string_lossy() == "get_best_tip" && version == 2 {
101                            let _ = bytes;
102                            self.swarm
103                                .behaviour_mut()
104                                .rpc
105                                .respond::<GetBestTipV2>(peer_id, stream_id, id, Ok(None))
106                                .unwrap();
107                        } else {
108                            log::warn!("unhandled query: {tag} {version}");
109                        }
110                    }
111                    Received::Response {
112                        header: ResponseHeader { id },
113                        bytes,
114                    } => {
115                        if id + 1 == self.id {
116                            let mut bytes = bytes.as_slice();
117                            let response =
118                                ResponsePayload::<M::Response>::binprot_read(&mut bytes)?
119                                    .0
120                                    .map_err(ClientError::InternalError)?
121                                    .0;
122                            return Ok(response);
123                        }
124                    }
125                },
126                _ => {}
127            }
128        }
129    }
130}