p2p/
p2p_event.rs

1use std::{
2    fmt,
3    net::{IpAddr, SocketAddr},
4};
5
6use derive_more::From;
7use serde::{Deserialize, Serialize};
8
9use crate::{
10    channels::{
11        signaling::{
12            discovery::SignalingDiscoveryChannelMsg, exchange::SignalingExchangeChannelMsg,
13        },
14        streaming_rpc::StreamingRpcChannelMsg,
15        transaction::TransactionPropagationChannelMsg,
16        ChannelId, ChannelMsg, MsgId,
17    },
18    connection::P2pConnectionResponse,
19    webrtc::ConnectionAuthEncrypted,
20    ConnectionAddr, PeerId,
21};
22
23#[derive(Serialize, Deserialize, From, Debug, Clone)]
24pub enum P2pEvent {
25    Connection(P2pConnectionEvent),
26    Channel(P2pChannelEvent),
27    MioEvent(MioEvent),
28}
29
30/// The mio service reports events.
31#[derive(Serialize, Deserialize, Debug, Clone)]
32pub enum MioEvent {
33    /// A new network interface was detected on the machine.
34    InterfaceDetected(IpAddr),
35    /// The interface is not available anymore.
36    InterfaceExpired(IpAddr),
37
38    /// Started listening on a local port.
39    ListenerReady { listener: SocketAddr },
40    /// Error listening on a local port
41    ListenerError { listener: SocketAddr, error: String },
42
43    /// The remote peer is trying to connect to us.
44    IncomingConnectionIsReady { listener: SocketAddr },
45    /// We accepted the connection from the remote peer.
46    IncomingConnectionDidAccept(Option<ConnectionAddr>, Result<(), String>),
47    /// The remote peer is trying to send us some data.
48    IncomingDataIsReady(ConnectionAddr),
49    /// We received the data from the remote peer.
50    IncomingDataDidReceive(ConnectionAddr, Result<crate::Data, String>),
51
52    /// We connected to the remote peer by the address.
53    OutgoingConnectionDidConnect(ConnectionAddr, Result<(), String>),
54    /// We sent some data to the remote peer.
55    OutgoingDataDidSend(ConnectionAddr, Result<(), String>),
56
57    /// The remote peer is disconnected gracefully or with an error.
58    ConnectionDidClose(ConnectionAddr, Result<(), String>),
59
60    /// The remote peer is disconnected by our node.
61    ConnectionDidCloseOnDemand(ConnectionAddr),
62}
63
64#[derive(Serialize, Deserialize, Debug, Clone)]
65pub enum P2pConnectionEvent {
66    OfferSdpReady(PeerId, Result<String, String>),
67    AnswerSdpReady(PeerId, Result<String, String>),
68    AnswerReceived(PeerId, P2pConnectionResponse),
69    Finalized(PeerId, Result<ConnectionAuthEncrypted, String>),
70    Closed(PeerId),
71}
72
73#[derive(Serialize, Deserialize, From, Debug, Clone)]
74pub enum P2pChannelEvent {
75    Opened(PeerId, ChannelId, Result<(), String>),
76    Sent(PeerId, ChannelId, MsgId, Result<(), String>),
77    Received(PeerId, Result<ChannelMsg, String>),
78    Closed(PeerId, ChannelId),
79}
80
81fn res_kind<T, E>(res: &Result<T, E>) -> &'static str {
82    match res {
83        Err(_) => "Err",
84        Ok(_) => "Ok",
85    }
86}
87
88impl fmt::Display for P2pEvent {
89    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
90        write!(f, "P2p, ")?;
91        match self {
92            Self::Connection(v) => v.fmt(f),
93            Self::Channel(v) => v.fmt(f),
94            Self::MioEvent(v) => v.fmt(f),
95        }
96    }
97}
98
99impl fmt::Display for P2pConnectionEvent {
100    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
101        write!(f, "Connection, ")?;
102        match self {
103            Self::OfferSdpReady(peer_id, res) => {
104                write!(f, "OfferSdpReady, {peer_id}, {}", res_kind(res))
105            }
106            Self::AnswerSdpReady(peer_id, res) => {
107                write!(f, "AnswerSdpReady, {peer_id}, {}", res_kind(res))
108            }
109            Self::AnswerReceived(peer_id, ans) => match ans {
110                P2pConnectionResponse::Accepted(_) => {
111                    write!(f, "AnswerReceived, {peer_id}, Accepted")
112                }
113                P2pConnectionResponse::Rejected(reason) => {
114                    write!(f, "AnswerReceived, {peer_id}, Rejected, {reason:?}")
115                }
116                P2pConnectionResponse::SignalDecryptionFailed => {
117                    write!(f, "SignalDecryptionFailed, {peer_id}")
118                }
119                P2pConnectionResponse::InternalError => {
120                    write!(f, "AnswerReceived, {peer_id}, InternalError")
121                }
122            },
123            Self::Finalized(peer_id, res) => write!(f, "Finalized, {peer_id}, {}", res_kind(res)),
124            Self::Closed(peer_id) => write!(f, "Closed, {peer_id}"),
125        }
126    }
127}
128
129impl fmt::Display for P2pChannelEvent {
130    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
131        use crate::channels::{
132            best_tip::BestTipPropagationChannelMsg, rpc::RpcChannelMsg,
133            snark::SnarkPropagationChannelMsg,
134            snark_job_commitment::SnarkJobCommitmentPropagationChannelMsg,
135        };
136
137        write!(f, "Channel, ")?;
138        match self {
139            Self::Opened(peer_id, chan_id, res) => {
140                write!(f, "Opened, {peer_id}, {chan_id:?}, {}", res_kind(res))
141            }
142            Self::Closed(peer_id, chan_id) => {
143                write!(f, "Closed, {peer_id}, {chan_id:?}")
144            }
145            Self::Sent(peer_id, chan_id, msg_id, res) => {
146                write!(
147                    f,
148                    "Sent, {peer_id}, {chan_id:?}, {msg_id:?}, {}",
149                    res_kind(res)
150                )
151            }
152            Self::Received(peer_id, res) => {
153                write!(f, "Received, {peer_id}, ")?;
154                let msg = match res {
155                    Err(_) => return write!(f, "Err"),
156                    Ok(msg) => {
157                        write!(f, "{:?}, ", msg.channel_id())?;
158                        msg
159                    }
160                };
161
162                match msg {
163                    ChannelMsg::SignalingDiscovery(v) => match v {
164                        SignalingDiscoveryChannelMsg::GetNext => write!(f, "GetNext"),
165                        SignalingDiscoveryChannelMsg::Discover => write!(f, "Discover"),
166                        SignalingDiscoveryChannelMsg::Discovered { target_public_key } => {
167                            write!(f, "Discovered, {}", target_public_key.peer_id())
168                        }
169                        SignalingDiscoveryChannelMsg::DiscoveredReject => write!(f, "Discovered"),
170                        SignalingDiscoveryChannelMsg::DiscoveredAccept(_) => {
171                            write!(f, "DiscoveredAccept")
172                        }
173                        SignalingDiscoveryChannelMsg::Answer(_) => write!(f, "Answer"),
174                    },
175                    ChannelMsg::SignalingExchange(v) => match v {
176                        SignalingExchangeChannelMsg::GetNext => write!(f, "GetNext"),
177                        SignalingExchangeChannelMsg::OfferToYou {
178                            offerer_pub_key, ..
179                        } => {
180                            write!(f, "OfferToYou, {}", offerer_pub_key.peer_id())
181                        }
182                        SignalingExchangeChannelMsg::Answer(_) => write!(f, "Answer"),
183                    },
184                    ChannelMsg::BestTipPropagation(v) => {
185                        match v {
186                            BestTipPropagationChannelMsg::GetNext => write!(f, "GetNext"),
187                            // TODO(binier): avoid rehashing.
188                            BestTipPropagationChannelMsg::BestTip(block) => {
189                                match block.try_hash() {
190                                    Ok(block_hash) => write!(f, "{}", block_hash),
191                                    Err(_) => write!(f, "[Block_with_invalid_field]"),
192                                }
193                            }
194                        }
195                    }
196                    ChannelMsg::TransactionPropagation(v) => match v {
197                        TransactionPropagationChannelMsg::GetNext { limit } => {
198                            write!(f, "GetNext, limit: {limit}")
199                        }
200                        TransactionPropagationChannelMsg::WillSend { count } => {
201                            write!(f, "WillSend, count: {count}")
202                        }
203                        TransactionPropagationChannelMsg::Transaction(tx) => write!(
204                            f,
205                            "Transaction, fee: {}, fee_payer: {}, hash: {}",
206                            tx.fee, tx.fee_payer, tx.hash,
207                        ),
208                    },
209                    ChannelMsg::SnarkPropagation(v) => match v {
210                        SnarkPropagationChannelMsg::GetNext { limit } => {
211                            write!(f, "GetNext, limit: {limit}")
212                        }
213                        SnarkPropagationChannelMsg::WillSend { count } => {
214                            write!(f, "WillSend, count: {count}")
215                        }
216                        SnarkPropagationChannelMsg::Snark(snark) => write!(
217                            f,
218                            "Snark, fee: {}, snarker: {}, job_id: {}",
219                            snark.fee.as_u64(),
220                            snark.prover,
221                            snark.job_id
222                        ),
223                    },
224                    ChannelMsg::SnarkJobCommitmentPropagation(v) => match v {
225                        SnarkJobCommitmentPropagationChannelMsg::GetNext { limit } => {
226                            write!(f, "GetNext, limit: {limit}")
227                        }
228                        SnarkJobCommitmentPropagationChannelMsg::WillSend { count } => {
229                            write!(f, "WillSend, count: {count}")
230                        }
231                        SnarkJobCommitmentPropagationChannelMsg::Commitment(commitment) => write!(
232                            f,
233                            "Commitment, fee: {}, snarker: {}, job_id: {}",
234                            commitment.fee.as_u64(),
235                            commitment.snarker,
236                            commitment.job_id
237                        ),
238                    },
239                    ChannelMsg::Rpc(v) => match v {
240                        RpcChannelMsg::Request(id, req) => {
241                            write!(f, "Request, id: {id}, {req}")
242                        }
243                        RpcChannelMsg::Response(id, resp) => {
244                            write!(f, "Response, id: {id}, ")?;
245                            match resp {
246                                None => write!(f, "None"),
247                                Some(resp) => write!(f, "{:?}", resp.kind()),
248                            }
249                        }
250                    },
251                    ChannelMsg::StreamingRpc(v) => match v {
252                        StreamingRpcChannelMsg::Next(id) => {
253                            write!(f, "Next, id: {id}")
254                        }
255                        StreamingRpcChannelMsg::Request(id, req) => {
256                            write!(f, "Request, id: {id}, {req}")
257                        }
258                        StreamingRpcChannelMsg::Response(id, resp) => {
259                            write!(f, "Response, id: {id}, ")?;
260                            match resp {
261                                None => write!(f, "None"),
262                                Some(resp) => write!(f, "{:?}", resp.kind()),
263                            }
264                        }
265                    },
266                }
267            }
268        }
269    }
270}
271
272impl fmt::Display for MioEvent {
273    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
274        match self {
275            Self::InterfaceDetected(ip) => write!(f, "InterfaceDetected, {ip}"),
276            Self::InterfaceExpired(ip) => write!(f, "InterfaceExpired, {ip}"),
277            Self::ListenerReady { listener } => write!(f, "ListenerReady, {listener}"),
278            Self::ListenerError { listener, error } => {
279                write!(f, "ListenerError, {listener}, {error}")
280            }
281            Self::IncomingConnectionIsReady { listener } => {
282                write!(f, "IncomingConnectionIsReady, {listener}")
283            }
284            Self::IncomingConnectionDidAccept(Some(addr), res) => {
285                write!(f, "IncomingConnectionDidAccept, {addr}, {}", res_kind(res))
286            }
287            Self::IncomingConnectionDidAccept(None, res) => {
288                write!(f, "IncomingConnectionDidAccept, unknown, {}", res_kind(res))
289            }
290            Self::IncomingDataIsReady(addr) => {
291                write!(f, "IncomingDataIsReady, {addr}")
292            }
293            Self::IncomingDataDidReceive(addr, res) => {
294                write!(f, "IncomingDataDidReceive, {addr}. {}", res_kind(res))
295            }
296            Self::OutgoingConnectionDidConnect(addr, res) => {
297                write!(f, "OutgoingConnectionDidConnect, {addr}, {}", res_kind(res))
298            }
299            Self::OutgoingDataDidSend(addr, res) => {
300                write!(f, "OutgoingDataDidSend, {addr}, {}", res_kind(res))
301            }
302            Self::ConnectionDidClose(addr, res) => {
303                write!(f, "ConnectionDidClose, {addr}, {}", res_kind(res))
304            }
305            Self::ConnectionDidCloseOnDemand(addr) => {
306                write!(f, "ConnectionDidCloseOnDemand, {addr}")
307            }
308        }
309    }
310}