// p2p_testing/predicates.rs

1use std::{
2    collections::{BTreeSet, HashSet},
3    future::{ready, Ready},
4};
5
6use libp2p::swarm::SwarmEvent;
7use p2p::PeerId;
8
9use crate::{
10    cluster::{ClusterEvent, NodeId},
11    event::RustNodeEvent,
12    libp2p_node::Libp2pEvent,
13    rust_node::RustNodeId,
14};
15
16/// Wraps plain function over a cluster event into an async one.
17pub fn async_fn<T, F>(mut f: F) -> impl FnMut(ClusterEvent) -> Ready<T>
18where
19    F: FnMut(ClusterEvent) -> T,
20{
21    move |event| ready(f(event))
22}
23
24/// Predicate returning true for a cluster event corresponging to the specified node started listening.
25pub fn listener_is_ready(id: RustNodeId) -> impl FnMut(ClusterEvent) -> Ready<bool> {
26    move |event| {
27        ready(
28            matches!(event.rust(), Some((event_id, RustNodeEvent::ListenerReady { .. })) if *event_id == id),
29        )
30    }
31}
32
33/// Predicate if kademlia has finished bootstrap
34pub fn kad_finished_bootstrap(id: RustNodeId) -> impl FnMut(ClusterEvent) -> Ready<bool> {
35    move |event| {
36        ready(matches!(
37            event.rust(),
38            Some((event_id, RustNodeEvent::KadBootstrapFinished)) if *event_id == id
39        ))
40    }
41}
42
43/// Predicate returning true for a cluster event corresponging to the specified node started listening.
44pub fn listeners_are_ready<I>(ids: I) -> impl FnMut(ClusterEvent) -> Ready<bool>
45where
46    I: IntoIterator<Item = RustNodeId>,
47{
48    let mut ids: HashSet<RustNodeId> = HashSet::from_iter(ids);
49    move |event| {
50        ready(
51            if let Some((event_id, RustNodeEvent::ListenerReady { .. })) = event.rust() {
52                ids.remove(event_id) && ids.is_empty()
53            } else {
54                false
55            },
56        )
57    }
58}
59
60/// Predicate returning true for a cluster event corresponging to the specified node started listening.
61pub fn all_listeners_are_ready<T, I>(ids: I) -> impl FnMut(ClusterEvent) -> Ready<bool>
62where
63    I: IntoIterator<Item = T>,
64    T: Into<NodeId>,
65{
66    let mut ids: HashSet<NodeId> = HashSet::from_iter(ids.into_iter().map(Into::into));
67    move |event| {
68        ready(
69            match event {
70                ClusterEvent::Rust {
71                    id,
72                    event: RustNodeEvent::ListenerReady { .. },
73                } => ids.remove(&NodeId::Rust(id)),
74                ClusterEvent::Libp2p {
75                    id,
76                    event: SwarmEvent::NewListenAddr { address, .. },
77                } => {
78                    println!("{id:?}: new listen addr: {address}");
79                    ids.remove(&NodeId::Libp2p(id))
80                }
81                _ => false,
82            } && ids.is_empty(),
83        )
84    }
85}
86
87pub fn nodes_peers_are_ready<I>(nodes_peers: I) -> impl FnMut(ClusterEvent) -> Ready<bool>
88where
89    I: IntoIterator<Item = (RustNodeId, PeerId)>,
90{
91    let mut nodes_peers = BTreeSet::from_iter(nodes_peers);
92    move |event| {
93        ready(
94            if let ClusterEvent::Rust {
95                id,
96                event: RustNodeEvent::PeerConnected { peer_id, .. },
97            } = event
98            {
99                nodes_peers.remove(&(id, peer_id)) && nodes_peers.is_empty()
100            } else {
101                false
102            },
103        )
104    }
105}
106
107/// Returns a predicate over cluster events that returns `true` once it is
108/// called at least once for events that represent established connection
109/// between a node and a peer from the `nodes_peers`.
110pub fn all_nodes_peers_are_ready<I>(nodes_peers: I) -> impl FnMut(ClusterEvent) -> Ready<bool>
111where
112    I: IntoIterator<Item = (NodeId, PeerId)>,
113{
114    let mut nodes_peers = BTreeSet::from_iter(nodes_peers);
115    move |event| {
116        ready(match event {
117            ClusterEvent::Rust {
118                id,
119                event: RustNodeEvent::PeerConnected { peer_id, .. },
120            } => nodes_peers.remove(&(id.into(), peer_id)) && nodes_peers.is_empty(),
121            ClusterEvent::Libp2p {
122                id,
123                event: Libp2pEvent::ConnectionEstablished { peer_id, .. },
124            } => {
125                nodes_peers.remove(&(id.into(), peer_id.try_into().expect("Conversion failed")))
126                    && nodes_peers.is_empty()
127            }
128            _ => false,
129        })
130    }
131}
132
133/// Predicate returning true when encountered an event signalling that the peer `peer_id` is connected to the node `id`.
134pub fn peer_is_connected(
135    id: RustNodeId,
136    peer_id: PeerId,
137) -> impl FnMut(ClusterEvent) -> Ready<bool> {
138    move |event| {
139        ready(
140            matches!(event.rust(), Some((event_id, RustNodeEvent::PeerConnected { peer_id: pid, .. })) if *event_id == id && pid == &peer_id),
141        )
142    }
143}
144
145/// Function that wraps a cluster event into a `Result` using default sense of erroneous event.
146pub fn default_errors(event: &ClusterEvent) -> bool {
147    match &event {
148        ClusterEvent::Rust { event: e, .. } => match e {
149            RustNodeEvent::ListenerError { .. } => true,
150            RustNodeEvent::PeerConnectionError { .. } => true,
151            RustNodeEvent::PeerDisconnected { .. } => true,
152            RustNodeEvent::P2p { event: e } => match e {
153                p2p::P2pEvent::Connection(_) => false,
154                p2p::P2pEvent::Channel(e) => matches!(
155                    e,
156                    p2p::P2pChannelEvent::Opened(_, _, Err(_))
157                        | p2p::P2pChannelEvent::Sent(_, _, _, Err(_))
158                        | p2p::P2pChannelEvent::Received(_, Err(_))
159                ),
160                p2p::P2pEvent::MioEvent(e) => matches!(
161                    e,
162                    p2p::MioEvent::ListenerError { .. }
163                        | p2p::MioEvent::IncomingConnectionDidAccept(_, Err(_))
164                        | p2p::MioEvent::IncomingDataDidReceive(_, Err(_))
165                        | p2p::MioEvent::OutgoingConnectionDidConnect(_, Err(_))
166                        | p2p::MioEvent::OutgoingDataDidSend(_, Err(_))
167                        | p2p::MioEvent::ConnectionDidClose(_, Err(_))
168                ),
169            },
170            _ => false,
171        },
172        _ => false,
173    }
174}
175
176/// For an event for a rust node _id_, that `f` maps to `Some(v)`,
177/// removes the pair `(id, v)` from the `nodes_items`, returning `true` if it
178/// runs out.
179pub fn all_nodes_with_value<T, I, F>(
180    nodes_items: I,
181    mut f: F,
182) -> impl FnMut(ClusterEvent) -> Ready<bool>
183where
184    T: PartialEq + Eq,
185    I: IntoIterator<Item = (RustNodeId, T)>,
186    F: FnMut(RustNodeEvent) -> Option<T>,
187{
188    let mut nodes_items = Vec::from_iter(nodes_items);
189    move |event| {
190        ready(if let ClusterEvent::Rust { id, event } = event {
191            f(event)
192                .and_then(|v| {
193                    nodes_items
194                        .iter()
195                        .position(|(_id, _v)| _id == &id && _v == &v)
196                })
197                .is_some_and(|i| {
198                    nodes_items.swap_remove(i);
199                    nodes_items.is_empty()
200                })
201        } else {
202            false
203        })
204    }
205}