p2p_testing/
utils.rs

1// use std::{ops::Deref, pin::Pin, time::Duration};
2
3// use futures::{TryStream, TryStreamExt};
4
5// use crate::{
6//     cluster::{Cluster, ClusterEvent, Listener, TimestampEvent, TimestampSource}, predicates::listener_is_ready, rust_node::RustNodeId, stream::ClusterStreamExt
7// };
8
9use std::time::Duration;
10
11use futures::{StreamExt, TryStreamExt};
12
13use crate::{
14    cluster::{Cluster, ClusterEvent, Error, NodeId},
15    event::RustNodeEvent,
16    predicates::{
17        all_listeners_are_ready, all_nodes_peers_are_ready, all_nodes_with_value,
18        listeners_are_ready, nodes_peers_are_ready,
19    },
20    rust_node::{RustNodeConfig, RustNodeId},
21    stream::ClusterStreamExt,
22};
23
24/// Creates an iterator that creates Rust nodes in the cluster using specified
25/// configuration.
26pub fn rust_nodes(
27    cluster: &mut Cluster,
28    config: RustNodeConfig,
29) -> impl '_ + Iterator<Item = Result<RustNodeId, Error>> {
30    std::iter::repeat_with(move || cluster.add_rust_node(config.clone()))
31}
32
33/// Maps an array of Rust node ids to an array of corresponding peer ids.
34pub fn peer_ids<T: Into<NodeId>, const N: usize>(
35    cluster: &Cluster,
36    rust_nodes: [T; N],
37) -> [p2p::PeerId; N] {
38    rust_nodes.map(|id| cluster.peer_id(id.into()))
39}
40
41/// Tries to create a Rust node for each configuration, returning an array of node ids, or an error.
42pub fn rust_nodes_from_configs<const N: usize>(
43    cluster: &mut Cluster,
44    configs: [RustNodeConfig; N],
45) -> Result<[RustNodeId; N], Error> {
46    let vec: Vec<_> = configs
47        .into_iter()
48        .map(|config| cluster.add_rust_node(config))
49        .collect::<Result<_, _>>()?;
50    Ok(vec.try_into().expect("size should match"))
51}
52
53/// Tries to create as many Rust nodes using the specified configuration to fill the resulting array.
54pub fn rust_nodes_from_config<const N: usize>(
55    cluster: &mut Cluster,
56    config: RustNodeConfig,
57) -> Result<[RustNodeId; N], Error> {
58    let vec: Vec<_> = rust_nodes(cluster, config)
59        .take(N)
60        .collect::<Result<_, _>>()?;
61    Ok(vec.try_into().expect("size should match"))
62}
63
64/// Tries to create as many Rust nodes using the default configuration to fill the resulting array.
65pub fn rust_nodes_from_default_config<const N: usize>(
66    cluster: &mut Cluster,
67) -> Result<[RustNodeId; N], Error> {
68    rust_nodes_from_config(cluster, Default::default())
69}
70
71/// Runs the cluster for the specified period of `time`.
72pub async fn run_cluster(cluster: &mut Cluster, time: Duration) {
73    let mut stream = cluster.stream().take_during(time);
74    while stream.next().await.is_some() {}
75}
76
77/// Tries to run the cluster for the specified period of `time`, returning `Err`
78/// if error event happens.
79pub async fn try_run_cluster(cluster: &mut Cluster, time: Duration) -> Result<(), ClusterEvent> {
80    let mut stream = cluster.try_stream().take_during(time);
81    while stream.try_next().await?.is_some() {}
82    Ok(())
83}
84
85/// Runs the cluster for the specified period of `time`, returning early true if
86/// the specified `nodes` are ready to accept connections.
87pub async fn wait_for_nodes_to_listen<I>(cluster: &mut Cluster, nodes: I, time: Duration) -> bool
88where
89    I: IntoIterator<Item = RustNodeId>,
90{
91    cluster
92        .stream()
93        .take_during(time)
94        .any(listeners_are_ready(nodes))
95        .await
96}
97
98/// Tries to run the cluster for the specified period of `time`, returning early
99/// true if the specified `nodes` are ready to accept connections.
100pub async fn try_wait_for_nodes_to_listen<I>(
101    cluster: &mut Cluster,
102    nodes: I,
103    time: Duration,
104) -> Result<bool, ClusterEvent>
105where
106    I: IntoIterator<Item = RustNodeId>,
107{
108    cluster
109        .try_stream()
110        .take_during(time)
111        .try_any(listeners_are_ready(nodes))
112        .await
113}
114
115/// Tries to run the cluster for the specified period of `time`, returning early
116/// true if the specified `nodes` are ready to accept connections.
117pub async fn wait_for_all_nodes_to_listen<T, I>(
118    cluster: &mut Cluster,
119    nodes: I,
120    time: Duration,
121) -> bool
122where
123    I: IntoIterator<Item = T>,
124    T: Into<NodeId>,
125{
126    cluster
127        .stream()
128        .take_during(time)
129        .any(all_listeners_are_ready(nodes))
130        .await
131}
132
133/// Tries to run the cluster for the specified period of `time`, returning early
134/// true if the specified `nodes` are ready to accept connections.
135pub async fn try_wait_for_all_nodes_to_listen<T, I>(
136    cluster: &mut Cluster,
137    nodes: I,
138    time: Duration,
139) -> Result<bool, ClusterEvent>
140where
141    I: IntoIterator<Item = T>,
142    T: Into<NodeId>,
143{
144    cluster
145        .try_stream()
146        .take_during(time)
147        .try_any(all_listeners_are_ready(nodes))
148        .await
149}
150
151/// Runs the cluster for specified period of `time`, returning `true` early if
152/// for each specified pair of Rust node id and peer id there was a succesfull
153/// connection.
154pub async fn wait_for_nodes_to_connect<I>(
155    cluster: &mut Cluster,
156    nodes_peers: I,
157    time: Duration,
158) -> bool
159where
160    I: IntoIterator<Item = (RustNodeId, p2p::PeerId)>,
161{
162    cluster
163        .stream()
164        .take_during(time)
165        .any(nodes_peers_are_ready(nodes_peers))
166        .await
167}
168
169/// Tries to run the cluster for specified period of `time`, returning `true`
170/// early if for each specified pair of Rust node id and peer id there was a
171/// succesfull connection.
172pub async fn try_wait_for_nodes_to_connect<I>(
173    cluster: &mut Cluster,
174    nodes_peers: I,
175    time: Duration,
176) -> Result<bool, ClusterEvent>
177where
178    I: IntoIterator<Item = (RustNodeId, p2p::PeerId)>,
179{
180    cluster
181        .try_stream()
182        .take_during(time)
183        .try_any(nodes_peers_are_ready(nodes_peers))
184        .await
185}
186
187/// Runs the cluster for specified period of `time`, returning `true` early if
188/// for each specified pair of node id and peer id there was a succesfull
189/// connection.
190pub async fn wait_for_all_nodes_to_connect<I>(
191    cluster: &mut Cluster,
192    nodes_peers: I,
193    time: Duration,
194) -> bool
195where
196    I: IntoIterator<Item = (NodeId, p2p::PeerId)>,
197{
198    cluster
199        .stream()
200        .take_during(time)
201        .any(all_nodes_peers_are_ready(nodes_peers))
202        .await
203}
204
205/// Tries to run the cluster for specified period of `time`, returning `true`
206/// early if for each specified pair of node id and peer id there was a
207/// succesfull connection.
208pub async fn try_wait_for_all_nodes_to_connect<I>(
209    cluster: &mut Cluster,
210    nodes_peers: I,
211    time: Duration,
212) -> Result<bool, ClusterEvent>
213where
214    I: IntoIterator<Item = (NodeId, p2p::PeerId)>,
215{
216    cluster
217        .try_stream()
218        .take_during(time)
219        .try_any(all_nodes_peers_are_ready(nodes_peers))
220        .await
221}
222
223/// Tries to wait for particular event to happen for all specified pairs of (node, peer_id).
224///
225/// Function `f` extract peer_id from a Rust event.
226///
227/// See [`super::predicates::all_nodes_with_value`].
228pub async fn try_wait_for_all_node_peer<I, F>(
229    cluster: &mut Cluster,
230    nodes_peers: I,
231    time: Duration,
232    f: F,
233) -> Result<bool, ClusterEvent>
234where
235    I: IntoIterator<Item = (RustNodeId, p2p::PeerId)>,
236    F: FnMut(RustNodeEvent) -> Option<p2p::PeerId>,
237{
238    cluster
239        .try_stream()
240        .take_during(time)
241        .try_any(all_nodes_with_value(nodes_peers, f))
242        .await
243}
244
245/// Tries to wait for particular event to happen at least once for all specified pairs of (node, v).
246///
247/// Function `f` extract value `v` from a Rust event.
248///
249/// See [`super::predicates::all_nodes_with_value`].
250pub async fn try_wait_for_all_nodes_with_value<T, I, F>(
251    cluster: &mut Cluster,
252    nodes_values: I,
253    time: Duration,
254    f: F,
255) -> Result<bool, ClusterEvent>
256where
257    T: Eq,
258    I: IntoIterator<Item = (RustNodeId, T)>,
259    F: FnMut(RustNodeEvent) -> Option<T>,
260{
261    cluster
262        .try_stream()
263        .take_during(time)
264        .try_any(all_nodes_with_value(nodes_values, f))
265        .await
266}
267
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::{rust_nodes, rust_nodes_from_config, rust_nodes_from_default_config};
    use crate::{
        cluster::ClusterBuilder,
        rust_node::RustNodeConfig,
        utils::{peer_ids, wait_for_nodes_to_connect, wait_for_nodes_to_listen},
    };

    /// Smoke test for the helpers in this module: creates nodes through each
    /// of the node-creation helpers, waits for three of them to listen,
    /// connects the first one to the other two and waits for both connections.
    #[tokio::test]
    async fn test() {
        /// Upper bound for each of the waiting steps below.
        const TIMEOUT: Duration = Duration::from_secs(10);

        let mut cluster = ClusterBuilder::new()
            .ports(11000..11200)
            .start()
            .await
            .expect("should build cluster");

        // Exercise the raw iterator helper; the individual results are not inspected.
        let _nodes: Vec<_> = rust_nodes(&mut cluster, RustNodeConfig::default())
            .take(3)
            .collect();

        // Exercise the array helper with an explicit configuration.
        let [_node1, _node2] = rust_nodes_from_config(&mut cluster, RustNodeConfig::default())
            .expect("Error creating nodes");

        // These three nodes are the ones used in the rest of the test.
        let [node1, node2, node3] =
            rust_nodes_from_default_config(&mut cluster).expect("Error creating nodes");

        let listening =
            wait_for_nodes_to_listen(&mut cluster, [node1, node2, node3], TIMEOUT).await;
        assert!(listening);

        let [peer_id2, peer_id3] = peer_ids(&cluster, [node2, node3]);

        for target in [node2, node3] {
            cluster.connect(node1, target).expect("no error");
        }

        let expected = [(node1, peer_id2), (node1, peer_id3)];
        let connected = wait_for_nodes_to_connect(&mut cluster, expected, TIMEOUT).await;
        assert!(connected);
    }
}
315
316// async fn try_listener_is_ready<T, L>(cluster: &mut T, node: RustNodeId, duration: Duration) -> Result<bool, T::Error>
317// where
318//     T: Unpin + Deref<Target = Cluster> + TryStream<Ok = ClusterEvent> + TimestampSource,
319//     T::Item: TimestampEvent,
320//     L: Into<Listener>,
321// {
322
323//     ClusterStreamExt::take_during(Pin::new(cluster), duration).try_any(listener_is_ready(node)).await
324// }
325
326// async fn try_connect_and_ready<T, L>(cluster: &mut T, node: RustNodeId, peer: L) -> bool
327// where
328//     T: Deref<Cluster> + TryStream<Ok = ClusterEvent>,
329//     L: Into<Listener>,
330// {
331//     cluster.
332// }