1use std::{
2 collections::BTreeSet,
3 fmt::Debug,
4 net::SocketAddr,
5 time::{Duration, Instant},
6};
7
8use node::{
9 event_source::Event,
10 p2p::{
11 connection::outgoing::P2pConnectionOutgoingInitOpts,
12 webrtc::{Host, HttpSignalingInfo, SignalingMethod},
13 ConnectionAddr, P2pConnectionEvent, P2pEvent, P2pNetworkConnectionMuxState,
14 P2pNetworkConnectionState, P2pNetworkYamuxState, P2pPeerState, P2pPeerStatus, P2pState,
15 PeerId,
16 },
17 State,
18};
19
20use node::p2p::connection::outgoing::P2pConnectionOutgoingInitLibp2pOpts;
21
22use node::p2p::{MioEvent, P2pNetworkAuthState, P2pNetworkNoiseState, P2pNetworkNoiseStateInner};
23
24use crate::{cluster::ClusterNodeId, node::RustNodeTestingConfig, scenario::ScenarioStep};
25
26use super::ClusterRunner;
27
28pub fn match_addr_with_port_and_peer_id(
29 port: u16,
30 peer_id: PeerId,
31) -> impl Fn(&P2pConnectionOutgoingInitOpts) -> bool {
32 move |conn_opt| match conn_opt {
33 P2pConnectionOutgoingInitOpts::WebRTC {
34 peer_id: pid,
35 signaling:
36 SignalingMethod::Http(HttpSignalingInfo {
37 host: Host::Ipv4(_ip4),
38 port: p,
39 }),
40 }
41 | P2pConnectionOutgoingInitOpts::WebRTC {
42 peer_id: pid,
43 signaling:
44 SignalingMethod::Https(HttpSignalingInfo {
45 host: Host::Ipv4(_ip4),
46 port: p,
47 }),
48 } => &peer_id == pid && port == *p,
49 P2pConnectionOutgoingInitOpts::LibP2P(libp2p_opts) => {
50 libp2p_opts.peer_id == peer_id && libp2p_opts.port == port
51 }
52 _ => false,
53 }
54}
55
56pub fn get_peers_iter(
57 data: &serde_json::Value,
58) -> Option<impl Iterator<Item = Option<(&str, i64, &str)>>> {
59 let iter = data
60 .as_object()?
61 .get("data")?
62 .get("getPeers")?
63 .as_array()?
64 .iter()
65 .map(|elt| {
66 let elt = elt.as_object()?;
67 let host = elt.get("host")?.as_str()?;
68 let port = elt.get("libp2pPort")?.as_i64()?;
69 let peer_id = elt.get("peerId")?.as_str()?;
70 Some((host, port, peer_id))
71 });
72 Some(iter)
73}
74
75pub const PEERS_QUERY: &str = r#"query {
76 getPeers {
77 host
78 libp2pPort
79 peerId
80 }
81}"#;
82
83pub fn connection_finalized_event(
84 pred: impl Fn(ClusterNodeId, &PeerId) -> bool,
85) -> impl Fn(ClusterNodeId, &Event, &State) -> bool {
86 move |node_id, event, _| {
87 matches!(
88 event,
89 Event::P2p(P2pEvent::Connection(P2pConnectionEvent::Finalized(peer, res))) if pred(node_id, peer) && res.is_ok()
90 )
91 }
92}
93
94fn peer_has_addr(peer: &P2pPeerState, addr: ConnectionAddr) -> bool {
95 match (&peer.dial_opts, addr) {
96 (
97 Some(P2pConnectionOutgoingInitOpts::LibP2P(P2pConnectionOutgoingInitLibp2pOpts {
98 host: Host::Ipv4(host),
99 port,
100 ..
101 })),
102 ConnectionAddr {
103 sock_addr: SocketAddr::V4(addr),
104 ..
105 },
106 ) => addr.ip() == host && addr.port() == *port,
107 _ => false,
108 }
109}
110
111pub fn as_event_mio_interface_detected(event: &Event) -> Option<&std::net::IpAddr> {
112 if let Event::P2p(P2pEvent::MioEvent(MioEvent::InterfaceDetected(ip_addr))) = event {
113 Some(ip_addr)
114 } else {
115 None
116 }
117}
118
119pub fn as_event_mio_listener_ready(event: &Event) -> Option<&std::net::SocketAddr> {
120 if let Event::P2p(P2pEvent::MioEvent(MioEvent::ListenerReady { listener })) = event {
121 Some(listener)
122 } else {
123 None
124 }
125}
126
127pub fn as_event_mio_error(event: &Event) -> Option<ConnectionAddr> {
128 match event {
129 Event::P2p(P2pEvent::MioEvent(MioEvent::ConnectionDidClose(addr, res))) if res.is_err() => {
130 Some(*addr)
131 }
132 _ => None,
133 }
134}
135
136pub fn as_event_mio_data_send_receive(event: &Event) -> Option<ConnectionAddr> {
137 match event {
138 Event::P2p(P2pEvent::MioEvent(
139 MioEvent::IncomingDataDidReceive(addr, _) | MioEvent::OutgoingDataDidSend(addr, _),
140 )) => Some(*addr),
141 _ => None,
142 }
143}
144
145pub fn as_event_mio_connection_event(event: &Event) -> Option<ConnectionAddr> {
146 match event {
147 Event::P2p(P2pEvent::MioEvent(
148 MioEvent::IncomingDataDidReceive(addr, _)
149 | MioEvent::OutgoingDataDidSend(addr, _)
150 | MioEvent::ConnectionDidClose(addr, _),
151 )) => Some(*addr),
152 _ => None,
153 }
154}
155
156pub fn as_event_mio_outgoing_connection(
157 event: &Event,
158) -> Option<(ConnectionAddr, &Result<(), String>)> {
159 match event {
160 Event::P2p(P2pEvent::MioEvent(MioEvent::OutgoingConnectionDidConnect(addr, result))) => {
161 Some((*addr, result))
162 }
163 _ => None,
164 }
165}
166
167pub struct Driver<'cluster> {
168 runner: ClusterRunner<'cluster>,
169 emulated_time: bool,
170}
171
172impl<'cluster> Driver<'cluster> {
173 pub fn new(runner: ClusterRunner<'cluster>) -> Self {
174 Driver {
175 runner,
176 emulated_time: false,
177 }
178 }
179
180 pub fn with_emulated_time(runner: ClusterRunner<'cluster>) -> Self {
181 Driver {
182 runner,
183 emulated_time: true,
184 }
185 }
186
187 async fn sleep(&self, duration: Duration) {
188 if !self.emulated_time {
189 tokio::time::sleep(duration).await;
190 }
191 }
192
193 pub async fn wait_for(
214 &mut self,
215 duration: Duration,
216 mut f: impl FnMut(ClusterNodeId, &Event, &State) -> bool,
217 ) -> anyhow::Result<Option<(ClusterNodeId, Event)>> {
218 let timeout = redux::Instant::now() + duration;
219 while redux::Instant::now() < timeout {
220 let mut steps = Vec::new();
221 let mut found = None;
222 for (node_id, state, events) in self.runner.pending_events(true) {
223 for (_, event) in events {
224 if f(node_id, event, state) {
225 found = Some((node_id, event.clone()));
226 break;
227 } else {
228 let event = event.to_string();
229 steps.push(ScenarioStep::Event { node_id, event });
230 }
231 }
232 }
233 for step in steps {
234 self.runner.exec_step(step).await?;
235 }
236 if found.is_some() {
237 return Ok(found);
238 }
239 self.idle(Duration::from_millis(100)).await?;
240 }
241 Ok(None)
242 }
243
244 pub async fn run_until(
245 &mut self,
246 duration: Duration,
247 mut f: impl FnMut(ClusterNodeId, &Event, &State) -> bool,
248 ) -> anyhow::Result<bool> {
249 let timeout = redux::Instant::now() + duration;
250 while redux::Instant::now() < timeout {
251 let mut steps = Vec::new();
252 let mut found = false;
253 'pending_events: for (node_id, state, events) in self.runner.pending_events(true) {
254 for (_, event) in events {
255 found = f(node_id, event, state);
256 steps.push(ScenarioStep::Event {
257 node_id,
258 event: event.to_string(),
259 });
260 if found {
261 break 'pending_events;
262 }
263 }
264 }
265 for step in steps {
266 self.runner.exec_step(step).await?;
267 }
268 if found {
269 return Ok(true);
270 }
271 self.idle(Duration::from_millis(100)).await?;
272 }
273 Ok(false)
274 }
275
276 pub async fn exec_steps_until(
279 &mut self,
280 duration: Duration,
281 mut f: impl FnMut(ClusterNodeId, &Event, &State) -> bool,
282 ) -> anyhow::Result<bool> {
283 let timeout = redux::Instant::now() + duration;
284 while redux::Instant::now() < timeout {
285 while let Some((node_id, event)) = self.next_event() {
286 let step = ScenarioStep::Event {
287 node_id,
288 event: event.to_string(),
289 };
290 self.runner.exec_step(step).await?;
291 let state = self.runner.node(node_id).unwrap().state();
292 if f(node_id, &event, state) {
293 return Ok(true);
294 }
295 }
296 self.idle(Duration::from_millis(100)).await?;
297 }
298 Ok(false)
299 }
300
301 pub fn next_event(&mut self) -> Option<(ClusterNodeId, Event)> {
302 self.runner
303 .pending_events(true)
304 .find_map(|(node_id, _, mut events)| {
305 events.next().map(|(_, event)| (node_id, event.clone()))
306 })
307 }
308
309 pub async fn trace_steps(&mut self) -> anyhow::Result<()> {
310 loop {
311 while let Some((node_id, event)) = self.next_event() {
312 println!("{node_id} event: {event}");
313 let step = ScenarioStep::Event {
314 node_id,
315 event: event.to_string(),
316 };
317 self.runner.exec_step(step).await?;
318 let _state = self.runner.node(node_id).unwrap().state();
319 }
321 self.idle(Duration::from_millis(100)).await?;
322 }
323 }
324
325 pub async fn run(&mut self, duration: Duration) -> anyhow::Result<()> {
326 let finish = redux::Instant::now() + duration;
327 while redux::Instant::now() < finish {
328 while let Some((node_id, event)) = self.next_event() {
329 let step = ScenarioStep::Event {
330 node_id,
331 event: event.to_string(),
332 };
333 self.runner.exec_step(step).await?;
334 let _state = self.runner.node(node_id).unwrap().state();
335 }
336 self.idle(Duration::from_millis(100)).await?;
337 }
338 Ok(())
339 }
340
341 pub async fn idle(&mut self, duration: Duration) -> anyhow::Result<()> {
342 self.sleep(duration).await;
343 self.runner
344 .exec_step(ScenarioStep::AdvanceTime {
345 by_nanos: duration.as_nanos().try_into()?,
346 })
347 .await?;
348 let nodes = self
349 .runner
350 .nodes_iter()
351 .map(|(node_id, _)| node_id)
352 .collect::<Vec<_>>();
353 for node_id in nodes {
354 self.runner
355 .exec_step(ScenarioStep::CheckTimeouts { node_id })
356 .await?;
357 }
358 Ok(())
359 }
360
361 pub async fn exec_step(&mut self, step: ScenarioStep) -> anyhow::Result<bool> {
362 self.runner.exec_step(step).await
363 }
364
365 pub async fn exec_even_step(
366 &mut self,
367 (node_id, event): (ClusterNodeId, Event),
368 ) -> anyhow::Result<Option<&State>> {
369 let event = event.to_string();
370 let result = if self
371 .runner
372 .exec_step(ScenarioStep::Event { node_id, event })
373 .await?
374 {
375 Some(
376 self.runner
377 .node(node_id)
378 .ok_or(anyhow::format_err!("no node {}", node_id.index()))?
379 .state(),
380 )
381 } else {
382 None
383 };
384 Ok(result)
385 }
386
387 pub fn add_rust_node(
388 &mut self,
389 testing_config: RustNodeTestingConfig,
390 ) -> (ClusterNodeId, PeerId) {
391 let node_id = self.runner.add_rust_node(testing_config);
392 let peer_id = self.runner.node(node_id).unwrap().peer_id();
393 (node_id, peer_id)
394 }
395
396 pub fn add_rust_node_with<Item, F>(
397 &mut self,
398 testing_config: RustNodeTestingConfig,
399 mut f: F,
400 ) -> (ClusterNodeId, Item)
401 where
402 F: FnMut(&State) -> Item,
403 {
404 let node_id = self.runner.add_rust_node(testing_config);
405 let state = self.runner.node(node_id).unwrap().state();
406 let item = f(state);
407 (node_id, item)
408 }
409
410 pub fn inner(&self) -> &ClusterRunner<'_> {
411 &self.runner
412 }
413
414 pub fn inner_mut(&mut self) -> &mut ClusterRunner<'cluster> {
415 &mut self.runner
416 }
417
418 #[allow(dead_code)]
419 pub fn into_inner(self) -> ClusterRunner<'cluster> {
420 self.runner
421 }
422}
423
424pub async fn wait_for_nodes_listening_on_localhost(
426 driver: &mut Driver<'_>,
427 duration: Duration,
428 nodes: impl IntoIterator<Item = ClusterNodeId>,
429) -> anyhow::Result<bool> {
430 let mut nodes = std::collections::BTreeSet::from_iter(nodes); let _ip4_localhost = libp2p::multiaddr::Protocol::Ip4("127.0.0.1".parse().unwrap());
434 let pred = |node_id, event: &_, _state: &_| {
435 if let Some(_addr) = as_event_mio_listener_ready(event) {
436 nodes.remove(&node_id);
437 nodes.is_empty()
438 } else {
439 false
440 }
441 };
442
443 driver.exec_steps_until(duration, pred).await
445}
446
447pub trait PeerPredicate {
448 fn matches(&mut self, node_id: ClusterNodeId, peer_id: &PeerId) -> bool;
449}
450
451impl<F> PeerPredicate for F
452where
453 F: FnMut(ClusterNodeId, &PeerId) -> bool,
454{
455 fn matches(&mut self, node_id: ClusterNodeId, peer_id: &PeerId) -> bool {
456 self(node_id, peer_id)
457 }
458}
459
460impl PeerPredicate for ClusterNodeId {
461 fn matches(&mut self, node_id: ClusterNodeId, _peer_id: &PeerId) -> bool {
462 *self == node_id
463 }
464}
465
466impl PeerPredicate for (ClusterNodeId, &PeerId) {
467 fn matches(&mut self, node_id: ClusterNodeId, peer_id: &PeerId) -> bool {
468 self.0 == node_id && self.1 == peer_id
469 }
470}
471
472impl PeerPredicate for (ClusterNodeId, &mut BTreeSet<PeerId>) {
473 fn matches(&mut self, node_id: ClusterNodeId, peer_id: &PeerId) -> bool {
474 self.0 == node_id && {
475 self.1.remove(peer_id);
476 self.1.is_empty()
477 }
478 }
479}
480
481fn is_network_connection_finalized(conn_state: &P2pNetworkConnectionState) -> Option<&PeerId> {
484 if let P2pNetworkConnectionState {
485 auth:
486 Some(P2pNetworkAuthState::Noise(P2pNetworkNoiseState {
487 inner: Some(P2pNetworkNoiseStateInner::Done { remote_peer_id, .. }),
488 ..
489 })),
490 mux:
491 Some(P2pNetworkConnectionMuxState::Yamux(P2pNetworkYamuxState {
492 terminated: None,
493 init: true,
494 ..
495 })),
496 ..
497 } = conn_state
498 {
499 Some(remote_peer_id)
500 } else {
501 None
502 }
503}
504
505pub async fn wait_for_connection_established<F: PeerPredicate>(
507 driver: &mut Driver<'_>,
508 duration: Duration,
509 mut f: F,
510) -> anyhow::Result<bool> {
511 let pred = |node_id, event: &_, state: &State| {
512 if let Some(addr) = as_event_mio_data_send_receive(event) {
513 let Some(p2p) = state.p2p.ready() else {
514 return false;
515 };
516 let Some(conn_state) = p2p.network.scheduler.connections.get(&addr) else {
517 return false;
518 };
519 if let Some(peer_id) = is_network_connection_finalized(conn_state) {
520 f.matches(node_id, peer_id)
521 } else {
522 false
523 }
524 } else {
525 false
526 }
527 };
528 driver.exec_steps_until(duration, pred).await
529}
530
531pub fn add_rust_nodes1<N, T>(driver: &mut Driver, num: N, config: RustNodeTestingConfig) -> T
543where
544 N: Into<u16>,
545 T: FromIterator<(ClusterNodeId, PeerId)>,
546{
547 (0..num.into())
548 .map(|_| driver.add_rust_node(config.clone()))
549 .collect()
550}
551
552pub fn add_rust_nodes<N, NodeIds, PeerIds>(
553 driver: &mut Driver,
554 num: N,
555 config: RustNodeTestingConfig,
556) -> (NodeIds, PeerIds)
557where
558 N: Into<u16>,
559 NodeIds: Default + Extend<ClusterNodeId>,
560 PeerIds: Default + Extend<PeerId>,
561{
562 (0..num.into())
563 .map(|_| driver.add_rust_node(config.clone()))
564 .unzip()
565}
566
567pub fn add_rust_nodes_with<N, NodeIds, Items, Item, F>(
569 driver: &mut Driver,
570 num: N,
571 config: RustNodeTestingConfig,
572 mut f: F,
573) -> (NodeIds, Items)
574where
575 N: Into<u16>,
576 NodeIds: Default + Extend<ClusterNodeId>,
577 Items: Default + Extend<Item>,
578 F: FnMut(&State) -> Item,
579{
580 (0..num.into())
581 .map(|_| driver.add_rust_node_with(config.clone(), &mut f))
582 .unzip()
583}
584
585pub async fn run_until_no_events(
589 driver: &mut Driver<'_>,
590 quiet_dur: Duration,
591 timeout: Duration,
592) -> anyhow::Result<bool> {
593 let timeout = Instant::now() + timeout;
594 while driver.run_until(quiet_dur, |_, _, _| true).await? {
595 if Instant::now() >= timeout {
596 return Ok(false);
597 }
598 }
599 Ok(true)
600}
601
602pub trait ConnectionPredicate {
603 fn matches(
604 &mut self,
605 node_id: ClusterNodeId,
606 peer_id: &PeerId,
607 peer_status: &P2pPeerStatus,
608 ) -> bool;
609}
610
611pub enum ConnectionPredicates {
612 PeerFinalized(PeerId),
614 PeerIsReady(PeerId),
615 PeerWithErrorStatus(PeerId),
616 PeerDisconnected(PeerId),
617}
618
619impl ConnectionPredicate for (ClusterNodeId, ConnectionPredicates) {
620 fn matches(
621 &mut self,
622 node_id: ClusterNodeId,
623 peer_id: &PeerId,
624 peer_status: &P2pPeerStatus,
625 ) -> bool {
626 node_id == self.0
627 && match &self.1 {
628 ConnectionPredicates::PeerFinalized(pid) => {
629 peer_id == pid
630 && match peer_status {
631 P2pPeerStatus::Connecting(c) => c.is_error(),
632 P2pPeerStatus::Disconnecting { .. } => true,
633 P2pPeerStatus::Disconnected { .. } => true,
634 P2pPeerStatus::Ready(_) => true,
635 }
636 }
637 ConnectionPredicates::PeerIsReady(pid) => {
638 peer_id == pid && peer_status.as_ready().is_some()
639 }
640 ConnectionPredicates::PeerWithErrorStatus(pid) => {
641 peer_id == pid && peer_status.is_error()
642 }
643 ConnectionPredicates::PeerDisconnected(pid) => {
644 peer_id == pid && matches!(peer_status, P2pPeerStatus::Disconnected { .. })
645 }
646 }
647 }
648}
649
650impl ConnectionPredicates {
651 pub fn peer_with_error_status(peer_id: PeerId) -> Self {
652 ConnectionPredicates::PeerWithErrorStatus(peer_id)
653 }
654
655 pub fn peer_disconnected(peer_id: PeerId) -> Self {
656 ConnectionPredicates::PeerDisconnected(peer_id)
657 }
658}
659
660impl<F> ConnectionPredicate for F
661where
662 F: FnMut(ClusterNodeId, &PeerId, &P2pPeerStatus) -> bool,
663{
664 fn matches(
665 &mut self,
666 node_id: ClusterNodeId,
667 peer_id: &PeerId,
668 peer_status: &P2pPeerStatus,
669 ) -> bool {
670 self(node_id, peer_id, peer_status)
671 }
672}
673
674pub async fn wait_for_connection_event<F>(
675 driver: &mut Driver<'_>,
676 duration: Duration,
677 mut f: F,
678) -> anyhow::Result<bool>
679where
680 F: ConnectionPredicate,
681{
682 let pred = |node_id, event: &_, state: &State| {
683 None.or_else(|| {
684 let addr = as_event_mio_connection_event(event)?;
685 let p2p = state.p2p.ready()?;
686 p2p.peers
687 .iter()
688 .find(|(_, peer)| peer_has_addr(peer, addr))
689 .or_else(|| {
690 p2p.network
691 .scheduler
692 .connections
693 .get(&addr)
694 .and_then(|conn_state| conn_state.peer_id())
695 .and_then(|peer_id| {
696 p2p.peers
697 .get(peer_id)
698 .map(|peer_state| (peer_id, peer_state))
699 })
700 })
701 })
702 .is_some_and(|(peer_id, peer)| f.matches(node_id, peer_id, &peer.status))
703 };
704 driver.exec_steps_until(duration, pred).await
705}
706
707pub async fn wait_for_connection_error<F>(
708 driver: &mut Driver<'_>,
709 duration: Duration,
710 mut f: F,
711) -> anyhow::Result<bool>
712where
713 F: ConnectionPredicate,
714{
715 let pred = |node_id, event: &_, state: &State| {
716 None.or_else(|| {
717 let addr = as_event_mio_error(event)?;
718 let p2p = state.p2p.ready()?;
719 p2p.peers.iter().find(|(_, peer)| peer_has_addr(peer, addr))
720 })
721 .is_some_and(|(peer_id, peer)| f.matches(node_id, peer_id, &peer.status))
722 };
723 driver.exec_steps_until(duration, pred).await
724}
725
726pub fn get_peer_state<'a>(
727 cluster: &'a ClusterRunner<'_>,
728 node_id: ClusterNodeId,
729 peer_id: &PeerId,
730) -> Option<&'a P2pPeerState> {
731 let store = cluster.node(node_id).expect("node does not exist");
732 store.state().p2p.get_peer(peer_id)
733}
734
735pub fn peer_exists(cluster: &ClusterRunner<'_>, node_id: ClusterNodeId, peer_id: &PeerId) -> bool {
736 get_peer_state(cluster, node_id, peer_id).is_some()
737}
738
739pub fn peer_is_ready(
740 cluster: &ClusterRunner<'_>,
741 node_id: ClusterNodeId,
742 peer_id: &PeerId,
743) -> bool {
744 matches!(
745 get_peer_state(cluster, node_id, peer_id),
746 Some(P2pPeerState {
747 status: P2pPeerStatus::Ready(_),
748 ..
749 })
750 )
751}
752
753pub fn get_p2p_state<'a>(cluster: &'a ClusterRunner<'a>, node_id: ClusterNodeId) -> &'a P2pState {
754 cluster
755 .node(node_id)
756 .expect("node should exist")
757 .state()
758 .p2p
759 .unwrap()
760}
761pub async fn connect_rust_nodes(
775 cluster: &mut ClusterRunner<'_>,
776 dialer: ClusterNodeId,
777 listener: ClusterNodeId,
778) {
779 cluster
780 .exec_step(crate::scenario::ScenarioStep::ConnectNodes {
781 dialer,
782 listener: crate::scenario::ListenerNode::Rust(listener),
783 })
784 .await
785 .expect("connect event should be dispatched");
786}
787
788pub async fn trace_steps(runner: &mut ClusterRunner<'_>) -> anyhow::Result<()> {
789 loop {
790 while let Some((node_id, event)) = next_event(runner) {
791 println!("{node_id} event: {event}");
792 let step = ScenarioStep::Event {
793 node_id,
794 event: event.to_string(),
795 };
796 runner.exec_step(step).await?;
797 }
798 idle(runner, Duration::from_millis(100)).await?;
799 }
800}
801
802pub async fn trace_steps_state<T: Debug, F: Fn(&State) -> T>(
803 runner: &mut ClusterRunner<'_>,
804 f: F,
805) -> anyhow::Result<()> {
806 loop {
807 while let Some((node_id, event)) = next_event(runner) {
808 println!("{node_id} event: {event}");
809 let step = ScenarioStep::Event {
810 node_id,
811 event: event.to_string(),
812 };
813 runner.exec_step(step).await?;
814 let state = runner.node(node_id).unwrap().state();
815 let t = f(state);
816 println!("{node_id} state: {t:#?}");
817 }
818 idle(runner, Duration::from_millis(100)).await?;
819 }
820}
821
822pub async fn idle(runner: &mut ClusterRunner<'_>, duration: Duration) -> anyhow::Result<()> {
823 tokio::time::sleep(duration).await;
824 runner
825 .exec_step(ScenarioStep::AdvanceTime {
826 by_nanos: duration.as_nanos().try_into()?,
827 })
828 .await?;
829 let nodes = runner
830 .nodes_iter()
831 .map(|(node_id, _)| node_id)
832 .collect::<Vec<_>>();
833 for node_id in nodes {
834 runner
835 .exec_step(ScenarioStep::CheckTimeouts { node_id })
836 .await?;
837 }
838 Ok(())
839}
840pub fn next_event(runner: &mut ClusterRunner<'_>) -> Option<(ClusterNodeId, Event)> {
841 runner
842 .pending_events(true)
843 .find_map(|(node_id, _, mut events)| {
844 events.next().map(|(_, event)| (node_id, event.clone()))
845 })
846}