mina_node_testing/scenarios/
driver.rs

1use std::{
2    collections::BTreeSet,
3    fmt::Debug,
4    net::SocketAddr,
5    time::{Duration, Instant},
6};
7
8use mina_node::{
9    event_source::Event,
10    p2p::{
11        connection::outgoing::P2pConnectionOutgoingInitOpts,
12        webrtc::{Host, HttpSignalingInfo, SignalingMethod},
13        ConnectionAddr, P2pConnectionEvent, P2pEvent, P2pNetworkConnectionMuxState,
14        P2pNetworkConnectionState, P2pNetworkYamuxState, P2pPeerState, P2pPeerStatus, P2pState,
15        PeerId,
16    },
17    State,
18};
19
20use mina_node::p2p::connection::outgoing::P2pConnectionOutgoingInitLibp2pOpts;
21
22use mina_node::p2p::{
23    MioEvent, P2pNetworkAuthState, P2pNetworkNoiseState, P2pNetworkNoiseStateInner,
24};
25
26use crate::{cluster::ClusterNodeId, node::RustNodeTestingConfig, scenario::ScenarioStep};
27
28use super::ClusterRunner;
29
30pub fn match_addr_with_port_and_peer_id(
31    port: u16,
32    peer_id: PeerId,
33) -> impl Fn(&P2pConnectionOutgoingInitOpts) -> bool {
34    move |conn_opt| match conn_opt {
35        P2pConnectionOutgoingInitOpts::WebRTC {
36            peer_id: pid,
37            signaling:
38                SignalingMethod::Http(HttpSignalingInfo {
39                    host: Host::Ipv4(_ip4),
40                    port: p,
41                }),
42        }
43        | P2pConnectionOutgoingInitOpts::WebRTC {
44            peer_id: pid,
45            signaling:
46                SignalingMethod::Https(HttpSignalingInfo {
47                    host: Host::Ipv4(_ip4),
48                    port: p,
49                }),
50        } => &peer_id == pid && port == *p,
51        P2pConnectionOutgoingInitOpts::LibP2P(libp2p_opts) => {
52            libp2p_opts.peer_id == peer_id && libp2p_opts.port == port
53        }
54        _ => false,
55    }
56}
57
58pub fn get_peers_iter(
59    data: &serde_json::Value,
60) -> Option<impl Iterator<Item = Option<(&str, i64, &str)>>> {
61    let iter = data
62        .as_object()?
63        .get("data")?
64        .get("getPeers")?
65        .as_array()?
66        .iter()
67        .map(|elt| {
68            let elt = elt.as_object()?;
69            let host = elt.get("host")?.as_str()?;
70            let port = elt.get("libp2pPort")?.as_i64()?;
71            let peer_id = elt.get("peerId")?.as_str()?;
72            Some((host, port, peer_id))
73        });
74    Some(iter)
75}
76
77pub const PEERS_QUERY: &str = r#"query {
78  getPeers {
79    host
80    libp2pPort
81    peerId
82  }
83}"#;
84
85pub fn connection_finalized_event(
86    pred: impl Fn(ClusterNodeId, &PeerId) -> bool,
87) -> impl Fn(ClusterNodeId, &Event, &State) -> bool {
88    move |node_id, event, _| {
89        matches!(
90            event,
91            Event::P2p(P2pEvent::Connection(P2pConnectionEvent::Finalized(peer, res))) if pred(node_id, peer) && res.is_ok()
92        )
93    }
94}
95
96fn peer_has_addr(peer: &P2pPeerState, addr: ConnectionAddr) -> bool {
97    match (&peer.dial_opts, addr) {
98        (
99            Some(P2pConnectionOutgoingInitOpts::LibP2P(P2pConnectionOutgoingInitLibp2pOpts {
100                host: Host::Ipv4(host),
101                port,
102                ..
103            })),
104            ConnectionAddr {
105                sock_addr: SocketAddr::V4(addr),
106                ..
107            },
108        ) => addr.ip() == host && addr.port() == *port,
109        _ => false,
110    }
111}
112
113pub fn as_event_mio_interface_detected(event: &Event) -> Option<&std::net::IpAddr> {
114    if let Event::P2p(P2pEvent::MioEvent(MioEvent::InterfaceDetected(ip_addr))) = event {
115        Some(ip_addr)
116    } else {
117        None
118    }
119}
120
121pub fn as_event_mio_listener_ready(event: &Event) -> Option<&std::net::SocketAddr> {
122    if let Event::P2p(P2pEvent::MioEvent(MioEvent::ListenerReady { listener })) = event {
123        Some(listener)
124    } else {
125        None
126    }
127}
128
129pub fn as_event_mio_error(event: &Event) -> Option<ConnectionAddr> {
130    match event {
131        Event::P2p(P2pEvent::MioEvent(MioEvent::ConnectionDidClose(addr, res))) if res.is_err() => {
132            Some(*addr)
133        }
134        _ => None,
135    }
136}
137
138pub fn as_event_mio_data_send_receive(event: &Event) -> Option<ConnectionAddr> {
139    match event {
140        Event::P2p(P2pEvent::MioEvent(
141            MioEvent::IncomingDataDidReceive(addr, _) | MioEvent::OutgoingDataDidSend(addr, _),
142        )) => Some(*addr),
143        _ => None,
144    }
145}
146
147pub fn as_event_mio_connection_event(event: &Event) -> Option<ConnectionAddr> {
148    match event {
149        Event::P2p(P2pEvent::MioEvent(
150            MioEvent::IncomingDataDidReceive(addr, _)
151            | MioEvent::OutgoingDataDidSend(addr, _)
152            | MioEvent::ConnectionDidClose(addr, _),
153        )) => Some(*addr),
154        _ => None,
155    }
156}
157
158pub fn as_event_mio_outgoing_connection(
159    event: &Event,
160) -> Option<(ConnectionAddr, &Result<(), String>)> {
161    match event {
162        Event::P2p(P2pEvent::MioEvent(MioEvent::OutgoingConnectionDidConnect(addr, result))) => {
163            Some((*addr, result))
164        }
165        _ => None,
166    }
167}
168
169pub struct Driver<'cluster> {
170    runner: ClusterRunner<'cluster>,
171    emulated_time: bool,
172}
173
174impl<'cluster> Driver<'cluster> {
175    pub fn new(runner: ClusterRunner<'cluster>) -> Self {
176        Driver {
177            runner,
178            emulated_time: false,
179        }
180    }
181
182    pub fn with_emulated_time(runner: ClusterRunner<'cluster>) -> Self {
183        Driver {
184            runner,
185            emulated_time: true,
186        }
187    }
188
189    async fn sleep(&self, duration: Duration) {
190        if !self.emulated_time {
191            tokio::time::sleep(duration).await;
192        }
193    }
194
195    /// Waits for a specific event that satisfies the given predicate, executing all events encountered along the way.
196    ///
197    /// # Arguments
198    ///
199    /// * `duration` - Maximum time to wait for the event
200    /// * `f` - A predicate function that takes a node ID, event, and state, returning true when the desired event is found
201    ///
202    /// # Returns
203    ///
204    /// Returns a Result containing:
205    /// * `Some((node_id, event))` - If an event satisfying the predicate is found before the timeout
206    /// * `None` - If no matching event is found within the timeout period
207    ///
208    /// # Example
209    ///
210    /// ```no_run
211    /// driver.wait_for(Duration::from_secs(5), |node_id, event, state| {
212    ///     matches!(event, Event::BlockReceived { .. })
213    /// }).await?;
214    /// ```
215    pub async fn wait_for(
216        &mut self,
217        duration: Duration,
218        mut f: impl FnMut(ClusterNodeId, &Event, &State) -> bool,
219    ) -> anyhow::Result<Option<(ClusterNodeId, Event)>> {
220        let timeout = redux::Instant::now() + duration;
221        while redux::Instant::now() < timeout {
222            let mut steps = Vec::new();
223            let mut found = None;
224            for (node_id, state, events) in self.runner.pending_events(true) {
225                for (_, event) in events {
226                    if f(node_id, event, state) {
227                        found = Some((node_id, event.clone()));
228                        break;
229                    } else {
230                        let event = event.to_string();
231                        steps.push(ScenarioStep::Event { node_id, event });
232                    }
233                }
234            }
235            for step in steps {
236                self.runner.exec_step(step).await?;
237            }
238            if found.is_some() {
239                return Ok(found);
240            }
241            self.idle(Duration::from_millis(100)).await?;
242        }
243        Ok(None)
244    }
245
246    pub async fn run_until(
247        &mut self,
248        duration: Duration,
249        mut f: impl FnMut(ClusterNodeId, &Event, &State) -> bool,
250    ) -> anyhow::Result<bool> {
251        let timeout = redux::Instant::now() + duration;
252        while redux::Instant::now() < timeout {
253            let mut steps = Vec::new();
254            let mut found = false;
255            'pending_events: for (node_id, state, events) in self.runner.pending_events(true) {
256                for (_, event) in events {
257                    found = f(node_id, event, state);
258                    steps.push(ScenarioStep::Event {
259                        node_id,
260                        event: event.to_string(),
261                    });
262                    if found {
263                        break 'pending_events;
264                    }
265                }
266            }
267            for step in steps {
268                self.runner.exec_step(step).await?;
269            }
270            if found {
271                return Ok(true);
272            }
273            self.idle(Duration::from_millis(100)).await?;
274        }
275        Ok(false)
276    }
277
278    /// Executes all events as steps, until the predicate `f` reports true. The
279    /// predicate is checked each time after executing an event step.
280    pub async fn exec_steps_until(
281        &mut self,
282        duration: Duration,
283        mut f: impl FnMut(ClusterNodeId, &Event, &State) -> bool,
284    ) -> anyhow::Result<bool> {
285        let timeout = redux::Instant::now() + duration;
286        while redux::Instant::now() < timeout {
287            while let Some((node_id, event)) = self.next_event() {
288                let step = ScenarioStep::Event {
289                    node_id,
290                    event: event.to_string(),
291                };
292                self.runner.exec_step(step).await?;
293                let state = self.runner.node(node_id).unwrap().state();
294                if f(node_id, &event, state) {
295                    return Ok(true);
296                }
297            }
298            self.idle(Duration::from_millis(100)).await?;
299        }
300        Ok(false)
301    }
302
303    pub fn next_event(&mut self) -> Option<(ClusterNodeId, Event)> {
304        self.runner
305            .pending_events(true)
306            .find_map(|(node_id, _, mut events)| {
307                events.next().map(|(_, event)| (node_id, event.clone()))
308            })
309    }
310
311    pub async fn trace_steps(&mut self) -> anyhow::Result<()> {
312        loop {
313            while let Some((node_id, event)) = self.next_event() {
314                println!("{node_id} event: {event}");
315                let step = ScenarioStep::Event {
316                    node_id,
317                    event: event.to_string(),
318                };
319                self.runner.exec_step(step).await?;
320                let _state = self.runner.node(node_id).unwrap().state();
321                // println!("{node_id} state: {state:#?}, state = state.p2p");
322            }
323            self.idle(Duration::from_millis(100)).await?;
324        }
325    }
326
327    pub async fn run(&mut self, duration: Duration) -> anyhow::Result<()> {
328        let finish = redux::Instant::now() + duration;
329        while redux::Instant::now() < finish {
330            while let Some((node_id, event)) = self.next_event() {
331                let step = ScenarioStep::Event {
332                    node_id,
333                    event: event.to_string(),
334                };
335                self.runner.exec_step(step).await?;
336                let _state = self.runner.node(node_id).unwrap().state();
337            }
338            self.idle(Duration::from_millis(100)).await?;
339        }
340        Ok(())
341    }
342
343    pub async fn idle(&mut self, duration: Duration) -> anyhow::Result<()> {
344        self.sleep(duration).await;
345        self.runner
346            .exec_step(ScenarioStep::AdvanceTime {
347                by_nanos: duration.as_nanos().try_into()?,
348            })
349            .await?;
350        let nodes = self
351            .runner
352            .nodes_iter()
353            .map(|(node_id, _)| node_id)
354            .collect::<Vec<_>>();
355        for node_id in nodes {
356            self.runner
357                .exec_step(ScenarioStep::CheckTimeouts { node_id })
358                .await?;
359        }
360        Ok(())
361    }
362
363    pub async fn exec_step(&mut self, step: ScenarioStep) -> anyhow::Result<bool> {
364        self.runner.exec_step(step).await
365    }
366
367    pub async fn exec_even_step(
368        &mut self,
369        (node_id, event): (ClusterNodeId, Event),
370    ) -> anyhow::Result<Option<&State>> {
371        let event = event.to_string();
372        let result = if self
373            .runner
374            .exec_step(ScenarioStep::Event { node_id, event })
375            .await?
376        {
377            Some(
378                self.runner
379                    .node(node_id)
380                    .ok_or(anyhow::format_err!("no node {}", node_id.index()))?
381                    .state(),
382            )
383        } else {
384            None
385        };
386        Ok(result)
387    }
388
389    pub fn add_rust_node(
390        &mut self,
391        testing_config: RustNodeTestingConfig,
392    ) -> (ClusterNodeId, PeerId) {
393        let node_id = self.runner.add_rust_node(testing_config);
394        let peer_id = self.runner.node(node_id).unwrap().peer_id();
395        (node_id, peer_id)
396    }
397
398    pub fn add_rust_node_with<Item, F>(
399        &mut self,
400        testing_config: RustNodeTestingConfig,
401        mut f: F,
402    ) -> (ClusterNodeId, Item)
403    where
404        F: FnMut(&State) -> Item,
405    {
406        let node_id = self.runner.add_rust_node(testing_config);
407        let state = self.runner.node(node_id).unwrap().state();
408        let item = f(state);
409        (node_id, item)
410    }
411
412    pub fn inner(&self) -> &ClusterRunner<'_> {
413        &self.runner
414    }
415
416    pub fn inner_mut(&mut self) -> &mut ClusterRunner<'cluster> {
417        &mut self.runner
418    }
419
420    #[allow(dead_code)]
421    pub fn into_inner(self) -> ClusterRunner<'cluster> {
422        self.runner
423    }
424}
425
426/// Runs the cluster until each of the `nodes` is listening on the localhost interface.
427pub async fn wait_for_nodes_listening_on_localhost(
428    driver: &mut Driver<'_>,
429    duration: Duration,
430    nodes: impl IntoIterator<Item = ClusterNodeId>,
431) -> anyhow::Result<bool> {
432    let mut nodes = std::collections::BTreeSet::from_iter(nodes); // TODO: filter out nodes that already listening
433
434    // predicate matching event "listening on localhost interface"
435    let _ip4_localhost = libp2p::multiaddr::Protocol::Ip4("127.0.0.1".parse().unwrap());
436    let pred = |node_id, event: &_, _state: &_| {
437        if let Some(_addr) = as_event_mio_listener_ready(event) {
438            nodes.remove(&node_id);
439            nodes.is_empty()
440        } else {
441            false
442        }
443    };
444
445    // wait for all peers to listen
446    driver.exec_steps_until(duration, pred).await
447}
448
449pub trait PeerPredicate {
450    fn matches(&mut self, node_id: ClusterNodeId, peer_id: &PeerId) -> bool;
451}
452
453impl<F> PeerPredicate for F
454where
455    F: FnMut(ClusterNodeId, &PeerId) -> bool,
456{
457    fn matches(&mut self, node_id: ClusterNodeId, peer_id: &PeerId) -> bool {
458        self(node_id, peer_id)
459    }
460}
461
462impl PeerPredicate for ClusterNodeId {
463    fn matches(&mut self, node_id: ClusterNodeId, _peer_id: &PeerId) -> bool {
464        *self == node_id
465    }
466}
467
468impl PeerPredicate for (ClusterNodeId, &PeerId) {
469    fn matches(&mut self, node_id: ClusterNodeId, peer_id: &PeerId) -> bool {
470        self.0 == node_id && self.1 == peer_id
471    }
472}
473
474impl PeerPredicate for (ClusterNodeId, &mut BTreeSet<PeerId>) {
475    fn matches(&mut self, node_id: ClusterNodeId, peer_id: &PeerId) -> bool {
476        self.0 == node_id && {
477            self.1.remove(peer_id);
478            self.1.is_empty()
479        }
480    }
481}
482
483/// Returns connection peer_id iff the connection is finalized, i.e. multiplexing protocol is
484/// negotiated.
485fn is_network_connection_finalized(conn_state: &P2pNetworkConnectionState) -> Option<&PeerId> {
486    if let P2pNetworkConnectionState {
487        auth:
488            Some(P2pNetworkAuthState::Noise(P2pNetworkNoiseState {
489                inner: Some(P2pNetworkNoiseStateInner::Done { remote_peer_id, .. }),
490                ..
491            })),
492        mux:
493            Some(P2pNetworkConnectionMuxState::Yamux(P2pNetworkYamuxState {
494                terminated: None,
495                init: true,
496                ..
497            })),
498        ..
499    } = conn_state
500    {
501        Some(remote_peer_id)
502    } else {
503        None
504    }
505}
506
507/// Runst the cluster until the node is connected to the node that satisfies the predicate.
508pub async fn wait_for_connection_established<F: PeerPredicate>(
509    driver: &mut Driver<'_>,
510    duration: Duration,
511    mut f: F,
512) -> anyhow::Result<bool> {
513    let pred = |node_id, event: &_, state: &State| {
514        if let Some(addr) = as_event_mio_data_send_receive(event) {
515            let Some(p2p) = state.p2p.ready() else {
516                return false;
517            };
518            let Some(conn_state) = p2p.network.scheduler.connections.get(&addr) else {
519                return false;
520            };
521            if let Some(peer_id) = is_network_connection_finalized(conn_state) {
522                f.matches(node_id, peer_id)
523            } else {
524                false
525            }
526        } else {
527            false
528        }
529    };
530    driver.exec_steps_until(duration, pred).await
531}
532
533// pub async fn wait_for_disconnected<P: PeerPredicate>(
534//     driver: &mut Driver<'_>,
535//     duration: Duration,
536//     mut p: P,
537// ) -> anyhow::Result<bool> {
538//     driver.exec_steps_until(duration, |node_id, event, state| {
539//         if let as_
540//     })
541// }
542
543/// Creates `num` Rust nodes in the cluster
544pub fn add_rust_nodes1<N, T>(driver: &mut Driver, num: N, config: RustNodeTestingConfig) -> T
545where
546    N: Into<u16>,
547    T: FromIterator<(ClusterNodeId, PeerId)>,
548{
549    (0..num.into())
550        .map(|_| driver.add_rust_node(config.clone()))
551        .collect()
552}
553
554pub fn add_rust_nodes<N, NodeIds, PeerIds>(
555    driver: &mut Driver,
556    num: N,
557    config: RustNodeTestingConfig,
558) -> (NodeIds, PeerIds)
559where
560    N: Into<u16>,
561    NodeIds: Default + Extend<ClusterNodeId>,
562    PeerIds: Default + Extend<PeerId>,
563{
564    (0..num.into())
565        .map(|_| driver.add_rust_node(config.clone()))
566        .unzip()
567}
568
569/// Creates `num` Rust nodes in the cluster
570pub fn add_rust_nodes_with<N, NodeIds, Items, Item, F>(
571    driver: &mut Driver,
572    num: N,
573    config: RustNodeTestingConfig,
574    mut f: F,
575) -> (NodeIds, Items)
576where
577    N: Into<u16>,
578    NodeIds: Default + Extend<ClusterNodeId>,
579    Items: Default + Extend<Item>,
580    F: FnMut(&State) -> Item,
581{
582    (0..num.into())
583        .map(|_| driver.add_rust_node_with(config.clone(), &mut f))
584        .unzip()
585}
586
587/// Runs cluster until there is a `quiet_dur` period of no events, returning
588/// `Ok(true)` in this case. If there is no such period for `timeout` period of
589/// time, then returns `Ok(false)`
590pub async fn run_until_no_events(
591    driver: &mut Driver<'_>,
592    quiet_dur: Duration,
593    timeout: Duration,
594) -> anyhow::Result<bool> {
595    let timeout = Instant::now() + timeout;
596    while driver.run_until(quiet_dur, |_, _, _| true).await? {
597        if Instant::now() >= timeout {
598            return Ok(false);
599        }
600    }
601    Ok(true)
602}
603
604pub trait ConnectionPredicate {
605    fn matches(
606        &mut self,
607        node_id: ClusterNodeId,
608        peer_id: &PeerId,
609        peer_status: &P2pPeerStatus,
610    ) -> bool;
611}
612
613pub enum ConnectionPredicates {
614    /// Connection with peer is finalized, either successfully or with error.
615    PeerFinalized(PeerId),
616    PeerIsReady(PeerId),
617    PeerWithErrorStatus(PeerId),
618    PeerDisconnected(PeerId),
619}
620
621impl ConnectionPredicate for (ClusterNodeId, ConnectionPredicates) {
622    fn matches(
623        &mut self,
624        node_id: ClusterNodeId,
625        peer_id: &PeerId,
626        peer_status: &P2pPeerStatus,
627    ) -> bool {
628        node_id == self.0
629            && match &self.1 {
630                ConnectionPredicates::PeerFinalized(pid) => {
631                    peer_id == pid
632                        && match peer_status {
633                            P2pPeerStatus::Connecting(c) => c.is_error(),
634                            P2pPeerStatus::Disconnecting { .. } => true,
635                            P2pPeerStatus::Disconnected { .. } => true,
636                            P2pPeerStatus::Ready(_) => true,
637                        }
638                }
639                ConnectionPredicates::PeerIsReady(pid) => {
640                    peer_id == pid && peer_status.as_ready().is_some()
641                }
642                ConnectionPredicates::PeerWithErrorStatus(pid) => {
643                    peer_id == pid && peer_status.is_error()
644                }
645                ConnectionPredicates::PeerDisconnected(pid) => {
646                    peer_id == pid && matches!(peer_status, P2pPeerStatus::Disconnected { .. })
647                }
648            }
649    }
650}
651
652impl ConnectionPredicates {
653    pub fn peer_with_error_status(peer_id: PeerId) -> Self {
654        ConnectionPredicates::PeerWithErrorStatus(peer_id)
655    }
656
657    pub fn peer_disconnected(peer_id: PeerId) -> Self {
658        ConnectionPredicates::PeerDisconnected(peer_id)
659    }
660}
661
662impl<F> ConnectionPredicate for F
663where
664    F: FnMut(ClusterNodeId, &PeerId, &P2pPeerStatus) -> bool,
665{
666    fn matches(
667        &mut self,
668        node_id: ClusterNodeId,
669        peer_id: &PeerId,
670        peer_status: &P2pPeerStatus,
671    ) -> bool {
672        self(node_id, peer_id, peer_status)
673    }
674}
675
676pub async fn wait_for_connection_event<F>(
677    driver: &mut Driver<'_>,
678    duration: Duration,
679    mut f: F,
680) -> anyhow::Result<bool>
681where
682    F: ConnectionPredicate,
683{
684    let pred = |node_id, event: &_, state: &State| {
685        None.or_else(|| {
686            let addr = as_event_mio_connection_event(event)?;
687            let p2p = state.p2p.ready()?;
688            p2p.peers
689                .iter()
690                .find(|(_, peer)| peer_has_addr(peer, addr))
691                .or_else(|| {
692                    p2p.network
693                        .scheduler
694                        .connections
695                        .get(&addr)
696                        .and_then(|conn_state| conn_state.peer_id())
697                        .and_then(|peer_id| {
698                            p2p.peers
699                                .get(peer_id)
700                                .map(|peer_state| (peer_id, peer_state))
701                        })
702                })
703        })
704        .is_some_and(|(peer_id, peer)| f.matches(node_id, peer_id, &peer.status))
705    };
706    driver.exec_steps_until(duration, pred).await
707}
708
709pub async fn wait_for_connection_error<F>(
710    driver: &mut Driver<'_>,
711    duration: Duration,
712    mut f: F,
713) -> anyhow::Result<bool>
714where
715    F: ConnectionPredicate,
716{
717    let pred = |node_id, event: &_, state: &State| {
718        None.or_else(|| {
719            let addr = as_event_mio_error(event)?;
720            let p2p = state.p2p.ready()?;
721            p2p.peers.iter().find(|(_, peer)| peer_has_addr(peer, addr))
722        })
723        .is_some_and(|(peer_id, peer)| f.matches(node_id, peer_id, &peer.status))
724    };
725    driver.exec_steps_until(duration, pred).await
726}
727
728pub fn get_peer_state<'a>(
729    cluster: &'a ClusterRunner<'_>,
730    node_id: ClusterNodeId,
731    peer_id: &PeerId,
732) -> Option<&'a P2pPeerState> {
733    let store = cluster.node(node_id).expect("node does not exist");
734    store.state().p2p.get_peer(peer_id)
735}
736
737pub fn peer_exists(cluster: &ClusterRunner<'_>, node_id: ClusterNodeId, peer_id: &PeerId) -> bool {
738    get_peer_state(cluster, node_id, peer_id).is_some()
739}
740
741pub fn peer_is_ready(
742    cluster: &ClusterRunner<'_>,
743    node_id: ClusterNodeId,
744    peer_id: &PeerId,
745) -> bool {
746    matches!(
747        get_peer_state(cluster, node_id, peer_id),
748        Some(P2pPeerState {
749            status: P2pPeerStatus::Ready(_),
750            ..
751        })
752    )
753}
754
755pub fn get_p2p_state<'a>(cluster: &'a ClusterRunner<'a>, node_id: ClusterNodeId) -> &'a P2pState {
756    cluster
757        .node(node_id)
758        .expect("node should exist")
759        .state()
760        .p2p
761        .unwrap()
762}
763// pub fn get_peers<'a>(
764//     cluster: &'a ClusterRunner<'a>,
765//     node_id: ClusterNodeId,
766// ) -> impl Iterator<Item = (&'a PeerId, &'a P2pPeerState)> {
767//     cluster
768//         .node(node_id)
769//         .expect("node should exist")
770//         .state()
771//         .p2p
772//         .peers
773//         .iter()
774// }
775
776pub async fn connect_rust_nodes(
777    cluster: &mut ClusterRunner<'_>,
778    dialer: ClusterNodeId,
779    listener: ClusterNodeId,
780) {
781    cluster
782        .exec_step(crate::scenario::ScenarioStep::ConnectNodes {
783            dialer,
784            listener: crate::scenario::ListenerNode::Rust(listener),
785        })
786        .await
787        .expect("connect event should be dispatched");
788}
789
790pub async fn trace_steps(runner: &mut ClusterRunner<'_>) -> anyhow::Result<()> {
791    loop {
792        while let Some((node_id, event)) = next_event(runner) {
793            println!("{node_id} event: {event}");
794            let step = ScenarioStep::Event {
795                node_id,
796                event: event.to_string(),
797            };
798            runner.exec_step(step).await?;
799        }
800        idle(runner, Duration::from_millis(100)).await?;
801    }
802}
803
804pub async fn trace_steps_state<T: Debug, F: Fn(&State) -> T>(
805    runner: &mut ClusterRunner<'_>,
806    f: F,
807) -> anyhow::Result<()> {
808    loop {
809        while let Some((node_id, event)) = next_event(runner) {
810            println!("{node_id} event: {event}");
811            let step = ScenarioStep::Event {
812                node_id,
813                event: event.to_string(),
814            };
815            runner.exec_step(step).await?;
816            let state = runner.node(node_id).unwrap().state();
817            let t = f(state);
818            println!("{node_id} state: {t:#?}");
819        }
820        idle(runner, Duration::from_millis(100)).await?;
821    }
822}
823
824pub async fn idle(runner: &mut ClusterRunner<'_>, duration: Duration) -> anyhow::Result<()> {
825    tokio::time::sleep(duration).await;
826    runner
827        .exec_step(ScenarioStep::AdvanceTime {
828            by_nanos: duration.as_nanos().try_into()?,
829        })
830        .await?;
831    let nodes = runner
832        .nodes_iter()
833        .map(|(node_id, _)| node_id)
834        .collect::<Vec<_>>();
835    for node_id in nodes {
836        runner
837            .exec_step(ScenarioStep::CheckTimeouts { node_id })
838            .await?;
839    }
840    Ok(())
841}
842pub fn next_event(runner: &mut ClusterRunner<'_>) -> Option<(ClusterNodeId, Event)> {
843    runner
844        .pending_events(true)
845        .find_map(|(node_id, _, mut events)| {
846            events.next().map(|(_, event)| (node_id, event.clone()))
847        })
848}