Skip to main content

p2p_testing/
cluster.rs

1use std::{
2    collections::BTreeMap,
3    net::{IpAddr, Ipv4Addr, SocketAddr},
4    ops::Range,
5    task::{ready, Context, Poll},
6    time::{Duration, Instant},
7};
8
9use futures::StreamExt;
10use libp2p::{multiaddr::multiaddr, swarm::DialError, Multiaddr};
11use mina_core::{channels::mpsc, ChainId, Substate, DEVNET_CHAIN_ID};
12use mina_p2p::{
13    connection::outgoing::{
14        P2pConnectionOutgoingAction, P2pConnectionOutgoingInitLibp2pOpts,
15        P2pConnectionOutgoingInitOpts, P2pConnectionOutgoingInitOptsParseError,
16    },
17    identity::SecretKey,
18    P2pCallbacks, P2pConfig, P2pMeshsubConfig, P2pState, PeerId,
19};
20use redux::SystemTime;
21
22use crate::{
23    event::{event_mapper_effect, RustNodeEvent},
24    libp2p_node::{create_swarm, Libp2pEvent, Libp2pNode, Libp2pNodeConfig, Libp2pNodeId},
25    redux::{log_action, Action, State},
26    rust_node::{RustNode, RustNodeConfig, RustNodeId},
27    service::ClusterService,
28    stream::{ClusterStreamExt, MapErrors, TakeDuring},
29    test_node::TestNode,
30};
31
/// Selects where the secret key (and therefore the peer id) of a test node
/// comes from.
#[derive(Debug, Default, Clone)]
pub enum PeerIdConfig {
    /// Derive the key bytes from the node's index within the cluster
    /// (the default).
    #[default]
    Derived,
    /// Use exactly these 32 bytes as the secret key material.
    Bytes([u8; 32]),
}
38
/// A connection target that a cluster node can be asked to dial.
#[derive(Debug, Clone, derive_more::From)]
pub enum Listener {
    /// A Rust node of this cluster, referenced by its id.
    Rust(RustNodeId),
    /// A libp2p node of this cluster, referenced by its id.
    Libp2p(Libp2pNodeId),
    /// An arbitrary multiaddress.
    Multiaddr(Multiaddr),
    /// A socket address together with the peer id to use for that address.
    SocketPeerId(SocketAddr, PeerId),
}
46
/// Identifier of a node managed by the cluster, of either supported kind.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, derive_more::From)]
pub enum NodeId {
    /// Id of a Rust node.
    Rust(RustNodeId),
    /// Id of a libp2p node.
    Libp2p(Libp2pNodeId),
}
52
/// A set of Rust and libp2p test nodes that is polled as a single stream of
/// [`ClusterEvent`]s.
pub struct Cluster {
    /// Chain id handed to every node created by this cluster.
    chain_id: ChainId,
    /// Remaining ports that `next_port` hands out to new nodes.
    ports: Range<u16>,
    /// IP address used when building dial options for cluster nodes.
    ip: IpAddr,

    /// Rust nodes, indexed by `RustNodeId`.
    rust_nodes: Vec<RustNode>,
    /// Libp2p nodes, indexed by `Libp2pNodeId`.
    libp2p_nodes: Vec<Libp2pNode>,
    /// Instant of the most recent idle tick; also reported by `timestamp`.
    last_idle_instant: Instant,
    /// Ticker that drives idle events.
    idle_interval: tokio::time::Interval,
    /// The source that `poll_next` polls next.
    next_poll: NextPoll,
    /// Per Rust node index: time accumulated by idle ticks and not yet
    /// delivered to that node (consumed in `poll_rust_node`).
    timeouts: BTreeMap<usize, Duration>,

    /// Predicate used by `try_stream` to decide which events are errors.
    is_error: fn(&ClusterEvent) -> bool,
    /// Duration passed to `take_during` by `stream` and `try_stream`.
    total_duration: Duration,
}
68
/// How a [`ClusterBuilder`] obtains the port range for its cluster.
enum PortsConfig {
    /// Use the given range as is.
    Range(Range<u16>),
    /// Reserve this many ports from `GLOBAL_PORTS` via `Ports::take`.
    Len(u16),
    /// Reserve this many ports from `GLOBAL_PORTS` via `Ports::take_exact`.
    ExactLen(u16),
}
74
75impl PortsConfig {
76    async fn ports(self) -> Result<Range<u16>> {
77        match self {
78            PortsConfig::Range(range) => Ok(range),
79            PortsConfig::Len(len) => GLOBAL_PORTS.take(len).await,
80            PortsConfig::ExactLen(len) => GLOBAL_PORTS.take_exact(len).await,
81        }
82    }
83}
84
/// Builder for [`Cluster`]; see `ClusterBuilder::start`.
pub struct ClusterBuilder {
    /// Chain id for all nodes of the cluster.
    chain_id: ChainId,
    /// Port source; must be set before `start` is called.
    ports: Option<PortsConfig>,
    /// IP address used for dialing cluster nodes.
    ip: IpAddr,
    /// Period of the cluster's idle ticker.
    idle_duration: Duration,
    /// Predicate classifying cluster events as errors.
    is_error: fn(&ClusterEvent) -> bool,
    /// Duration the cluster's event streams are limited to.
    total_duration: Duration,
}
93
94impl Default for ClusterBuilder {
95    fn default() -> Self {
96        ClusterBuilder {
97            chain_id: DEVNET_CHAIN_ID,
98            ports: None,
99            ip: Ipv4Addr::LOCALHOST.into(),
100            idle_duration: Duration::from_millis(100),
101            is_error: super::event::is_error,
102            total_duration: Duration::from_secs(60),
103        }
104    }
105}
106
107impl ClusterBuilder {
108    pub fn new() -> Self {
109        ClusterBuilder::default()
110    }
111
112    pub fn chain_id(mut self, chain_id: ChainId) -> Self {
113        self.chain_id = chain_id;
114        self
115    }
116
117    pub fn ports(mut self, ports: Range<u16>) -> Self {
118        self.ports = Some(PortsConfig::Range(ports));
119        self
120    }
121
122    pub fn ports_with_len(mut self, len: u16) -> Self {
123        self.ports = Some(PortsConfig::Len(len));
124        self
125    }
126
127    pub fn ports_with_exact_len(mut self, len: u16) -> Self {
128        self.ports = Some(PortsConfig::ExactLen(len));
129        self
130    }
131
132    pub fn ip(mut self, ip: IpAddr) -> Self {
133        self.ip = ip;
134        self
135    }
136
137    pub fn idle_duration(mut self, duration: Duration) -> Self {
138        self.idle_duration = duration;
139        self
140    }
141
142    pub fn is_error(mut self, f: fn(&ClusterEvent) -> bool) -> Self {
143        self.is_error = f;
144        self
145    }
146
147    pub fn total_duration(mut self, duration: Duration) -> Self {
148        self.total_duration = duration;
149        self
150    }
151
152    pub async fn start(self) -> Result<Cluster> {
153        *crate::log::LOG;
154
155        let chain_id = self.chain_id;
156        let ports = self
157            .ports
158            .ok_or_else(|| Error::UninitializedField("ports"))?
159            .ports()
160            .await?;
161        let ip = self.ip;
162        let mut idle_interval = tokio::time::interval(self.idle_duration);
163        let last_idle_instant = idle_interval.tick().await.into_std();
164        let is_error = self.is_error;
165        let total_duration = self.total_duration;
166        mina_core::info!(mina_core::log::system_time(); "starting the cluster");
167        let next_poll = Default::default();
168        Ok(Cluster {
169            chain_id,
170            ports,
171            ip,
172            is_error,
173            total_duration,
174            rust_nodes: Default::default(),
175            libp2p_nodes: Default::default(),
176            last_idle_instant,
177            idle_interval,
178            next_poll,
179            timeouts: Default::default(),
180        })
181    }
182}
183
/// A pool of ports handed out in consecutive, non-overlapping ranges.
pub struct Ports {
    /// First port that has not been handed out yet.
    start: tokio::sync::Mutex<u16>,
    /// Exclusive upper bound of the pool.
    end: u16,
}
188
189impl Ports {
190    pub fn new(range: Range<u16>) -> Self {
191        Ports {
192            start: range.start.into(),
193            end: range.end,
194        }
195    }
196
197    fn round(u: u16) -> u16 {
198        u.div_ceil(100) * 100
199    }
200
201    pub async fn take(&self, len: u16) -> Result<Range<u16>> {
202        let mut start = self.start.lock().await;
203        let res = Self::round(*start)..Self::round(*start + len);
204        if res.end > self.end {
205            return Err(Error::NoMorePorts);
206        }
207        *start = res.end;
208        Ok(res)
209    }
210
211    pub async fn take_exact(&self, len: u16) -> Result<Range<u16>> {
212        let mut start = self.start.lock().await;
213        let res = *start..(*start + len);
214        if res.end > self.end {
215            return Err(Error::NoMorePorts);
216        }
217        *start += len;
218        Ok(res)
219    }
220}
221
222impl Default for Ports {
223    fn default() -> Self {
224        Self {
225            start: 10000.into(),
226            end: 20000,
227        }
228    }
229}
230
/// Declares a shared storage for ports.
///
/// With only a name, the storage is created with `Ports::default()`; an
/// optional second argument supplies the port range passed to `Ports::new`.
///
/// ```
/// ports_store!(GLOBAL_PORTS);
///
/// #[tokio::test]
/// async fn test1() {
///     let cluster = ClusterBuilder::default()
///         .ports(GLOBAL_PORTS.take(20).await.expect("enough ports"))
///         .start()
///         .await;
/// }
/// ```
#[macro_export]
macro_rules! ports_store {
    ($name:ident, $range:expr) => {
        $crate::lazy_static::lazy_static! {
            static ref $name: $crate::cluster::Ports = $crate::cluster::Ports::new($range);
        }
    };
    ($name:ident) => {
        $crate::lazy_static::lazy_static! {
            static ref $name: $crate::cluster::Ports = $crate::cluster::Ports::default();
        }
    };
}

// Shared port storage used by `PortsConfig::ports` for the length-based
// port configurations.
ports_store!(GLOBAL_PORTS);
260
/// Errors reported while building or driving a cluster.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// No unused port is left in the relevant port range or pool.
    #[error("No more ports")]
    NoMorePorts,
    /// An address could not be converted into outgoing connection options.
    #[error(transparent)]
    AddrParse(#[from] P2pConnectionOutgoingInitOptsParseError),
    /// A required builder field (named by the payload) was never set.
    #[error("uninitialized field `{0}`")]
    UninitializedField(&'static str),
    /// Creating a libp2p swarm failed; carries the stringified cause.
    #[error("swarm creation error: {0}")]
    Libp2pSwarm(String),
    /// Dialing from a libp2p node failed. Boxed, presumably to keep the
    /// error type small.
    #[error(transparent)]
    Libp2pDial(Box<DialError>),
    /// Any other failure, described by a message.
    #[error("Error occurred: {0}")]
    Other(String),
}
276
277impl From<DialError> for Error {
278    fn from(err: DialError) -> Self {
279        Error::Libp2pDial(Box::new(err))
280    }
281}
282
/// Result type whose error is this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
284
/// Fill byte for secret keys derived for Rust nodes.
const RUST_NODE_SIG_BYTE: u8 = 0xf0;
/// Fill byte for secret keys derived for libp2p nodes; differs from the Rust
/// one so that nodes of different kinds with the same index get distinct keys.
const LIBP2P_NODE_SIG_BYTE: u8 = 0xf1;
288
289impl Cluster {
290    pub fn next_port(&mut self) -> Result<u16> {
291        self.ports.next().ok_or(Error::NoMorePorts)
292    }
293
294    fn init_opts(&self, listener: Listener) -> Result<P2pConnectionOutgoingInitOpts> {
295        match listener {
296            Listener::Rust(RustNodeId(i)) => {
297                let node = &self.rust_nodes[i];
298                let port = node.libp2p_port();
299                let peer_id = node.peer_id();
300                let host = self.ip.into();
301                Ok(P2pConnectionOutgoingInitOpts::LibP2P(
302                    P2pConnectionOutgoingInitLibp2pOpts {
303                        peer_id,
304                        host,
305                        port,
306                    },
307                ))
308            }
309            Listener::Libp2p(Libp2pNodeId(i)) => {
310                let node = &self.libp2p_nodes[i];
311                let port = node.libp2p_port();
312                let peer_id = node.peer_id();
313                let host = self.ip.into();
314                Ok(P2pConnectionOutgoingInitOpts::LibP2P(
315                    P2pConnectionOutgoingInitLibp2pOpts {
316                        peer_id,
317                        host,
318                        port,
319                    },
320                ))
321            }
322            Listener::Multiaddr(ref maddr) => {
323                Ok(P2pConnectionOutgoingInitOpts::LibP2P(maddr.try_into()?))
324            }
325            Listener::SocketPeerId(socket_addr, peer_id) => Ok(
326                P2pConnectionOutgoingInitOpts::LibP2P((peer_id, socket_addr).into()),
327            ),
328        }
329    }
330
331    fn secret_key(config: PeerIdConfig, index: usize, fill_byte: u8) -> SecretKey {
332        let bytes = match config {
333            PeerIdConfig::Derived => {
334                let mut bytes = [fill_byte; 32];
335                let bytes_len = bytes.len();
336                let i_bytes = index.to_be_bytes();
337                let i = bytes_len - i_bytes.len();
338                bytes[i..bytes_len].copy_from_slice(&i_bytes);
339                bytes
340            }
341            PeerIdConfig::Bytes(bytes) => bytes,
342        };
343        SecretKey::from_bytes(bytes)
344    }
345
346    fn rust_node_config(&mut self, config: RustNodeConfig) -> Result<(P2pConfig, SecretKey)> {
347        let secret_key =
348            Self::secret_key(config.peer_id, self.rust_nodes.len(), RUST_NODE_SIG_BYTE);
349        let libp2p_port = self.next_port()?;
350        let listen_port = self.next_port()?;
351        let initial_peers = config
352            .initial_peers
353            .into_iter()
354            .map(|p| self.init_opts(p))
355            .collect::<Result<_>>()?;
356        let config = P2pConfig {
357            libp2p_port: Some(libp2p_port),
358            listen_port: Some(listen_port),
359            identity_pub_key: secret_key.public_key(),
360            initial_peers,
361            external_addrs: vec![],
362            enabled_channels: mina_p2p::channels::ChannelId::for_libp2p().collect(),
363            peer_discovery: config.discovery,
364            timeouts: config.timeouts,
365            limits: config.limits,
366            meshsub: P2pMeshsubConfig::default(),
367        };
368
369        Ok((config, secret_key))
370    }
371
    /// Adds a Rust node to the cluster and returns its id.
    ///
    /// The node's P2P configuration and secret key come from
    /// `rust_node_config`, which also takes the node's ports from the
    /// cluster's range. When the config provides `override_reducer` or
    /// `override_fn`, they replace the default reducer / effects closures
    /// defined in this function.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `rust_node_config`.
    pub fn add_rust_node(&mut self, config: RustNodeConfig) -> Result<RustNodeId> {
        // Copy the overrides out first: `config` is consumed below.
        let override_fn = config.override_fn;
        let reducer_override_fn = config.override_reducer;
        let node_idx = self.rust_nodes.len();
        let (event_sender, event_receiver) = mpsc::unbounded_channel();
        let (config, secret_key) = self.rust_node_config(config)?;
        // NOTE(review): the receiving half is never read and is dropped when
        // this function returns — confirm that is intended.
        let (cmd_sender, _cmd_receiver) = mpsc::unbounded_channel();

        let service = ClusterService::new(
            node_idx,
            secret_key,
            event_sender,
            cmd_sender,
            self.last_idle_instant,
        );

        let store = crate::redux::Store::new(
            // Default reducer: logs the action, then routes it by kind.
            reducer_override_fn.unwrap_or(|state, action, dispatcher| {
                let meta = action.meta().clone();
                let action = action.action();

                log_action(action, &meta, state.0.my_id());

                let time = meta.time();
                let state_context = Substate::new(state, dispatcher);
                let result = match action {
                    Action::P2p(action) => {
                        P2pState::reducer(state_context, meta.with_action(action.clone()))
                    }
                    Action::Idle(_) => P2pState::p2p_timeout_dispatch(state_context, &meta),
                    Action::P2pEffectful(_) => Ok(()),
                };

                // Reducer errors are only logged, not propagated.
                if let Err(error) = result {
                    mina_core::warn!(time; "error = {error}");
                }
            }),
            // Default effects: map P2p actions to events, run effectful ones.
            override_fn.unwrap_or(|store, action| {
                let (action, meta) = action.split();
                match action {
                    Action::P2p(a) => {
                        event_mapper_effect(store, a);
                    }
                    Action::P2pEffectful(a) => a.effects(meta, store),
                    Action::Idle(_) => {
                        // handled by reducer
                    }
                }
            }),
            service,
            SystemTime::now(),
            State(P2pState::new(
                config,
                P2pCallbacks::default(),
                &self.chain_id,
            )),
        );

        // The new node's id is its index in `rust_nodes`.
        let node_id = RustNodeId(self.rust_nodes.len());
        self.rust_nodes.push(RustNode::new(store, event_receiver));
        Ok(node_id)
    }
434
435    pub fn add_libp2p_node(&mut self, config: Libp2pNodeConfig) -> Result<Libp2pNodeId> {
436        let node_id = Libp2pNodeId(self.libp2p_nodes.len());
437        let secret_key = Self::secret_key(config.peer_id, node_id.0, LIBP2P_NODE_SIG_BYTE);
438        let libp2p_port = self.next_port()?;
439
440        let swarm = create_swarm(secret_key, libp2p_port, config.port_reuse, &self.chain_id)
441            .map_err(|err| Error::Libp2pSwarm(err.to_string()))?;
442        self.libp2p_nodes.push(Libp2pNode::new(swarm));
443
444        Ok(node_id)
445    }
446
447    fn rust_dial_opts(&self, listener: Listener) -> Result<P2pConnectionOutgoingInitOpts> {
448        match listener {
449            Listener::Rust(id) => Ok(self.rust_node(id).rust_dial_opts(self.ip)),
450            Listener::Libp2p(id) => Ok(self.libp2p_node(id).rust_dial_opts(self.ip)),
451            Listener::Multiaddr(maddr) => Ok(maddr.try_into().map_err(Error::AddrParse)?),
452            Listener::SocketPeerId(socket, peer_id) => Ok(P2pConnectionOutgoingInitOpts::LibP2P(
453                (peer_id, socket).into(),
454            )),
455        }
456    }
457
458    fn libp2p_dial_opts(&self, listener: Listener) -> Result<Multiaddr> {
459        match listener {
460            Listener::Rust(id) => Ok(self.rust_node(id).libp2p_dial_opts(self.ip)),
461            Listener::Libp2p(id) => Ok(self.libp2p_node(id).libp2p_dial_opts(self.ip)),
462            Listener::Multiaddr(maddr) => Ok(maddr),
463            Listener::SocketPeerId(socket, peer_id) => {
464                let peer_id: libp2p::PeerId = peer_id
465                    .try_into()
466                    .map_err(|_| Error::Other("Listener: invalid peer_id".to_string()))?;
467
468                match socket {
469                    SocketAddr::V4(ipv4) => {
470                        Ok(multiaddr!(Ip4(*ipv4.ip()), Tcp(ipv4.port()), P2p(peer_id)))
471                    }
472                    SocketAddr::V6(ipv6) => {
473                        Ok(multiaddr!(Ip6(*ipv6.ip()), Tcp(ipv6.port()), P2p(peer_id)))
474                    }
475                }
476            }
477        }
478    }
479
480    pub fn connect<T, U>(&mut self, id: T, other: U) -> Result<()>
481    where
482        T: Into<NodeId>,
483        U: Into<Listener>,
484    {
485        match id.into() {
486            NodeId::Rust(id) => {
487                let dial_opts = self.rust_dial_opts(other.into())?;
488                self.rust_node_mut(id)
489                    .dispatch_action(P2pConnectionOutgoingAction::Init {
490                        opts: dial_opts,
491                        rpc_id: None,
492                        on_success: None,
493                    });
494            }
495            NodeId::Libp2p(id) => {
496                let dial_opts = self.libp2p_dial_opts(other.into())?;
497                self.libp2p_node_mut(id).swarm_mut().dial(dial_opts)?;
498            }
499        }
500        Ok(())
501    }
502
503    pub fn rust_node(&self, id: RustNodeId) -> &RustNode {
504        &self.rust_nodes[id.0]
505    }
506
507    pub fn rust_node_mut(&mut self, id: RustNodeId) -> &mut RustNode {
508        &mut self.rust_nodes[id.0]
509    }
510
511    pub fn libp2p_node(&self, id: Libp2pNodeId) -> &Libp2pNode {
512        &self.libp2p_nodes[id.0]
513    }
514
515    pub fn libp2p_node_mut(&mut self, id: Libp2pNodeId) -> &mut Libp2pNode {
516        &mut self.libp2p_nodes[id.0]
517    }
518
519    pub fn timestamp(&self) -> Instant {
520        self.last_idle_instant
521    }
522
523    pub fn peer_id<T>(&self, id: T) -> PeerId
524    where
525        T: Into<NodeId>,
526    {
527        match id.into() {
528            NodeId::Rust(id) => self.rust_node(id).peer_id(),
529            NodeId::Libp2p(id) => self.libp2p_node(id).peer_id(),
530        }
531    }
532}
533
/// An event produced by polling the [`Cluster`] stream.
///
/// The type also implements `Error` (via `thiserror`), with a message that
/// contains the event's `Debug` representation.
#[derive(Debug, thiserror::Error)]
#[error("error event: {:?}", self)]
pub enum ClusterEvent {
    /// An event emitted by the Rust node `id`.
    Rust {
        id: RustNodeId,
        event: RustNodeEvent,
    },
    /// An event emitted by the libp2p node `id`.
    Libp2p {
        id: Libp2pNodeId,
        event: Libp2pEvent,
    },
    /// The cluster's idle ticker fired at `instant`.
    Idle {
        // TODO(akoptelov): individual events on timeout?
        instant: Instant,
    },
}
550
551impl ClusterEvent {
552    pub fn rust(&self) -> Option<(&RustNodeId, &RustNodeEvent)> {
553        if let ClusterEvent::Rust { id, event } = self {
554            Some((id, event))
555        } else {
556            None
557        }
558    }
559
560    pub fn idle(&self) -> Option<&Instant> {
561        if let ClusterEvent::Idle { instant } = self {
562            Some(instant)
563        } else {
564            None
565        }
566    }
567}
568
/// An event that may carry a timestamp.
pub trait TimestampEvent {
    /// Returns the instant associated with the event, if it has one.
    fn timestamp(&self) -> Option<Instant>;
}
572
573impl TimestampEvent for ClusterEvent {
574    fn timestamp(&self) -> Option<Instant> {
575        if let ClusterEvent::Idle { instant } = self {
576            Some(*instant)
577        } else {
578            None
579        }
580    }
581}
582
583impl<T> TimestampEvent for std::result::Result<ClusterEvent, T> {
584    fn timestamp(&self) -> Option<Instant> {
585        self.as_ref().ok().and_then(|event| event.timestamp())
586    }
587}
588
/// A source that can report a current timestamp.
pub trait TimestampSource {
    /// Returns the source's current timestamp.
    fn timestamp(&self) -> Instant;
}
592
/// The event source that the cluster stream polls next.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
enum NextPoll {
    /// The idle ticker (the default starting point).
    #[default]
    Idle,
    /// The Rust node with the given id.
    Rust(RustNodeId),
    /// The libp2p node with the given id.
    Libp2p(Libp2pNodeId),
}
600
601impl Cluster {
602    fn next_poll(&mut self) {
603        self.next_poll = match self.next_poll {
604            NextPoll::Rust(RustNodeId(id)) if id + 1 < self.rust_nodes.len() => {
605                NextPoll::Rust(RustNodeId(id + 1))
606            }
607            NextPoll::Rust(_) if !self.libp2p_nodes.is_empty() => NextPoll::Libp2p(Libp2pNodeId(0)),
608            NextPoll::Rust(_) => NextPoll::Idle,
609            NextPoll::Libp2p(Libp2pNodeId(id)) if id + 1 < self.libp2p_nodes.len() => {
610                NextPoll::Libp2p(Libp2pNodeId(id + 1))
611            }
612            NextPoll::Libp2p(_) => NextPoll::Idle,
613            NextPoll::Idle if !self.rust_nodes.is_empty() => NextPoll::Rust(RustNodeId(0)),
614            NextPoll::Idle if !self.libp2p_nodes.is_empty() => NextPoll::Libp2p(Libp2pNodeId(0)),
615            NextPoll::Idle => NextPoll::Idle,
616        }
617    }
618
619    fn poll_idle(&mut self, cx: &mut Context<'_>) -> Poll<Instant> {
620        Poll::Ready({
621            let instant = ready!(self.idle_interval.poll_tick(cx)).into_std();
622            let dur = instant - self.last_idle_instant;
623            for i in 0..self.rust_nodes.len() {
624                self.timeouts
625                    .entry(i)
626                    .and_modify(|d| *d += dur)
627                    .or_insert(dur);
628            }
629            self.last_idle_instant = instant;
630            instant
631        })
632    }
633
634    fn poll_rust_node(
635        &mut self,
636        id: RustNodeId,
637        cx: &mut Context<'_>,
638    ) -> Poll<Option<RustNodeEvent>> {
639        let rust_node = &mut self.rust_nodes[id.0];
640        let res = if let Some(dur) = self.timeouts.remove(&id.0) {
641            // trigger timeout action and return idle event
642            Poll::Ready(Some(rust_node.idle(dur)))
643        } else {
644            // poll next available event from the node
645            rust_node.poll_next_unpin(cx)
646        };
647        if crate::log::ERROR.swap(false, std::sync::atomic::Ordering::Relaxed) {
648            if let Err(err) = self.dump_state() {
649                eprintln!("error dumping state: {err}");
650            }
651            panic!("error detected");
652        }
653        res
654    }
655
656    fn poll_libp2p_node(
657        &mut self,
658        id: Libp2pNodeId,
659        cx: &mut Context<'_>,
660    ) -> Poll<Option<Libp2pEvent>> {
661        let libp2p_node = &mut self.libp2p_nodes[id.0];
662        Poll::Ready(ready!(libp2p_node.swarm_mut().poll_next_unpin(cx)))
663    }
664
665    fn dump_state(&self) -> std::result::Result<(), Box<dyn std::error::Error>> {
666        let path = std::env::temp_dir().join("p2p-test-node.json");
667        eprintln!("saving state of rust nodes to {:?}", path);
668        let file = std::fs::File::create(path)?;
669        let states = serde_json::Map::from_iter(
670            self.rust_nodes
671                .iter()
672                .map(|node| {
673                    Ok((
674                        node.peer_id().to_string(),
675                        serde_json::to_value(node.state())?,
676                    ))
677                })
678                .collect::<std::result::Result<Vec<_>, serde_json::Error>>()?,
679        );
680        serde_json::to_writer(file, &states)?;
681        Ok(())
682    }
683}
684
685impl ::futures::stream::Stream for Cluster {
686    type Item = ClusterEvent;
687
688    fn poll_next(
689        self: std::pin::Pin<&mut Self>,
690        cx: &mut std::task::Context<'_>,
691    ) -> Poll<Option<Self::Item>> {
692        let this = self.get_mut();
693        let np = this.next_poll;
694        loop {
695            let poll = match this.next_poll {
696                NextPoll::Rust(id) => this
697                    .poll_rust_node(id, cx)
698                    .map(|event| event.map(|event| ClusterEvent::Rust { id, event })),
699                NextPoll::Libp2p(id) => this
700                    .poll_libp2p_node(id, cx)
701                    .map(|event| event.map(|event| ClusterEvent::Libp2p { id, event })),
702                NextPoll::Idle => this
703                    .poll_idle(cx)
704                    .map(|instant| Some(ClusterEvent::Idle { instant })),
705            };
706            if poll.is_ready() {
707                return poll;
708            }
709            this.next_poll();
710            if this.next_poll == np {
711                return Poll::Pending;
712            }
713        }
714    }
715}
716
717impl TimestampSource for Cluster {
718    fn timestamp(&self) -> Instant {
719        self.last_idle_instant
720    }
721}
722
723impl TimestampSource for &mut Cluster {
724    fn timestamp(&self) -> Instant {
725        self.last_idle_instant
726    }
727}
728
729impl Cluster {
730    pub fn stream(&mut self) -> TakeDuring<&mut Cluster> {
731        let duration = self.total_duration;
732        self.take_during(duration)
733    }
734
735    pub fn try_stream(&mut self) -> MapErrors<TakeDuring<&mut Cluster>, ClusterEvent> {
736        let is_error = self.is_error;
737        let duration = self.total_duration;
738        self.take_during(duration).map_errors(is_error)
739    }
740}