p2p_testing/
event.rs

1use std::net::{IpAddr, SocketAddr};
2
3use p2p::{
4    channels::{rpc::P2pChannelsRpcAction, P2pChannelsAction},
5    connection::{incoming::P2pConnectionIncomingAction, outgoing::P2pConnectionOutgoingAction},
6    disconnection::P2pDisconnectionAction,
7    identify::P2pIdentifyAction,
8    network::identify::P2pNetworkIdentify,
9    peer::P2pPeerAction,
10    MioEvent, P2pAction, P2pEvent, PeerId,
11};
12
13use crate::cluster::ClusterEvent;
14
15#[derive(Debug)]
16pub enum RustNodeEvent {
17    Interface {
18        addr: IpAddr,
19    },
20    ListenerReady {
21        addr: SocketAddr,
22    },
23    ListenerError {
24        addr: SocketAddr,
25        error: String,
26    },
27    PeerConnected {
28        peer_id: PeerId,
29        incoming: bool,
30    },
31    PeerConnectionError {
32        peer_id: Option<PeerId>,
33        incoming: bool,
34        error: String,
35    },
36    PeerDisconnected {
37        peer_id: PeerId,
38        reason: String,
39    },
40    RpcChannelReady {
41        peer_id: PeerId,
42    },
43    RpcChannelRequestReceived {
44        peer_id: PeerId,
45        id: p2p::channels::rpc::P2pRpcId,
46        request: p2p::channels::rpc::P2pRpcRequest,
47    },
48    RpcChannelResponseReceived {
49        peer_id: PeerId,
50        id: p2p::channels::rpc::P2pRpcId,
51        response: Option<p2p::channels::rpc::P2pRpcResponse>,
52    },
53    Identify {
54        peer_id: PeerId,
55        info: Box<P2pNetworkIdentify>,
56    },
57    KadBootstrapFinished,
58    KadUpdateFindNodeRequest {
59        peer_id: PeerId,
60        closest_peers: Vec<p2p::network::kad::P2pNetworkKadEntry>,
61    },
62    /// Other non-specific p2p event.
63    P2p {
64        event: P2pEvent,
65    },
66    /// Timeout event with no specific outcome.
67    Idle,
68}
69
70pub(super) trait RustNodeEventStore {
71    fn store_event(&mut self, event: RustNodeEvent);
72}
73
74pub fn event_mapper_effect(store: &mut super::redux::Store, action: P2pAction) {
75    let store_event = |store: &mut super::redux::Store, event| store.service().store_event(event);
76    match action {
77        P2pAction::Peer(P2pPeerAction::Ready { peer_id, incoming }) => {
78            store_event(store, RustNodeEvent::PeerConnected { peer_id, incoming })
79        }
80        P2pAction::Connection(action) => match action {
81            p2p::connection::P2pConnectionAction::Outgoing(
82                P2pConnectionOutgoingAction::Error { peer_id, error },
83            ) => store_event(
84                store,
85                RustNodeEvent::PeerConnectionError {
86                    peer_id: Some(peer_id),
87                    incoming: false,
88                    error: error.to_string(),
89                },
90            ),
91            p2p::connection::P2pConnectionAction::Incoming(
92                P2pConnectionIncomingAction::Error { peer_id, error },
93            ) => store_event(
94                store,
95                RustNodeEvent::PeerConnectionError {
96                    peer_id: Some(peer_id),
97                    incoming: true,
98                    error: error.to_string(),
99                },
100            ),
101            _ => {}
102        },
103
104        P2pAction::Disconnection(P2pDisconnectionAction::Init { peer_id, reason }) => store_event(
105            store,
106            RustNodeEvent::PeerDisconnected {
107                peer_id,
108                reason: reason.to_string(),
109            },
110        ),
111        P2pAction::Network(p2p::P2pNetworkAction::Kad(p2p::P2pNetworkKadAction::System(
112            p2p::P2pNetworkKademliaAction::BootstrapFinished,
113        ))) => {
114            store_event(store, RustNodeEvent::KadBootstrapFinished);
115        }
116        P2pAction::Network(p2p::P2pNetworkAction::Kad(p2p::P2pNetworkKadAction::System(
117            p2p::P2pNetworkKademliaAction::UpdateFindNodeRequest {
118                peer_id,
119                closest_peers,
120                ..
121            },
122        ))) => {
123            store_event(
124                store,
125                RustNodeEvent::KadUpdateFindNodeRequest {
126                    peer_id,
127                    closest_peers,
128                },
129            );
130        }
131        P2pAction::Channels(P2pChannelsAction::Rpc(action)) => match action {
132            P2pChannelsRpcAction::Ready { peer_id } => {
133                store_event(store, RustNodeEvent::RpcChannelReady { peer_id })
134            }
135            P2pChannelsRpcAction::RequestReceived {
136                peer_id,
137                id,
138                request,
139            } => {
140                // if matches!(store.service.peek_rust_node_event(), Some(RustNodeEvent::RpcChannelReady { peer_id: pid }) if pid == &peer_id )
141                // {
142                //     store.service.rust_node_event();
143                // }
144                store_event(
145                    store,
146                    RustNodeEvent::RpcChannelRequestReceived {
147                        peer_id,
148                        id,
149                        request: *request,
150                    },
151                )
152            }
153            P2pChannelsRpcAction::ResponseReceived {
154                peer_id,
155                id,
156                response,
157            } => store_event(
158                store,
159                RustNodeEvent::RpcChannelResponseReceived {
160                    peer_id,
161                    id,
162                    response: response.map(|v| *v),
163                },
164            ),
165            _ => {}
166        },
167        P2pAction::Identify(P2pIdentifyAction::UpdatePeerInformation { peer_id, info, .. }) => {
168            store_event(store, RustNodeEvent::Identify { peer_id, info })
169        }
170
171        P2pAction::Network(p2p::P2pNetworkAction::Scheduler(action)) => match action {
172            p2p::P2pNetworkSchedulerAction::InterfaceDetected { ip } => {
173                store_event(store, RustNodeEvent::Interface { addr: ip })
174            }
175            p2p::P2pNetworkSchedulerAction::ListenerReady { listener } => {
176                store_event(store, RustNodeEvent::ListenerReady { addr: listener })
177            }
178            p2p::P2pNetworkSchedulerAction::ListenerError { listener, error } => store_event(
179                store,
180                RustNodeEvent::ListenerError {
181                    addr: listener,
182                    error,
183                },
184            ),
185            p2p::P2pNetworkSchedulerAction::Error { addr, error } => {
186                if let Some(conn_state) = store.state().0.network.scheduler.connections.get(&addr) {
187                    if conn_state.peer_id().is_none() {
188                        let error = error.to_string();
189                        let incoming = conn_state.incoming;
190                        store_event(
191                            store,
192                            RustNodeEvent::PeerConnectionError {
193                                peer_id: None,
194                                incoming,
195                                error,
196                            },
197                        );
198                    }
199                }
200            }
201            _ => {}
202        },
203        _ => {}
204    }
205}
206
207pub fn is_error(event: &ClusterEvent) -> bool {
208    let ClusterEvent::Rust { event, .. } = event else {
209        return false;
210    };
211    match event {
212        RustNodeEvent::ListenerError { .. } => true,
213        RustNodeEvent::PeerConnectionError { .. } => true,
214        RustNodeEvent::PeerDisconnected { .. } => true,
215        RustNodeEvent::P2p { event } => match event {
216            P2pEvent::Connection(_event) => false, // TODO
217            P2pEvent::Channel(_event) => false,    // TODO
218            P2pEvent::MioEvent(event) => matches!(
219                event,
220                MioEvent::ListenerError { .. }
221                    | MioEvent::IncomingConnectionDidAccept(_, Err(_))
222                    | MioEvent::IncomingDataDidReceive(_, Err(_))
223                    | MioEvent::OutgoingConnectionDidConnect(_, Err(_))
224                    | MioEvent::OutgoingDataDidSend(_, Err(_))
225                    | MioEvent::ConnectionDidClose(_, Err(_))
226            ),
227        },
228        _ => false,
229    }
230}
231
232pub fn allow_disconnections(event: &ClusterEvent) -> bool {
233    let ClusterEvent::Rust { event, .. } = event else {
234        return false;
235    };
236    match event {
237        RustNodeEvent::ListenerError { .. } => true,
238        RustNodeEvent::PeerConnectionError { .. } => false,
239        RustNodeEvent::PeerDisconnected { .. } => true,
240        RustNodeEvent::P2p { event } => match event {
241            P2pEvent::Connection(_event) => false, // TODO
242            P2pEvent::Channel(_event) => false,    // TODO
243            P2pEvent::MioEvent(event) => matches!(
244                event,
245                MioEvent::ListenerError { .. } | MioEvent::IncomingConnectionDidAccept(_, Err(_)) // | MioEvent::IncomingDataDidReceive(_, Err(_))
246                                                                                                  // | MioEvent::OutgoingConnectionDidConnect(_, Err(_))
247                                                                                                  // | MioEvent::OutgoingDataDidSend(_, Err(_))
248                                                                                                  // | MioEvent::ConnectionDidClose(_, Err(_))
249            ),
250        },
251        _ => false,
252    }
253}