1use std::{
2 collections::{HashSet, VecDeque},
3 net::SocketAddr,
4 process::{Child, Command},
5 time::SystemTime,
6};
7
8use reqwest::blocking::{Client, ClientBuilder};
9use serde::{Deserialize, Serialize};
10
11pub struct Debugger {
12 child: Option<Child>,
13 host: &'static str,
14 port: u16,
15 client: Client,
16}
17
18#[derive(Serialize, Deserialize, Clone, PartialEq, Eq)]
19pub struct Connection {
20 pub info: ConnectionInfo,
21 pub incoming: bool,
22 pub timestamp: SystemTime,
23 pub timestamp_close: SystemTime,
24
25 pub alias: String,
26}
27
28#[derive(Serialize, Deserialize, Clone, PartialEq, Eq)]
29pub struct ConnectionInfo {
30 pub addr: SocketAddr,
31 pub pid: u32,
32 pub fd: u32,
33}
34
35#[derive(Serialize, Deserialize, Clone, PartialEq, Eq)]
36pub struct FullMessage {
37 pub connection_id: u64,
38 pub remote_addr: String,
39 pub incoming: bool,
40 pub timestamp: SystemTime,
41 pub stream_id: StreamId,
42 pub stream_kind: String,
43 pub message: serde_json::Value,
44 pub size: u32,
45}
46
47#[derive(Serialize, Deserialize, PartialEq, Eq, Clone)]
48#[serde(rename_all = "snake_case")]
49pub enum StreamId {
50 Handshake,
51 Forward(u64),
52 Backward(u64),
53}
54
55impl Debugger {
56 pub fn drone_ci() -> Self {
61 Debugger {
62 child: None,
63 host: "localhost",
64 port: 8000,
65 client: ClientBuilder::new().build().unwrap(),
66 }
67 }
68
69 pub fn spawn(port: u16) -> Self {
70 let mut cmd = Command::new("bpf-recorder");
71 cmd.env("SERVER_PORT", port.to_string());
72 Debugger {
73 child: Some(cmd.spawn().expect("cannot spawn debugger")),
74 host: "localhost",
75 port,
76 client: ClientBuilder::new().build().unwrap(),
77 }
78 }
79
80 pub fn kill(&mut self) {
81 if let Some(mut child) = self.child.take() {
82 use nix::{
83 sys::signal::{self, Signal},
84 unistd::Pid,
85 };
86
87 if let Err(err) = signal::kill(Pid::from_raw(child.id() as i32), Signal::SIGINT) {
88 eprintln!("error sending ctrl+c to Network debugger: {err}");
89 }
90 match child.try_wait() {
91 Err(err) => {
92 eprintln!("error getting status from Network debugger: {err}");
93 }
94 Ok(None) => {
95 eprintln!("error getting status from Network debugger");
96 }
97 Ok(Some(status)) => {
98 eprintln!("network debugger {status}");
99 }
100 }
101 }
102 }
103
104 pub fn get_connection(&self, id: u64) -> anyhow::Result<Connection> {
105 let port = self.port;
106 let host = self.host;
107 let res = self
108 .client
109 .get(format!("http://{host}:{port}/connection/{id}"))
110 .send()?
111 .text()?;
112 serde_json::from_str(&res).map_err(From::from)
113 }
114
115 pub fn get_connections(&self, params: &str) -> anyhow::Result<Vec<(u64, Connection)>> {
116 let port = self.port;
117 let host = self.host;
118 let res = self
119 .client
120 .get(format!("http://{host}:{port}/connections?{params}"))
121 .send()?
122 .text()?;
123 serde_json::from_str(&res).map_err(From::from)
124 }
125
126 pub fn get_message(&self, id: u64) -> anyhow::Result<Vec<u8>> {
127 let port = self.port;
128 let host = self.host;
129 self.client
130 .get(format!("http://{host}:{port}/message_bin/{id}"))
131 .send()?
132 .bytes()
133 .map(|x| x.to_vec())
134 .map_err(Into::into)
135 }
136
137 pub fn get_messages(&self, params: &str) -> anyhow::Result<Vec<(u64, FullMessage)>> {
138 let port = self.port;
139 let host = self.host;
140 let res = self
141 .client
142 .get(format!("http://{host}:{port}/messages?{params}"))
143 .send()?
144 .text()?;
145 serde_json::from_str(&res).map_err(From::from)
146 }
147
148 pub fn current_cursor(&self) -> u64 {
149 self.get_messages("direction=reverse&limit=1")
150 .map_err(|err| eprintln!("determine cursor error: {err}"))
151 .ok()
152 .and_then(|msgs| msgs.first().map(|(id, _)| *id))
153 .unwrap_or_default()
154 }
155
156 pub fn connections_raw(&self, cursor: u64) -> Connections<'_> {
157 Connections {
158 inner: self,
159 cursor,
160 buffer: VecDeque::default(),
161 }
162 }
163
164 pub fn connections(&self) -> ConnectionsHandshaked<'_> {
165 ConnectionsHandshaked {
166 messages: self.messages(0, "stream_kind=/noise"),
167 ids_cache: HashSet::new(),
168 }
169 }
170
171 pub fn messages(&self, cursor: u64, additional: &'static str) -> Messages<'_> {
172 Messages {
173 inner: self,
174 cursor,
175 additional,
176 buffer: VecDeque::default(),
177 }
178 }
179}
180
181pub struct ConnectionsHandshaked<'a> {
182 messages: Messages<'a>,
183 ids_cache: HashSet<u64>,
184}
185
186impl Iterator for ConnectionsHandshaked<'_> {
187 type Item = u64;
188
189 fn next(&mut self) -> Option<Self::Item> {
190 loop {
191 let (_, msg) = self.messages.next()?;
192 let id = msg.connection_id;
193 if self.ids_cache.insert(id) {
194 break Some(id);
195 }
196 }
197 }
198}
199
200pub struct Connections<'a> {
201 inner: &'a Debugger,
202 cursor: u64,
203 buffer: VecDeque<(u64, Connection)>,
204}
205
206impl Iterator for Connections<'_> {
207 type Item = (u64, Connection);
208
209 fn next(&mut self) -> Option<Self::Item> {
210 if self.buffer.is_empty() {
211 let params = format!("direction=forward&limit=100&id={}", self.cursor);
212 let msgs = self
213 .inner
214 .get_connections(¶ms)
215 .map_err(|err| eprintln!("{err}"))
216 .ok()?;
217 let (last_id, _) = msgs.last()?;
218 self.cursor = *last_id + 1;
219 self.buffer.extend(msgs);
220 }
221 self.buffer.pop_front()
222 }
223}
224
225pub struct Messages<'a> {
226 inner: &'a Debugger,
227 cursor: u64,
228 additional: &'static str,
229 buffer: VecDeque<(u64, FullMessage)>,
230}
231
232impl Iterator for Messages<'_> {
233 type Item = (u64, FullMessage);
234
235 fn next(&mut self) -> Option<Self::Item> {
236 if self.buffer.is_empty() {
237 let params = if self.additional.is_empty() {
238 format!("direction=forward&limit=100&id={}", self.cursor)
239 } else {
240 format!(
241 "direction=forward&limit=100&id={}&{}",
242 self.cursor, self.additional
243 )
244 };
245
246 let msgs = self
247 .inner
248 .get_messages(¶ms)
249 .map_err(|err| eprintln!("{err}"))
250 .ok()?;
251 let (last_id, _) = msgs.last()?;
252 self.cursor = *last_id + 1;
253 self.buffer.extend(msgs);
254 }
255 self.buffer.pop_front()
256 }
257}
258
259impl Drop for Debugger {
260 fn drop(&mut self) {
261 self.kill();
262 }
263}