mina_node_testing/
network_debugger.rs

1use std::{
2    collections::{HashSet, VecDeque},
3    net::SocketAddr,
4    process::{Child, Command},
5    time::SystemTime,
6};
7
8use reqwest::blocking::{Client, ClientBuilder};
9use serde::{Deserialize, Serialize};
10
11pub struct Debugger {
12    child: Option<Child>,
13    host: &'static str,
14    port: u16,
15    client: Client,
16}
17
18#[derive(Serialize, Deserialize, Clone, PartialEq, Eq)]
19pub struct Connection {
20    pub info: ConnectionInfo,
21    pub incoming: bool,
22    pub timestamp: SystemTime,
23    pub timestamp_close: SystemTime,
24
25    pub alias: String,
26}
27
28#[derive(Serialize, Deserialize, Clone, PartialEq, Eq)]
29pub struct ConnectionInfo {
30    pub addr: SocketAddr,
31    pub pid: u32,
32    pub fd: u32,
33}
34
35#[derive(Serialize, Deserialize, Clone, PartialEq, Eq)]
36pub struct FullMessage {
37    pub connection_id: u64,
38    pub remote_addr: String,
39    pub incoming: bool,
40    pub timestamp: SystemTime,
41    pub stream_id: StreamId,
42    pub stream_kind: String,
43    pub message: serde_json::Value,
44    pub size: u32,
45}
46
47#[derive(Serialize, Deserialize, PartialEq, Eq, Clone)]
48#[serde(rename_all = "snake_case")]
49pub enum StreamId {
50    Handshake,
51    Forward(u64),
52    Backward(u64),
53}
54
55impl Debugger {
56    // TODO: Rename `drone_ci` to `external_ci` or `ci_sidecar` since the project
57    // no longer uses Drone CI. This method connects to an external debugger
58    // service (like the mina-network-debugger sidecar container) rather than
59    // spawning a local debugger process.
60    pub fn drone_ci() -> Self {
61        Debugger {
62            child: None,
63            host: "localhost",
64            port: 8000,
65            client: ClientBuilder::new().build().unwrap(),
66        }
67    }
68
69    pub fn spawn(port: u16) -> Self {
70        let mut cmd = Command::new("bpf-recorder");
71        cmd.env("SERVER_PORT", port.to_string());
72        Debugger {
73            child: Some(cmd.spawn().expect("cannot spawn debugger")),
74            host: "localhost",
75            port,
76            client: ClientBuilder::new().build().unwrap(),
77        }
78    }
79
80    pub fn kill(&mut self) {
81        if let Some(mut child) = self.child.take() {
82            use nix::{
83                sys::signal::{self, Signal},
84                unistd::Pid,
85            };
86
87            if let Err(err) = signal::kill(Pid::from_raw(child.id() as i32), Signal::SIGINT) {
88                eprintln!("error sending ctrl+c to Network debugger: {err}");
89            }
90            match child.try_wait() {
91                Err(err) => {
92                    eprintln!("error getting status from Network debugger: {err}");
93                }
94                Ok(None) => {
95                    eprintln!("error getting status from Network debugger");
96                }
97                Ok(Some(status)) => {
98                    eprintln!("network debugger {status}");
99                }
100            }
101        }
102    }
103
104    pub fn get_connection(&self, id: u64) -> anyhow::Result<Connection> {
105        let port = self.port;
106        let host = self.host;
107        let res = self
108            .client
109            .get(format!("http://{host}:{port}/connection/{id}"))
110            .send()?
111            .text()?;
112        serde_json::from_str(&res).map_err(From::from)
113    }
114
115    pub fn get_connections(&self, params: &str) -> anyhow::Result<Vec<(u64, Connection)>> {
116        let port = self.port;
117        let host = self.host;
118        let res = self
119            .client
120            .get(format!("http://{host}:{port}/connections?{params}"))
121            .send()?
122            .text()?;
123        serde_json::from_str(&res).map_err(From::from)
124    }
125
126    pub fn get_message(&self, id: u64) -> anyhow::Result<Vec<u8>> {
127        let port = self.port;
128        let host = self.host;
129        self.client
130            .get(format!("http://{host}:{port}/message_bin/{id}"))
131            .send()?
132            .bytes()
133            .map(|x| x.to_vec())
134            .map_err(Into::into)
135    }
136
137    pub fn get_messages(&self, params: &str) -> anyhow::Result<Vec<(u64, FullMessage)>> {
138        let port = self.port;
139        let host = self.host;
140        let res = self
141            .client
142            .get(format!("http://{host}:{port}/messages?{params}"))
143            .send()?
144            .text()?;
145        serde_json::from_str(&res).map_err(From::from)
146    }
147
148    pub fn current_cursor(&self) -> u64 {
149        self.get_messages("direction=reverse&limit=1")
150            .map_err(|err| eprintln!("determine cursor error: {err}"))
151            .ok()
152            .and_then(|msgs| msgs.first().map(|(id, _)| *id))
153            .unwrap_or_default()
154    }
155
156    pub fn connections_raw(&self, cursor: u64) -> Connections<'_> {
157        Connections {
158            inner: self,
159            cursor,
160            buffer: VecDeque::default(),
161        }
162    }
163
164    pub fn connections(&self) -> ConnectionsHandshaked<'_> {
165        ConnectionsHandshaked {
166            messages: self.messages(0, "stream_kind=/noise"),
167            ids_cache: HashSet::new(),
168        }
169    }
170
171    pub fn messages(&self, cursor: u64, additional: &'static str) -> Messages<'_> {
172        Messages {
173            inner: self,
174            cursor,
175            additional,
176            buffer: VecDeque::default(),
177        }
178    }
179}
180
181pub struct ConnectionsHandshaked<'a> {
182    messages: Messages<'a>,
183    ids_cache: HashSet<u64>,
184}
185
186impl Iterator for ConnectionsHandshaked<'_> {
187    type Item = u64;
188
189    fn next(&mut self) -> Option<Self::Item> {
190        loop {
191            let (_, msg) = self.messages.next()?;
192            let id = msg.connection_id;
193            if self.ids_cache.insert(id) {
194                break Some(id);
195            }
196        }
197    }
198}
199
200pub struct Connections<'a> {
201    inner: &'a Debugger,
202    cursor: u64,
203    buffer: VecDeque<(u64, Connection)>,
204}
205
206impl Iterator for Connections<'_> {
207    type Item = (u64, Connection);
208
209    fn next(&mut self) -> Option<Self::Item> {
210        if self.buffer.is_empty() {
211            let params = format!("direction=forward&limit=100&id={}", self.cursor);
212            let msgs = self
213                .inner
214                .get_connections(&params)
215                .map_err(|err| eprintln!("{err}"))
216                .ok()?;
217            let (last_id, _) = msgs.last()?;
218            self.cursor = *last_id + 1;
219            self.buffer.extend(msgs);
220        }
221        self.buffer.pop_front()
222    }
223}
224
225pub struct Messages<'a> {
226    inner: &'a Debugger,
227    cursor: u64,
228    additional: &'static str,
229    buffer: VecDeque<(u64, FullMessage)>,
230}
231
232impl Iterator for Messages<'_> {
233    type Item = (u64, FullMessage);
234
235    fn next(&mut self) -> Option<Self::Item> {
236        if self.buffer.is_empty() {
237            let params = if self.additional.is_empty() {
238                format!("direction=forward&limit=100&id={}", self.cursor)
239            } else {
240                format!(
241                    "direction=forward&limit=100&id={}&{}",
242                    self.cursor, self.additional
243                )
244            };
245
246            let msgs = self
247                .inner
248                .get_messages(&params)
249                .map_err(|err| eprintln!("{err}"))
250                .ok()?;
251            let (last_id, _) = msgs.last()?;
252            self.cursor = *last_id + 1;
253            self.buffer.extend(msgs);
254        }
255        self.buffer.pop_front()
256    }
257}
258
259impl Drop for Debugger {
260    fn drop(&mut self) {
261        self.kill();
262    }
263}