mina_node_testing/scenarios/
driver.rs

1use std::{
2    collections::BTreeSet,
3    fmt::Debug,
4    net::SocketAddr,
5    time::{Duration, Instant},
6};
7
8use node::{
9    event_source::Event,
10    p2p::{
11        connection::outgoing::P2pConnectionOutgoingInitOpts,
12        webrtc::{Host, HttpSignalingInfo, SignalingMethod},
13        ConnectionAddr, P2pConnectionEvent, P2pEvent, P2pNetworkConnectionMuxState,
14        P2pNetworkConnectionState, P2pNetworkYamuxState, P2pPeerState, P2pPeerStatus, P2pState,
15        PeerId,
16    },
17    State,
18};
19
20use node::p2p::connection::outgoing::P2pConnectionOutgoingInitLibp2pOpts;
21
22use node::p2p::{MioEvent, P2pNetworkAuthState, P2pNetworkNoiseState, P2pNetworkNoiseStateInner};
23
24use crate::{cluster::ClusterNodeId, node::RustNodeTestingConfig, scenario::ScenarioStep};
25
26use super::ClusterRunner;
27
28pub fn match_addr_with_port_and_peer_id(
29    port: u16,
30    peer_id: PeerId,
31) -> impl Fn(&P2pConnectionOutgoingInitOpts) -> bool {
32    move |conn_opt| match conn_opt {
33        P2pConnectionOutgoingInitOpts::WebRTC {
34            peer_id: pid,
35            signaling:
36                SignalingMethod::Http(HttpSignalingInfo {
37                    host: Host::Ipv4(_ip4),
38                    port: p,
39                }),
40        }
41        | P2pConnectionOutgoingInitOpts::WebRTC {
42            peer_id: pid,
43            signaling:
44                SignalingMethod::Https(HttpSignalingInfo {
45                    host: Host::Ipv4(_ip4),
46                    port: p,
47                }),
48        } => &peer_id == pid && port == *p,
49        P2pConnectionOutgoingInitOpts::LibP2P(libp2p_opts) => {
50            libp2p_opts.peer_id == peer_id && libp2p_opts.port == port
51        }
52        _ => false,
53    }
54}
55
56pub fn get_peers_iter(
57    data: &serde_json::Value,
58) -> Option<impl Iterator<Item = Option<(&str, i64, &str)>>> {
59    let iter = data
60        .as_object()?
61        .get("data")?
62        .get("getPeers")?
63        .as_array()?
64        .iter()
65        .map(|elt| {
66            let elt = elt.as_object()?;
67            let host = elt.get("host")?.as_str()?;
68            let port = elt.get("libp2pPort")?.as_i64()?;
69            let peer_id = elt.get("peerId")?.as_str()?;
70            Some((host, port, peer_id))
71        });
72    Some(iter)
73}
74
75pub const PEERS_QUERY: &str = r#"query {
76  getPeers {
77    host
78    libp2pPort
79    peerId
80  }
81}"#;
82
83pub fn connection_finalized_event(
84    pred: impl Fn(ClusterNodeId, &PeerId) -> bool,
85) -> impl Fn(ClusterNodeId, &Event, &State) -> bool {
86    move |node_id, event, _| {
87        matches!(
88            event,
89            Event::P2p(P2pEvent::Connection(P2pConnectionEvent::Finalized(peer, res))) if pred(node_id, peer) && res.is_ok()
90        )
91    }
92}
93
94fn peer_has_addr(peer: &P2pPeerState, addr: ConnectionAddr) -> bool {
95    match (&peer.dial_opts, addr) {
96        (
97            Some(P2pConnectionOutgoingInitOpts::LibP2P(P2pConnectionOutgoingInitLibp2pOpts {
98                host: Host::Ipv4(host),
99                port,
100                ..
101            })),
102            ConnectionAddr {
103                sock_addr: SocketAddr::V4(addr),
104                ..
105            },
106        ) => addr.ip() == host && addr.port() == *port,
107        _ => false,
108    }
109}
110
111pub fn as_event_mio_interface_detected(event: &Event) -> Option<&std::net::IpAddr> {
112    if let Event::P2p(P2pEvent::MioEvent(MioEvent::InterfaceDetected(ip_addr))) = event {
113        Some(ip_addr)
114    } else {
115        None
116    }
117}
118
119pub fn as_event_mio_listener_ready(event: &Event) -> Option<&std::net::SocketAddr> {
120    if let Event::P2p(P2pEvent::MioEvent(MioEvent::ListenerReady { listener })) = event {
121        Some(listener)
122    } else {
123        None
124    }
125}
126
127pub fn as_event_mio_error(event: &Event) -> Option<ConnectionAddr> {
128    match event {
129        Event::P2p(P2pEvent::MioEvent(MioEvent::ConnectionDidClose(addr, res))) if res.is_err() => {
130            Some(*addr)
131        }
132        _ => None,
133    }
134}
135
136pub fn as_event_mio_data_send_receive(event: &Event) -> Option<ConnectionAddr> {
137    match event {
138        Event::P2p(P2pEvent::MioEvent(
139            MioEvent::IncomingDataDidReceive(addr, _) | MioEvent::OutgoingDataDidSend(addr, _),
140        )) => Some(*addr),
141        _ => None,
142    }
143}
144
145pub fn as_event_mio_connection_event(event: &Event) -> Option<ConnectionAddr> {
146    match event {
147        Event::P2p(P2pEvent::MioEvent(
148            MioEvent::IncomingDataDidReceive(addr, _)
149            | MioEvent::OutgoingDataDidSend(addr, _)
150            | MioEvent::ConnectionDidClose(addr, _),
151        )) => Some(*addr),
152        _ => None,
153    }
154}
155
156pub fn as_event_mio_outgoing_connection(
157    event: &Event,
158) -> Option<(ConnectionAddr, &Result<(), String>)> {
159    match event {
160        Event::P2p(P2pEvent::MioEvent(MioEvent::OutgoingConnectionDidConnect(addr, result))) => {
161            Some((*addr, result))
162        }
163        _ => None,
164    }
165}
166
167pub struct Driver<'cluster> {
168    runner: ClusterRunner<'cluster>,
169    emulated_time: bool,
170}
171
172impl<'cluster> Driver<'cluster> {
173    pub fn new(runner: ClusterRunner<'cluster>) -> Self {
174        Driver {
175            runner,
176            emulated_time: false,
177        }
178    }
179
180    pub fn with_emulated_time(runner: ClusterRunner<'cluster>) -> Self {
181        Driver {
182            runner,
183            emulated_time: true,
184        }
185    }
186
187    async fn sleep(&self, duration: Duration) {
188        if !self.emulated_time {
189            tokio::time::sleep(duration).await;
190        }
191    }
192
193    /// Waits for a specific event that satisfies the given predicate, executing all events encountered along the way.
194    ///
195    /// # Arguments
196    ///
197    /// * `duration` - Maximum time to wait for the event
198    /// * `f` - A predicate function that takes a node ID, event, and state, returning true when the desired event is found
199    ///
200    /// # Returns
201    ///
202    /// Returns a Result containing:
203    /// * `Some((node_id, event))` - If an event satisfying the predicate is found before the timeout
204    /// * `None` - If no matching event is found within the timeout period
205    ///
206    /// # Example
207    ///
208    /// ```no_run
209    /// driver.wait_for(Duration::from_secs(5), |node_id, event, state| {
210    ///     matches!(event, Event::BlockReceived { .. })
211    /// }).await?;
212    /// ```
213    pub async fn wait_for(
214        &mut self,
215        duration: Duration,
216        mut f: impl FnMut(ClusterNodeId, &Event, &State) -> bool,
217    ) -> anyhow::Result<Option<(ClusterNodeId, Event)>> {
218        let timeout = redux::Instant::now() + duration;
219        while redux::Instant::now() < timeout {
220            let mut steps = Vec::new();
221            let mut found = None;
222            for (node_id, state, events) in self.runner.pending_events(true) {
223                for (_, event) in events {
224                    if f(node_id, event, state) {
225                        found = Some((node_id, event.clone()));
226                        break;
227                    } else {
228                        let event = event.to_string();
229                        steps.push(ScenarioStep::Event { node_id, event });
230                    }
231                }
232            }
233            for step in steps {
234                self.runner.exec_step(step).await?;
235            }
236            if found.is_some() {
237                return Ok(found);
238            }
239            self.idle(Duration::from_millis(100)).await?;
240        }
241        Ok(None)
242    }
243
244    pub async fn run_until(
245        &mut self,
246        duration: Duration,
247        mut f: impl FnMut(ClusterNodeId, &Event, &State) -> bool,
248    ) -> anyhow::Result<bool> {
249        let timeout = redux::Instant::now() + duration;
250        while redux::Instant::now() < timeout {
251            let mut steps = Vec::new();
252            let mut found = false;
253            'pending_events: for (node_id, state, events) in self.runner.pending_events(true) {
254                for (_, event) in events {
255                    found = f(node_id, event, state);
256                    steps.push(ScenarioStep::Event {
257                        node_id,
258                        event: event.to_string(),
259                    });
260                    if found {
261                        break 'pending_events;
262                    }
263                }
264            }
265            for step in steps {
266                self.runner.exec_step(step).await?;
267            }
268            if found {
269                return Ok(true);
270            }
271            self.idle(Duration::from_millis(100)).await?;
272        }
273        Ok(false)
274    }
275
276    /// Executes all events as steps, until the predicate `f` reports true. The
277    /// predicate is checked each time after executing an event step.
278    pub async fn exec_steps_until(
279        &mut self,
280        duration: Duration,
281        mut f: impl FnMut(ClusterNodeId, &Event, &State) -> bool,
282    ) -> anyhow::Result<bool> {
283        let timeout = redux::Instant::now() + duration;
284        while redux::Instant::now() < timeout {
285            while let Some((node_id, event)) = self.next_event() {
286                let step = ScenarioStep::Event {
287                    node_id,
288                    event: event.to_string(),
289                };
290                self.runner.exec_step(step).await?;
291                let state = self.runner.node(node_id).unwrap().state();
292                if f(node_id, &event, state) {
293                    return Ok(true);
294                }
295            }
296            self.idle(Duration::from_millis(100)).await?;
297        }
298        Ok(false)
299    }
300
301    pub fn next_event(&mut self) -> Option<(ClusterNodeId, Event)> {
302        self.runner
303            .pending_events(true)
304            .find_map(|(node_id, _, mut events)| {
305                events.next().map(|(_, event)| (node_id, event.clone()))
306            })
307    }
308
309    pub async fn trace_steps(&mut self) -> anyhow::Result<()> {
310        loop {
311            while let Some((node_id, event)) = self.next_event() {
312                println!("{node_id} event: {event}");
313                let step = ScenarioStep::Event {
314                    node_id,
315                    event: event.to_string(),
316                };
317                self.runner.exec_step(step).await?;
318                let _state = self.runner.node(node_id).unwrap().state();
319                // println!("{node_id} state: {state:#?}, state = state.p2p");
320            }
321            self.idle(Duration::from_millis(100)).await?;
322        }
323    }
324
325    pub async fn run(&mut self, duration: Duration) -> anyhow::Result<()> {
326        let finish = redux::Instant::now() + duration;
327        while redux::Instant::now() < finish {
328            while let Some((node_id, event)) = self.next_event() {
329                let step = ScenarioStep::Event {
330                    node_id,
331                    event: event.to_string(),
332                };
333                self.runner.exec_step(step).await?;
334                let _state = self.runner.node(node_id).unwrap().state();
335            }
336            self.idle(Duration::from_millis(100)).await?;
337        }
338        Ok(())
339    }
340
341    pub async fn idle(&mut self, duration: Duration) -> anyhow::Result<()> {
342        self.sleep(duration).await;
343        self.runner
344            .exec_step(ScenarioStep::AdvanceTime {
345                by_nanos: duration.as_nanos().try_into()?,
346            })
347            .await?;
348        let nodes = self
349            .runner
350            .nodes_iter()
351            .map(|(node_id, _)| node_id)
352            .collect::<Vec<_>>();
353        for node_id in nodes {
354            self.runner
355                .exec_step(ScenarioStep::CheckTimeouts { node_id })
356                .await?;
357        }
358        Ok(())
359    }
360
361    pub async fn exec_step(&mut self, step: ScenarioStep) -> anyhow::Result<bool> {
362        self.runner.exec_step(step).await
363    }
364
365    pub async fn exec_even_step(
366        &mut self,
367        (node_id, event): (ClusterNodeId, Event),
368    ) -> anyhow::Result<Option<&State>> {
369        let event = event.to_string();
370        let result = if self
371            .runner
372            .exec_step(ScenarioStep::Event { node_id, event })
373            .await?
374        {
375            Some(
376                self.runner
377                    .node(node_id)
378                    .ok_or(anyhow::format_err!("no node {}", node_id.index()))?
379                    .state(),
380            )
381        } else {
382            None
383        };
384        Ok(result)
385    }
386
387    pub fn add_rust_node(
388        &mut self,
389        testing_config: RustNodeTestingConfig,
390    ) -> (ClusterNodeId, PeerId) {
391        let node_id = self.runner.add_rust_node(testing_config);
392        let peer_id = self.runner.node(node_id).unwrap().peer_id();
393        (node_id, peer_id)
394    }
395
396    pub fn add_rust_node_with<Item, F>(
397        &mut self,
398        testing_config: RustNodeTestingConfig,
399        mut f: F,
400    ) -> (ClusterNodeId, Item)
401    where
402        F: FnMut(&State) -> Item,
403    {
404        let node_id = self.runner.add_rust_node(testing_config);
405        let state = self.runner.node(node_id).unwrap().state();
406        let item = f(state);
407        (node_id, item)
408    }
409
410    pub fn inner(&self) -> &ClusterRunner<'_> {
411        &self.runner
412    }
413
414    pub fn inner_mut(&mut self) -> &mut ClusterRunner<'cluster> {
415        &mut self.runner
416    }
417
418    #[allow(dead_code)]
419    pub fn into_inner(self) -> ClusterRunner<'cluster> {
420        self.runner
421    }
422}
423
424/// Runs the cluster until each of the `nodes` is listening on the localhost interface.
425pub async fn wait_for_nodes_listening_on_localhost(
426    driver: &mut Driver<'_>,
427    duration: Duration,
428    nodes: impl IntoIterator<Item = ClusterNodeId>,
429) -> anyhow::Result<bool> {
430    let mut nodes = std::collections::BTreeSet::from_iter(nodes); // TODO: filter out nodes that already listening
431
432    // predicate matching event "listening on localhost interface"
433    let _ip4_localhost = libp2p::multiaddr::Protocol::Ip4("127.0.0.1".parse().unwrap());
434    let pred = |node_id, event: &_, _state: &_| {
435        if let Some(_addr) = as_event_mio_listener_ready(event) {
436            nodes.remove(&node_id);
437            nodes.is_empty()
438        } else {
439            false
440        }
441    };
442
443    // wait for all peers to listen
444    driver.exec_steps_until(duration, pred).await
445}
446
447pub trait PeerPredicate {
448    fn matches(&mut self, node_id: ClusterNodeId, peer_id: &PeerId) -> bool;
449}
450
451impl<F> PeerPredicate for F
452where
453    F: FnMut(ClusterNodeId, &PeerId) -> bool,
454{
455    fn matches(&mut self, node_id: ClusterNodeId, peer_id: &PeerId) -> bool {
456        self(node_id, peer_id)
457    }
458}
459
460impl PeerPredicate for ClusterNodeId {
461    fn matches(&mut self, node_id: ClusterNodeId, _peer_id: &PeerId) -> bool {
462        *self == node_id
463    }
464}
465
466impl PeerPredicate for (ClusterNodeId, &PeerId) {
467    fn matches(&mut self, node_id: ClusterNodeId, peer_id: &PeerId) -> bool {
468        self.0 == node_id && self.1 == peer_id
469    }
470}
471
472impl PeerPredicate for (ClusterNodeId, &mut BTreeSet<PeerId>) {
473    fn matches(&mut self, node_id: ClusterNodeId, peer_id: &PeerId) -> bool {
474        self.0 == node_id && {
475            self.1.remove(peer_id);
476            self.1.is_empty()
477        }
478    }
479}
480
481/// Returns connection peer_id iff the connection is finalized, i.e. multiplexing protocol is
482/// negotiated.
483fn is_network_connection_finalized(conn_state: &P2pNetworkConnectionState) -> Option<&PeerId> {
484    if let P2pNetworkConnectionState {
485        auth:
486            Some(P2pNetworkAuthState::Noise(P2pNetworkNoiseState {
487                inner: Some(P2pNetworkNoiseStateInner::Done { remote_peer_id, .. }),
488                ..
489            })),
490        mux:
491            Some(P2pNetworkConnectionMuxState::Yamux(P2pNetworkYamuxState {
492                terminated: None,
493                init: true,
494                ..
495            })),
496        ..
497    } = conn_state
498    {
499        Some(remote_peer_id)
500    } else {
501        None
502    }
503}
504
505/// Runst the cluster until the node is connected to the node that satisfies the predicate.
506pub async fn wait_for_connection_established<F: PeerPredicate>(
507    driver: &mut Driver<'_>,
508    duration: Duration,
509    mut f: F,
510) -> anyhow::Result<bool> {
511    let pred = |node_id, event: &_, state: &State| {
512        if let Some(addr) = as_event_mio_data_send_receive(event) {
513            let Some(p2p) = state.p2p.ready() else {
514                return false;
515            };
516            let Some(conn_state) = p2p.network.scheduler.connections.get(&addr) else {
517                return false;
518            };
519            if let Some(peer_id) = is_network_connection_finalized(conn_state) {
520                f.matches(node_id, peer_id)
521            } else {
522                false
523            }
524        } else {
525            false
526        }
527    };
528    driver.exec_steps_until(duration, pred).await
529}
530
531// pub async fn wait_for_disconnected<P: PeerPredicate>(
532//     driver: &mut Driver<'_>,
533//     duration: Duration,
534//     mut p: P,
535// ) -> anyhow::Result<bool> {
536//     driver.exec_steps_until(duration, |node_id, event, state| {
537//         if let as_
538//     })
539// }
540
541/// Creates `num` Rust nodes in the cluster
542pub fn add_rust_nodes1<N, T>(driver: &mut Driver, num: N, config: RustNodeTestingConfig) -> T
543where
544    N: Into<u16>,
545    T: FromIterator<(ClusterNodeId, PeerId)>,
546{
547    (0..num.into())
548        .map(|_| driver.add_rust_node(config.clone()))
549        .collect()
550}
551
552pub fn add_rust_nodes<N, NodeIds, PeerIds>(
553    driver: &mut Driver,
554    num: N,
555    config: RustNodeTestingConfig,
556) -> (NodeIds, PeerIds)
557where
558    N: Into<u16>,
559    NodeIds: Default + Extend<ClusterNodeId>,
560    PeerIds: Default + Extend<PeerId>,
561{
562    (0..num.into())
563        .map(|_| driver.add_rust_node(config.clone()))
564        .unzip()
565}
566
567/// Creates `num` Rust nodes in the cluster
568pub fn add_rust_nodes_with<N, NodeIds, Items, Item, F>(
569    driver: &mut Driver,
570    num: N,
571    config: RustNodeTestingConfig,
572    mut f: F,
573) -> (NodeIds, Items)
574where
575    N: Into<u16>,
576    NodeIds: Default + Extend<ClusterNodeId>,
577    Items: Default + Extend<Item>,
578    F: FnMut(&State) -> Item,
579{
580    (0..num.into())
581        .map(|_| driver.add_rust_node_with(config.clone(), &mut f))
582        .unzip()
583}
584
585/// Runs cluster until there is a `quiet_dur` period of no events, returning
586/// `Ok(true)` in this case. If there is no such period for `timeout` period of
587/// time, then returns `Ok(false)`
588pub async fn run_until_no_events(
589    driver: &mut Driver<'_>,
590    quiet_dur: Duration,
591    timeout: Duration,
592) -> anyhow::Result<bool> {
593    let timeout = Instant::now() + timeout;
594    while driver.run_until(quiet_dur, |_, _, _| true).await? {
595        if Instant::now() >= timeout {
596            return Ok(false);
597        }
598    }
599    Ok(true)
600}
601
602pub trait ConnectionPredicate {
603    fn matches(
604        &mut self,
605        node_id: ClusterNodeId,
606        peer_id: &PeerId,
607        peer_status: &P2pPeerStatus,
608    ) -> bool;
609}
610
611pub enum ConnectionPredicates {
612    /// Connection with peer is finalized, either successfully or with error.
613    PeerFinalized(PeerId),
614    PeerIsReady(PeerId),
615    PeerWithErrorStatus(PeerId),
616    PeerDisconnected(PeerId),
617}
618
619impl ConnectionPredicate for (ClusterNodeId, ConnectionPredicates) {
620    fn matches(
621        &mut self,
622        node_id: ClusterNodeId,
623        peer_id: &PeerId,
624        peer_status: &P2pPeerStatus,
625    ) -> bool {
626        node_id == self.0
627            && match &self.1 {
628                ConnectionPredicates::PeerFinalized(pid) => {
629                    peer_id == pid
630                        && match peer_status {
631                            P2pPeerStatus::Connecting(c) => c.is_error(),
632                            P2pPeerStatus::Disconnecting { .. } => true,
633                            P2pPeerStatus::Disconnected { .. } => true,
634                            P2pPeerStatus::Ready(_) => true,
635                        }
636                }
637                ConnectionPredicates::PeerIsReady(pid) => {
638                    peer_id == pid && peer_status.as_ready().is_some()
639                }
640                ConnectionPredicates::PeerWithErrorStatus(pid) => {
641                    peer_id == pid && peer_status.is_error()
642                }
643                ConnectionPredicates::PeerDisconnected(pid) => {
644                    peer_id == pid && matches!(peer_status, P2pPeerStatus::Disconnected { .. })
645                }
646            }
647    }
648}
649
650impl ConnectionPredicates {
651    pub fn peer_with_error_status(peer_id: PeerId) -> Self {
652        ConnectionPredicates::PeerWithErrorStatus(peer_id)
653    }
654
655    pub fn peer_disconnected(peer_id: PeerId) -> Self {
656        ConnectionPredicates::PeerDisconnected(peer_id)
657    }
658}
659
660impl<F> ConnectionPredicate for F
661where
662    F: FnMut(ClusterNodeId, &PeerId, &P2pPeerStatus) -> bool,
663{
664    fn matches(
665        &mut self,
666        node_id: ClusterNodeId,
667        peer_id: &PeerId,
668        peer_status: &P2pPeerStatus,
669    ) -> bool {
670        self(node_id, peer_id, peer_status)
671    }
672}
673
674pub async fn wait_for_connection_event<F>(
675    driver: &mut Driver<'_>,
676    duration: Duration,
677    mut f: F,
678) -> anyhow::Result<bool>
679where
680    F: ConnectionPredicate,
681{
682    let pred = |node_id, event: &_, state: &State| {
683        None.or_else(|| {
684            let addr = as_event_mio_connection_event(event)?;
685            let p2p = state.p2p.ready()?;
686            p2p.peers
687                .iter()
688                .find(|(_, peer)| peer_has_addr(peer, addr))
689                .or_else(|| {
690                    p2p.network
691                        .scheduler
692                        .connections
693                        .get(&addr)
694                        .and_then(|conn_state| conn_state.peer_id())
695                        .and_then(|peer_id| {
696                            p2p.peers
697                                .get(peer_id)
698                                .map(|peer_state| (peer_id, peer_state))
699                        })
700                })
701        })
702        .is_some_and(|(peer_id, peer)| f.matches(node_id, peer_id, &peer.status))
703    };
704    driver.exec_steps_until(duration, pred).await
705}
706
707pub async fn wait_for_connection_error<F>(
708    driver: &mut Driver<'_>,
709    duration: Duration,
710    mut f: F,
711) -> anyhow::Result<bool>
712where
713    F: ConnectionPredicate,
714{
715    let pred = |node_id, event: &_, state: &State| {
716        None.or_else(|| {
717            let addr = as_event_mio_error(event)?;
718            let p2p = state.p2p.ready()?;
719            p2p.peers.iter().find(|(_, peer)| peer_has_addr(peer, addr))
720        })
721        .is_some_and(|(peer_id, peer)| f.matches(node_id, peer_id, &peer.status))
722    };
723    driver.exec_steps_until(duration, pred).await
724}
725
726pub fn get_peer_state<'a>(
727    cluster: &'a ClusterRunner<'_>,
728    node_id: ClusterNodeId,
729    peer_id: &PeerId,
730) -> Option<&'a P2pPeerState> {
731    let store = cluster.node(node_id).expect("node does not exist");
732    store.state().p2p.get_peer(peer_id)
733}
734
735pub fn peer_exists(cluster: &ClusterRunner<'_>, node_id: ClusterNodeId, peer_id: &PeerId) -> bool {
736    get_peer_state(cluster, node_id, peer_id).is_some()
737}
738
739pub fn peer_is_ready(
740    cluster: &ClusterRunner<'_>,
741    node_id: ClusterNodeId,
742    peer_id: &PeerId,
743) -> bool {
744    matches!(
745        get_peer_state(cluster, node_id, peer_id),
746        Some(P2pPeerState {
747            status: P2pPeerStatus::Ready(_),
748            ..
749        })
750    )
751}
752
753pub fn get_p2p_state<'a>(cluster: &'a ClusterRunner<'a>, node_id: ClusterNodeId) -> &'a P2pState {
754    cluster
755        .node(node_id)
756        .expect("node should exist")
757        .state()
758        .p2p
759        .unwrap()
760}
761// pub fn get_peers<'a>(
762//     cluster: &'a ClusterRunner<'a>,
763//     node_id: ClusterNodeId,
764// ) -> impl Iterator<Item = (&'a PeerId, &'a P2pPeerState)> {
765//     cluster
766//         .node(node_id)
767//         .expect("node should exist")
768//         .state()
769//         .p2p
770//         .peers
771//         .iter()
772// }
773
774pub async fn connect_rust_nodes(
775    cluster: &mut ClusterRunner<'_>,
776    dialer: ClusterNodeId,
777    listener: ClusterNodeId,
778) {
779    cluster
780        .exec_step(crate::scenario::ScenarioStep::ConnectNodes {
781            dialer,
782            listener: crate::scenario::ListenerNode::Rust(listener),
783        })
784        .await
785        .expect("connect event should be dispatched");
786}
787
788pub async fn trace_steps(runner: &mut ClusterRunner<'_>) -> anyhow::Result<()> {
789    loop {
790        while let Some((node_id, event)) = next_event(runner) {
791            println!("{node_id} event: {event}");
792            let step = ScenarioStep::Event {
793                node_id,
794                event: event.to_string(),
795            };
796            runner.exec_step(step).await?;
797        }
798        idle(runner, Duration::from_millis(100)).await?;
799    }
800}
801
802pub async fn trace_steps_state<T: Debug, F: Fn(&State) -> T>(
803    runner: &mut ClusterRunner<'_>,
804    f: F,
805) -> anyhow::Result<()> {
806    loop {
807        while let Some((node_id, event)) = next_event(runner) {
808            println!("{node_id} event: {event}");
809            let step = ScenarioStep::Event {
810                node_id,
811                event: event.to_string(),
812            };
813            runner.exec_step(step).await?;
814            let state = runner.node(node_id).unwrap().state();
815            let t = f(state);
816            println!("{node_id} state: {t:#?}");
817        }
818        idle(runner, Duration::from_millis(100)).await?;
819    }
820}
821
822pub async fn idle(runner: &mut ClusterRunner<'_>, duration: Duration) -> anyhow::Result<()> {
823    tokio::time::sleep(duration).await;
824    runner
825        .exec_step(ScenarioStep::AdvanceTime {
826            by_nanos: duration.as_nanos().try_into()?,
827        })
828        .await?;
829    let nodes = runner
830        .nodes_iter()
831        .map(|(node_id, _)| node_id)
832        .collect::<Vec<_>>();
833    for node_id in nodes {
834        runner
835            .exec_step(ScenarioStep::CheckTimeouts { node_id })
836            .await?;
837    }
838    Ok(())
839}
840pub fn next_event(runner: &mut ClusterRunner<'_>) -> Option<(ClusterNodeId, Event)> {
841    runner
842        .pending_events(true)
843        .find_map(|(node_id, _, mut events)| {
844            events.next().map(|(_, event)| (node_id, event.clone()))
845        })
846}