1use std::{
2 collections::BTreeSet,
3 fmt::Debug,
4 net::SocketAddr,
5 time::{Duration, Instant},
6};
7
8use mina_node::{
9 event_source::Event,
10 p2p::{
11 connection::outgoing::P2pConnectionOutgoingInitOpts,
12 webrtc::{Host, HttpSignalingInfo, SignalingMethod},
13 ConnectionAddr, P2pConnectionEvent, P2pEvent, P2pNetworkConnectionMuxState,
14 P2pNetworkConnectionState, P2pNetworkYamuxState, P2pPeerState, P2pPeerStatus, P2pState,
15 PeerId,
16 },
17 State,
18};
19
20use mina_node::p2p::connection::outgoing::P2pConnectionOutgoingInitLibp2pOpts;
21
22use mina_node::p2p::{
23 MioEvent, P2pNetworkAuthState, P2pNetworkNoiseState, P2pNetworkNoiseStateInner,
24};
25
26use crate::{cluster::ClusterNodeId, node::RustNodeTestingConfig, scenario::ScenarioStep};
27
28use super::ClusterRunner;
29
30pub fn match_addr_with_port_and_peer_id(
31 port: u16,
32 peer_id: PeerId,
33) -> impl Fn(&P2pConnectionOutgoingInitOpts) -> bool {
34 move |conn_opt| match conn_opt {
35 P2pConnectionOutgoingInitOpts::WebRTC {
36 peer_id: pid,
37 signaling:
38 SignalingMethod::Http(HttpSignalingInfo {
39 host: Host::Ipv4(_ip4),
40 port: p,
41 }),
42 }
43 | P2pConnectionOutgoingInitOpts::WebRTC {
44 peer_id: pid,
45 signaling:
46 SignalingMethod::Https(HttpSignalingInfo {
47 host: Host::Ipv4(_ip4),
48 port: p,
49 }),
50 } => &peer_id == pid && port == *p,
51 P2pConnectionOutgoingInitOpts::LibP2P(libp2p_opts) => {
52 libp2p_opts.peer_id == peer_id && libp2p_opts.port == port
53 }
54 _ => false,
55 }
56}
57
58pub fn get_peers_iter(
59 data: &serde_json::Value,
60) -> Option<impl Iterator<Item = Option<(&str, i64, &str)>>> {
61 let iter = data
62 .as_object()?
63 .get("data")?
64 .get("getPeers")?
65 .as_array()?
66 .iter()
67 .map(|elt| {
68 let elt = elt.as_object()?;
69 let host = elt.get("host")?.as_str()?;
70 let port = elt.get("libp2pPort")?.as_i64()?;
71 let peer_id = elt.get("peerId")?.as_str()?;
72 Some((host, port, peer_id))
73 });
74 Some(iter)
75}
76
/// GraphQL query text selecting `host`, `libp2pPort` and `peerId` from
/// `getPeers` — the same three fields that [`get_peers_iter`] extracts.
pub const PEERS_QUERY: &str = r#"query {
 getPeers {
 host
 libp2pPort
 peerId
 }
}"#;
84
85pub fn connection_finalized_event(
86 pred: impl Fn(ClusterNodeId, &PeerId) -> bool,
87) -> impl Fn(ClusterNodeId, &Event, &State) -> bool {
88 move |node_id, event, _| {
89 matches!(
90 event,
91 Event::P2p(P2pEvent::Connection(P2pConnectionEvent::Finalized(peer, res))) if pred(node_id, peer) && res.is_ok()
92 )
93 }
94}
95
96fn peer_has_addr(peer: &P2pPeerState, addr: ConnectionAddr) -> bool {
97 match (&peer.dial_opts, addr) {
98 (
99 Some(P2pConnectionOutgoingInitOpts::LibP2P(P2pConnectionOutgoingInitLibp2pOpts {
100 host: Host::Ipv4(host),
101 port,
102 ..
103 })),
104 ConnectionAddr {
105 sock_addr: SocketAddr::V4(addr),
106 ..
107 },
108 ) => addr.ip() == host && addr.port() == *port,
109 _ => false,
110 }
111}
112
113pub fn as_event_mio_interface_detected(event: &Event) -> Option<&std::net::IpAddr> {
114 if let Event::P2p(P2pEvent::MioEvent(MioEvent::InterfaceDetected(ip_addr))) = event {
115 Some(ip_addr)
116 } else {
117 None
118 }
119}
120
121pub fn as_event_mio_listener_ready(event: &Event) -> Option<&std::net::SocketAddr> {
122 if let Event::P2p(P2pEvent::MioEvent(MioEvent::ListenerReady { listener })) = event {
123 Some(listener)
124 } else {
125 None
126 }
127}
128
129pub fn as_event_mio_error(event: &Event) -> Option<ConnectionAddr> {
130 match event {
131 Event::P2p(P2pEvent::MioEvent(MioEvent::ConnectionDidClose(addr, res))) if res.is_err() => {
132 Some(*addr)
133 }
134 _ => None,
135 }
136}
137
138pub fn as_event_mio_data_send_receive(event: &Event) -> Option<ConnectionAddr> {
139 match event {
140 Event::P2p(P2pEvent::MioEvent(
141 MioEvent::IncomingDataDidReceive(addr, _) | MioEvent::OutgoingDataDidSend(addr, _),
142 )) => Some(*addr),
143 _ => None,
144 }
145}
146
147pub fn as_event_mio_connection_event(event: &Event) -> Option<ConnectionAddr> {
148 match event {
149 Event::P2p(P2pEvent::MioEvent(
150 MioEvent::IncomingDataDidReceive(addr, _)
151 | MioEvent::OutgoingDataDidSend(addr, _)
152 | MioEvent::ConnectionDidClose(addr, _),
153 )) => Some(*addr),
154 _ => None,
155 }
156}
157
158pub fn as_event_mio_outgoing_connection(
159 event: &Event,
160) -> Option<(ConnectionAddr, &Result<(), String>)> {
161 match event {
162 Event::P2p(P2pEvent::MioEvent(MioEvent::OutgoingConnectionDidConnect(addr, result))) => {
163 Some((*addr, result))
164 }
165 _ => None,
166 }
167}
168
/// Wraps a [`ClusterRunner`] and drives it by turning the nodes' pending
/// events into scenario steps.
pub struct Driver<'cluster> {
    /// Runner through which every scenario step is executed.
    runner: ClusterRunner<'cluster>,
    /// When `true`, the driver's internal `sleep` returns immediately instead
    /// of sleeping on the tokio timer.
    emulated_time: bool,
}
173
174impl<'cluster> Driver<'cluster> {
175 pub fn new(runner: ClusterRunner<'cluster>) -> Self {
176 Driver {
177 runner,
178 emulated_time: false,
179 }
180 }
181
182 pub fn with_emulated_time(runner: ClusterRunner<'cluster>) -> Self {
183 Driver {
184 runner,
185 emulated_time: true,
186 }
187 }
188
189 async fn sleep(&self, duration: Duration) {
190 if !self.emulated_time {
191 tokio::time::sleep(duration).await;
192 }
193 }
194
195 pub async fn wait_for(
216 &mut self,
217 duration: Duration,
218 mut f: impl FnMut(ClusterNodeId, &Event, &State) -> bool,
219 ) -> anyhow::Result<Option<(ClusterNodeId, Event)>> {
220 let timeout = redux::Instant::now() + duration;
221 while redux::Instant::now() < timeout {
222 let mut steps = Vec::new();
223 let mut found = None;
224 for (node_id, state, events) in self.runner.pending_events(true) {
225 for (_, event) in events {
226 if f(node_id, event, state) {
227 found = Some((node_id, event.clone()));
228 break;
229 } else {
230 let event = event.to_string();
231 steps.push(ScenarioStep::Event { node_id, event });
232 }
233 }
234 }
235 for step in steps {
236 self.runner.exec_step(step).await?;
237 }
238 if found.is_some() {
239 return Ok(found);
240 }
241 self.idle(Duration::from_millis(100)).await?;
242 }
243 Ok(None)
244 }
245
246 pub async fn run_until(
247 &mut self,
248 duration: Duration,
249 mut f: impl FnMut(ClusterNodeId, &Event, &State) -> bool,
250 ) -> anyhow::Result<bool> {
251 let timeout = redux::Instant::now() + duration;
252 while redux::Instant::now() < timeout {
253 let mut steps = Vec::new();
254 let mut found = false;
255 'pending_events: for (node_id, state, events) in self.runner.pending_events(true) {
256 for (_, event) in events {
257 found = f(node_id, event, state);
258 steps.push(ScenarioStep::Event {
259 node_id,
260 event: event.to_string(),
261 });
262 if found {
263 break 'pending_events;
264 }
265 }
266 }
267 for step in steps {
268 self.runner.exec_step(step).await?;
269 }
270 if found {
271 return Ok(true);
272 }
273 self.idle(Duration::from_millis(100)).await?;
274 }
275 Ok(false)
276 }
277
278 pub async fn exec_steps_until(
281 &mut self,
282 duration: Duration,
283 mut f: impl FnMut(ClusterNodeId, &Event, &State) -> bool,
284 ) -> anyhow::Result<bool> {
285 let timeout = redux::Instant::now() + duration;
286 while redux::Instant::now() < timeout {
287 while let Some((node_id, event)) = self.next_event() {
288 let step = ScenarioStep::Event {
289 node_id,
290 event: event.to_string(),
291 };
292 self.runner.exec_step(step).await?;
293 let state = self.runner.node(node_id).unwrap().state();
294 if f(node_id, &event, state) {
295 return Ok(true);
296 }
297 }
298 self.idle(Duration::from_millis(100)).await?;
299 }
300 Ok(false)
301 }
302
303 pub fn next_event(&mut self) -> Option<(ClusterNodeId, Event)> {
304 self.runner
305 .pending_events(true)
306 .find_map(|(node_id, _, mut events)| {
307 events.next().map(|(_, event)| (node_id, event.clone()))
308 })
309 }
310
311 pub async fn trace_steps(&mut self) -> anyhow::Result<()> {
312 loop {
313 while let Some((node_id, event)) = self.next_event() {
314 println!("{node_id} event: {event}");
315 let step = ScenarioStep::Event {
316 node_id,
317 event: event.to_string(),
318 };
319 self.runner.exec_step(step).await?;
320 let _state = self.runner.node(node_id).unwrap().state();
321 }
323 self.idle(Duration::from_millis(100)).await?;
324 }
325 }
326
327 pub async fn run(&mut self, duration: Duration) -> anyhow::Result<()> {
328 let finish = redux::Instant::now() + duration;
329 while redux::Instant::now() < finish {
330 while let Some((node_id, event)) = self.next_event() {
331 let step = ScenarioStep::Event {
332 node_id,
333 event: event.to_string(),
334 };
335 self.runner.exec_step(step).await?;
336 let _state = self.runner.node(node_id).unwrap().state();
337 }
338 self.idle(Duration::from_millis(100)).await?;
339 }
340 Ok(())
341 }
342
343 pub async fn idle(&mut self, duration: Duration) -> anyhow::Result<()> {
344 self.sleep(duration).await;
345 self.runner
346 .exec_step(ScenarioStep::AdvanceTime {
347 by_nanos: duration.as_nanos().try_into()?,
348 })
349 .await?;
350 let nodes = self
351 .runner
352 .nodes_iter()
353 .map(|(node_id, _)| node_id)
354 .collect::<Vec<_>>();
355 for node_id in nodes {
356 self.runner
357 .exec_step(ScenarioStep::CheckTimeouts { node_id })
358 .await?;
359 }
360 Ok(())
361 }
362
363 pub async fn exec_step(&mut self, step: ScenarioStep) -> anyhow::Result<bool> {
364 self.runner.exec_step(step).await
365 }
366
367 pub async fn exec_even_step(
368 &mut self,
369 (node_id, event): (ClusterNodeId, Event),
370 ) -> anyhow::Result<Option<&State>> {
371 let event = event.to_string();
372 let result = if self
373 .runner
374 .exec_step(ScenarioStep::Event { node_id, event })
375 .await?
376 {
377 Some(
378 self.runner
379 .node(node_id)
380 .ok_or(anyhow::format_err!("no node {}", node_id.index()))?
381 .state(),
382 )
383 } else {
384 None
385 };
386 Ok(result)
387 }
388
389 pub fn add_rust_node(
390 &mut self,
391 testing_config: RustNodeTestingConfig,
392 ) -> (ClusterNodeId, PeerId) {
393 let node_id = self.runner.add_rust_node(testing_config);
394 let peer_id = self.runner.node(node_id).unwrap().peer_id();
395 (node_id, peer_id)
396 }
397
398 pub fn add_rust_node_with<Item, F>(
399 &mut self,
400 testing_config: RustNodeTestingConfig,
401 mut f: F,
402 ) -> (ClusterNodeId, Item)
403 where
404 F: FnMut(&State) -> Item,
405 {
406 let node_id = self.runner.add_rust_node(testing_config);
407 let state = self.runner.node(node_id).unwrap().state();
408 let item = f(state);
409 (node_id, item)
410 }
411
412 pub fn inner(&self) -> &ClusterRunner<'_> {
413 &self.runner
414 }
415
416 pub fn inner_mut(&mut self) -> &mut ClusterRunner<'cluster> {
417 &mut self.runner
418 }
419
420 #[allow(dead_code)]
421 pub fn into_inner(self) -> ClusterRunner<'cluster> {
422 self.runner
423 }
424}
425
426pub async fn wait_for_nodes_listening_on_localhost(
428 driver: &mut Driver<'_>,
429 duration: Duration,
430 nodes: impl IntoIterator<Item = ClusterNodeId>,
431) -> anyhow::Result<bool> {
432 let mut nodes = std::collections::BTreeSet::from_iter(nodes); let _ip4_localhost = libp2p::multiaddr::Protocol::Ip4("127.0.0.1".parse().unwrap());
436 let pred = |node_id, event: &_, _state: &_| {
437 if let Some(_addr) = as_event_mio_listener_ready(event) {
438 nodes.remove(&node_id);
439 nodes.is_empty()
440 } else {
441 false
442 }
443 };
444
445 driver.exec_steps_until(duration, pred).await
447}
448
/// Predicate over a `(node, peer)` pair.
///
/// Takes `&mut self` so that implementations may carry state between calls.
pub trait PeerPredicate {
    /// Returns `true` when the given node / peer combination is accepted.
    fn matches(&mut self, node_id: ClusterNodeId, peer_id: &PeerId) -> bool;
}
452
453impl<F> PeerPredicate for F
454where
455 F: FnMut(ClusterNodeId, &PeerId) -> bool,
456{
457 fn matches(&mut self, node_id: ClusterNodeId, peer_id: &PeerId) -> bool {
458 self(node_id, peer_id)
459 }
460}
461
462impl PeerPredicate for ClusterNodeId {
463 fn matches(&mut self, node_id: ClusterNodeId, _peer_id: &PeerId) -> bool {
464 *self == node_id
465 }
466}
467
468impl PeerPredicate for (ClusterNodeId, &PeerId) {
469 fn matches(&mut self, node_id: ClusterNodeId, peer_id: &PeerId) -> bool {
470 self.0 == node_id && self.1 == peer_id
471 }
472}
473
474impl PeerPredicate for (ClusterNodeId, &mut BTreeSet<PeerId>) {
475 fn matches(&mut self, node_id: ClusterNodeId, peer_id: &PeerId) -> bool {
476 self.0 == node_id && {
477 self.1.remove(peer_id);
478 self.1.is_empty()
479 }
480 }
481}
482
483fn is_network_connection_finalized(conn_state: &P2pNetworkConnectionState) -> Option<&PeerId> {
486 if let P2pNetworkConnectionState {
487 auth:
488 Some(P2pNetworkAuthState::Noise(P2pNetworkNoiseState {
489 inner: Some(P2pNetworkNoiseStateInner::Done { remote_peer_id, .. }),
490 ..
491 })),
492 mux:
493 Some(P2pNetworkConnectionMuxState::Yamux(P2pNetworkYamuxState {
494 terminated: None,
495 init: true,
496 ..
497 })),
498 ..
499 } = conn_state
500 {
501 Some(remote_peer_id)
502 } else {
503 None
504 }
505}
506
507pub async fn wait_for_connection_established<F: PeerPredicate>(
509 driver: &mut Driver<'_>,
510 duration: Duration,
511 mut f: F,
512) -> anyhow::Result<bool> {
513 let pred = |node_id, event: &_, state: &State| {
514 if let Some(addr) = as_event_mio_data_send_receive(event) {
515 let Some(p2p) = state.p2p.ready() else {
516 return false;
517 };
518 let Some(conn_state) = p2p.network.scheduler.connections.get(&addr) else {
519 return false;
520 };
521 if let Some(peer_id) = is_network_connection_finalized(conn_state) {
522 f.matches(node_id, peer_id)
523 } else {
524 false
525 }
526 } else {
527 false
528 }
529 };
530 driver.exec_steps_until(duration, pred).await
531}
532
533pub fn add_rust_nodes1<N, T>(driver: &mut Driver, num: N, config: RustNodeTestingConfig) -> T
545where
546 N: Into<u16>,
547 T: FromIterator<(ClusterNodeId, PeerId)>,
548{
549 (0..num.into())
550 .map(|_| driver.add_rust_node(config.clone()))
551 .collect()
552}
553
554pub fn add_rust_nodes<N, NodeIds, PeerIds>(
555 driver: &mut Driver,
556 num: N,
557 config: RustNodeTestingConfig,
558) -> (NodeIds, PeerIds)
559where
560 N: Into<u16>,
561 NodeIds: Default + Extend<ClusterNodeId>,
562 PeerIds: Default + Extend<PeerId>,
563{
564 (0..num.into())
565 .map(|_| driver.add_rust_node(config.clone()))
566 .unzip()
567}
568
569pub fn add_rust_nodes_with<N, NodeIds, Items, Item, F>(
571 driver: &mut Driver,
572 num: N,
573 config: RustNodeTestingConfig,
574 mut f: F,
575) -> (NodeIds, Items)
576where
577 N: Into<u16>,
578 NodeIds: Default + Extend<ClusterNodeId>,
579 Items: Default + Extend<Item>,
580 F: FnMut(&State) -> Item,
581{
582 (0..num.into())
583 .map(|_| driver.add_rust_node_with(config.clone(), &mut f))
584 .unzip()
585}
586
587pub async fn run_until_no_events(
591 driver: &mut Driver<'_>,
592 quiet_dur: Duration,
593 timeout: Duration,
594) -> anyhow::Result<bool> {
595 let timeout = Instant::now() + timeout;
596 while driver.run_until(quiet_dur, |_, _, _| true).await? {
597 if Instant::now() >= timeout {
598 return Ok(false);
599 }
600 }
601 Ok(true)
602}
603
/// Predicate over a node, one of its peers, and that peer's current status.
///
/// Takes `&mut self` so that implementations may carry state between calls.
pub trait ConnectionPredicate {
    /// Returns `true` when the given node / peer / status combination is
    /// accepted.
    fn matches(
        &mut self,
        node_id: ClusterNodeId,
        peer_id: &PeerId,
        peer_status: &P2pPeerStatus,
    ) -> bool;
}
612
/// Ready-made peer status conditions; paired with a [`ClusterNodeId`] they
/// implement [`ConnectionPredicate`].
pub enum ConnectionPredicates {
    /// The peer is ready, disconnecting, disconnected, or connecting with an
    /// error.
    PeerFinalized(PeerId),
    /// The peer's status is ready.
    PeerIsReady(PeerId),
    /// The peer's status reports an error.
    PeerWithErrorStatus(PeerId),
    /// The peer's status is disconnected.
    PeerDisconnected(PeerId),
}
620
621impl ConnectionPredicate for (ClusterNodeId, ConnectionPredicates) {
622 fn matches(
623 &mut self,
624 node_id: ClusterNodeId,
625 peer_id: &PeerId,
626 peer_status: &P2pPeerStatus,
627 ) -> bool {
628 node_id == self.0
629 && match &self.1 {
630 ConnectionPredicates::PeerFinalized(pid) => {
631 peer_id == pid
632 && match peer_status {
633 P2pPeerStatus::Connecting(c) => c.is_error(),
634 P2pPeerStatus::Disconnecting { .. } => true,
635 P2pPeerStatus::Disconnected { .. } => true,
636 P2pPeerStatus::Ready(_) => true,
637 }
638 }
639 ConnectionPredicates::PeerIsReady(pid) => {
640 peer_id == pid && peer_status.as_ready().is_some()
641 }
642 ConnectionPredicates::PeerWithErrorStatus(pid) => {
643 peer_id == pid && peer_status.is_error()
644 }
645 ConnectionPredicates::PeerDisconnected(pid) => {
646 peer_id == pid && matches!(peer_status, P2pPeerStatus::Disconnected { .. })
647 }
648 }
649 }
650}
651
652impl ConnectionPredicates {
653 pub fn peer_with_error_status(peer_id: PeerId) -> Self {
654 ConnectionPredicates::PeerWithErrorStatus(peer_id)
655 }
656
657 pub fn peer_disconnected(peer_id: PeerId) -> Self {
658 ConnectionPredicates::PeerDisconnected(peer_id)
659 }
660}
661
662impl<F> ConnectionPredicate for F
663where
664 F: FnMut(ClusterNodeId, &PeerId, &P2pPeerStatus) -> bool,
665{
666 fn matches(
667 &mut self,
668 node_id: ClusterNodeId,
669 peer_id: &PeerId,
670 peer_status: &P2pPeerStatus,
671 ) -> bool {
672 self(node_id, peer_id, peer_status)
673 }
674}
675
676pub async fn wait_for_connection_event<F>(
677 driver: &mut Driver<'_>,
678 duration: Duration,
679 mut f: F,
680) -> anyhow::Result<bool>
681where
682 F: ConnectionPredicate,
683{
684 let pred = |node_id, event: &_, state: &State| {
685 None.or_else(|| {
686 let addr = as_event_mio_connection_event(event)?;
687 let p2p = state.p2p.ready()?;
688 p2p.peers
689 .iter()
690 .find(|(_, peer)| peer_has_addr(peer, addr))
691 .or_else(|| {
692 p2p.network
693 .scheduler
694 .connections
695 .get(&addr)
696 .and_then(|conn_state| conn_state.peer_id())
697 .and_then(|peer_id| {
698 p2p.peers
699 .get(peer_id)
700 .map(|peer_state| (peer_id, peer_state))
701 })
702 })
703 })
704 .is_some_and(|(peer_id, peer)| f.matches(node_id, peer_id, &peer.status))
705 };
706 driver.exec_steps_until(duration, pred).await
707}
708
709pub async fn wait_for_connection_error<F>(
710 driver: &mut Driver<'_>,
711 duration: Duration,
712 mut f: F,
713) -> anyhow::Result<bool>
714where
715 F: ConnectionPredicate,
716{
717 let pred = |node_id, event: &_, state: &State| {
718 None.or_else(|| {
719 let addr = as_event_mio_error(event)?;
720 let p2p = state.p2p.ready()?;
721 p2p.peers.iter().find(|(_, peer)| peer_has_addr(peer, addr))
722 })
723 .is_some_and(|(peer_id, peer)| f.matches(node_id, peer_id, &peer.status))
724 };
725 driver.exec_steps_until(duration, pred).await
726}
727
728pub fn get_peer_state<'a>(
729 cluster: &'a ClusterRunner<'_>,
730 node_id: ClusterNodeId,
731 peer_id: &PeerId,
732) -> Option<&'a P2pPeerState> {
733 let store = cluster.node(node_id).expect("node does not exist");
734 store.state().p2p.get_peer(peer_id)
735}
736
737pub fn peer_exists(cluster: &ClusterRunner<'_>, node_id: ClusterNodeId, peer_id: &PeerId) -> bool {
738 get_peer_state(cluster, node_id, peer_id).is_some()
739}
740
741pub fn peer_is_ready(
742 cluster: &ClusterRunner<'_>,
743 node_id: ClusterNodeId,
744 peer_id: &PeerId,
745) -> bool {
746 matches!(
747 get_peer_state(cluster, node_id, peer_id),
748 Some(P2pPeerState {
749 status: P2pPeerStatus::Ready(_),
750 ..
751 })
752 )
753}
754
755pub fn get_p2p_state<'a>(cluster: &'a ClusterRunner<'a>, node_id: ClusterNodeId) -> &'a P2pState {
756 cluster
757 .node(node_id)
758 .expect("node should exist")
759 .state()
760 .p2p
761 .unwrap()
762}
763pub async fn connect_rust_nodes(
777 cluster: &mut ClusterRunner<'_>,
778 dialer: ClusterNodeId,
779 listener: ClusterNodeId,
780) {
781 cluster
782 .exec_step(crate::scenario::ScenarioStep::ConnectNodes {
783 dialer,
784 listener: crate::scenario::ListenerNode::Rust(listener),
785 })
786 .await
787 .expect("connect event should be dispatched");
788}
789
790pub async fn trace_steps(runner: &mut ClusterRunner<'_>) -> anyhow::Result<()> {
791 loop {
792 while let Some((node_id, event)) = next_event(runner) {
793 println!("{node_id} event: {event}");
794 let step = ScenarioStep::Event {
795 node_id,
796 event: event.to_string(),
797 };
798 runner.exec_step(step).await?;
799 }
800 idle(runner, Duration::from_millis(100)).await?;
801 }
802}
803
804pub async fn trace_steps_state<T: Debug, F: Fn(&State) -> T>(
805 runner: &mut ClusterRunner<'_>,
806 f: F,
807) -> anyhow::Result<()> {
808 loop {
809 while let Some((node_id, event)) = next_event(runner) {
810 println!("{node_id} event: {event}");
811 let step = ScenarioStep::Event {
812 node_id,
813 event: event.to_string(),
814 };
815 runner.exec_step(step).await?;
816 let state = runner.node(node_id).unwrap().state();
817 let t = f(state);
818 println!("{node_id} state: {t:#?}");
819 }
820 idle(runner, Duration::from_millis(100)).await?;
821 }
822}
823
824pub async fn idle(runner: &mut ClusterRunner<'_>, duration: Duration) -> anyhow::Result<()> {
825 tokio::time::sleep(duration).await;
826 runner
827 .exec_step(ScenarioStep::AdvanceTime {
828 by_nanos: duration.as_nanos().try_into()?,
829 })
830 .await?;
831 let nodes = runner
832 .nodes_iter()
833 .map(|(node_id, _)| node_id)
834 .collect::<Vec<_>>();
835 for node_id in nodes {
836 runner
837 .exec_step(ScenarioStep::CheckTimeouts { node_id })
838 .await?;
839 }
840 Ok(())
841}
842pub fn next_event(runner: &mut ClusterRunner<'_>) -> Option<(ClusterNodeId, Event)> {
843 runner
844 .pending_events(true)
845 .find_map(|(node_id, _, mut events)| {
846 events.next().map(|(_, event)| (node_id, event.clone()))
847 })
848}