1use std::time::Duration;
10
11use futures::{StreamExt, TryStreamExt};
12
13use crate::{
14 cluster::{Cluster, ClusterEvent, Error, NodeId},
15 event::RustNodeEvent,
16 predicates::{
17 all_listeners_are_ready, all_nodes_peers_are_ready, all_nodes_with_value,
18 listeners_are_ready, nodes_peers_are_ready,
19 },
20 rust_node::{RustNodeConfig, RustNodeId},
21 stream::ClusterStreamExt,
22};
23
24pub fn rust_nodes(
27 cluster: &mut Cluster,
28 config: RustNodeConfig,
29) -> impl '_ + Iterator<Item = Result<RustNodeId, Error>> {
30 std::iter::repeat_with(move || cluster.add_rust_node(config.clone()))
31}
32
33pub fn peer_ids<T: Into<NodeId>, const N: usize>(
35 cluster: &Cluster,
36 rust_nodes: [T; N],
37) -> [p2p::PeerId; N] {
38 rust_nodes.map(|id| cluster.peer_id(id.into()))
39}
40
41pub fn rust_nodes_from_configs<const N: usize>(
43 cluster: &mut Cluster,
44 configs: [RustNodeConfig; N],
45) -> Result<[RustNodeId; N], Error> {
46 let vec: Vec<_> = configs
47 .into_iter()
48 .map(|config| cluster.add_rust_node(config))
49 .collect::<Result<_, _>>()?;
50 Ok(vec.try_into().expect("size should match"))
51}
52
53pub fn rust_nodes_from_config<const N: usize>(
55 cluster: &mut Cluster,
56 config: RustNodeConfig,
57) -> Result<[RustNodeId; N], Error> {
58 let vec: Vec<_> = rust_nodes(cluster, config)
59 .take(N)
60 .collect::<Result<_, _>>()?;
61 Ok(vec.try_into().expect("size should match"))
62}
63
64pub fn rust_nodes_from_default_config<const N: usize>(
66 cluster: &mut Cluster,
67) -> Result<[RustNodeId; N], Error> {
68 rust_nodes_from_config(cluster, Default::default())
69}
70
71pub async fn run_cluster(cluster: &mut Cluster, time: Duration) {
73 let mut stream = cluster.stream().take_during(time);
74 while stream.next().await.is_some() {}
75}
76
77pub async fn try_run_cluster(cluster: &mut Cluster, time: Duration) -> Result<(), ClusterEvent> {
80 let mut stream = cluster.try_stream().take_during(time);
81 while stream.try_next().await?.is_some() {}
82 Ok(())
83}
84
85pub async fn wait_for_nodes_to_listen<I>(cluster: &mut Cluster, nodes: I, time: Duration) -> bool
88where
89 I: IntoIterator<Item = RustNodeId>,
90{
91 cluster
92 .stream()
93 .take_during(time)
94 .any(listeners_are_ready(nodes))
95 .await
96}
97
98pub async fn try_wait_for_nodes_to_listen<I>(
101 cluster: &mut Cluster,
102 nodes: I,
103 time: Duration,
104) -> Result<bool, ClusterEvent>
105where
106 I: IntoIterator<Item = RustNodeId>,
107{
108 cluster
109 .try_stream()
110 .take_during(time)
111 .try_any(listeners_are_ready(nodes))
112 .await
113}
114
115pub async fn wait_for_all_nodes_to_listen<T, I>(
118 cluster: &mut Cluster,
119 nodes: I,
120 time: Duration,
121) -> bool
122where
123 I: IntoIterator<Item = T>,
124 T: Into<NodeId>,
125{
126 cluster
127 .stream()
128 .take_during(time)
129 .any(all_listeners_are_ready(nodes))
130 .await
131}
132
133pub async fn try_wait_for_all_nodes_to_listen<T, I>(
136 cluster: &mut Cluster,
137 nodes: I,
138 time: Duration,
139) -> Result<bool, ClusterEvent>
140where
141 I: IntoIterator<Item = T>,
142 T: Into<NodeId>,
143{
144 cluster
145 .try_stream()
146 .take_during(time)
147 .try_any(all_listeners_are_ready(nodes))
148 .await
149}
150
151pub async fn wait_for_nodes_to_connect<I>(
155 cluster: &mut Cluster,
156 nodes_peers: I,
157 time: Duration,
158) -> bool
159where
160 I: IntoIterator<Item = (RustNodeId, p2p::PeerId)>,
161{
162 cluster
163 .stream()
164 .take_during(time)
165 .any(nodes_peers_are_ready(nodes_peers))
166 .await
167}
168
169pub async fn try_wait_for_nodes_to_connect<I>(
173 cluster: &mut Cluster,
174 nodes_peers: I,
175 time: Duration,
176) -> Result<bool, ClusterEvent>
177where
178 I: IntoIterator<Item = (RustNodeId, p2p::PeerId)>,
179{
180 cluster
181 .try_stream()
182 .take_during(time)
183 .try_any(nodes_peers_are_ready(nodes_peers))
184 .await
185}
186
187pub async fn wait_for_all_nodes_to_connect<I>(
191 cluster: &mut Cluster,
192 nodes_peers: I,
193 time: Duration,
194) -> bool
195where
196 I: IntoIterator<Item = (NodeId, p2p::PeerId)>,
197{
198 cluster
199 .stream()
200 .take_during(time)
201 .any(all_nodes_peers_are_ready(nodes_peers))
202 .await
203}
204
205pub async fn try_wait_for_all_nodes_to_connect<I>(
209 cluster: &mut Cluster,
210 nodes_peers: I,
211 time: Duration,
212) -> Result<bool, ClusterEvent>
213where
214 I: IntoIterator<Item = (NodeId, p2p::PeerId)>,
215{
216 cluster
217 .try_stream()
218 .take_during(time)
219 .try_any(all_nodes_peers_are_ready(nodes_peers))
220 .await
221}
222
223pub async fn try_wait_for_all_node_peer<I, F>(
229 cluster: &mut Cluster,
230 nodes_peers: I,
231 time: Duration,
232 f: F,
233) -> Result<bool, ClusterEvent>
234where
235 I: IntoIterator<Item = (RustNodeId, p2p::PeerId)>,
236 F: FnMut(RustNodeEvent) -> Option<p2p::PeerId>,
237{
238 cluster
239 .try_stream()
240 .take_during(time)
241 .try_any(all_nodes_with_value(nodes_peers, f))
242 .await
243}
244
245pub async fn try_wait_for_all_nodes_with_value<T, I, F>(
251 cluster: &mut Cluster,
252 nodes_values: I,
253 time: Duration,
254 f: F,
255) -> Result<bool, ClusterEvent>
256where
257 T: Eq,
258 I: IntoIterator<Item = (RustNodeId, T)>,
259 F: FnMut(RustNodeEvent) -> Option<T>,
260{
261 cluster
262 .try_stream()
263 .take_during(time)
264 .try_any(all_nodes_with_value(nodes_values, f))
265 .await
266}
267
268#[cfg(test)]
269mod tests {
270 use std::time::Duration;
271
272 use crate::{
273 cluster::ClusterBuilder,
274 rust_node::RustNodeConfig,
275 utils::{peer_ids, wait_for_nodes_to_connect, wait_for_nodes_to_listen},
276 };
277
278 #[tokio::test]
279 async fn test() {
280 let mut cluster = ClusterBuilder::new()
281 .ports(11000..11200)
282 .start()
283 .await
284 .expect("should build cluster");
285
286 let _nodes = super::rust_nodes(&mut cluster, RustNodeConfig::default())
287 .take(3)
288 .collect::<Vec<_>>();
289
290 let [_node1, _node2] =
291 super::rust_nodes_from_config(&mut cluster, RustNodeConfig::default())
292 .expect("Error creating nodes");
293 let [node1, node2, node3] =
294 super::rust_nodes_from_default_config(&mut cluster).expect("Error creating nodes");
295
296 let ready =
297 wait_for_nodes_to_listen(&mut cluster, [node1, node2, node3], Duration::from_secs(10))
298 .await;
299 assert!(ready);
300
301 let [peer_id2, peer_id3] = peer_ids(&cluster, [node2, node3]);
302
303 cluster.connect(node1, node2).expect("no error");
304 cluster.connect(node1, node3).expect("no error");
305
306 let ready = wait_for_nodes_to_connect(
307 &mut cluster,
308 [(node1, peer_id2), (node1, peer_id3)],
309 Duration::from_secs(10),
310 )
311 .await;
312 assert!(ready);
313 }
314}
315
316