p2p_testing/
cluster.rs

1use std::{
2    collections::BTreeMap,
3    net::{IpAddr, Ipv4Addr, SocketAddr},
4    ops::Range,
5    task::{ready, Context, Poll},
6    time::{Duration, Instant},
7};
8
9use futures::StreamExt;
10use libp2p::{multiaddr::multiaddr, swarm::DialError, Multiaddr};
11use openmina_core::{channels::mpsc, ChainId, Substate, DEVNET_CHAIN_ID};
12use p2p::{
13    connection::outgoing::{
14        P2pConnectionOutgoingAction, P2pConnectionOutgoingInitLibp2pOpts,
15        P2pConnectionOutgoingInitOpts, P2pConnectionOutgoingInitOptsParseError,
16    },
17    identity::SecretKey,
18    P2pCallbacks, P2pConfig, P2pMeshsubConfig, P2pState, PeerId,
19};
20use redux::SystemTime;
21
22use crate::{
23    event::{event_mapper_effect, RustNodeEvent},
24    libp2p_node::{create_swarm, Libp2pEvent, Libp2pNode, Libp2pNodeConfig, Libp2pNodeId},
25    redux::{log_action, Action, State},
26    rust_node::{RustNode, RustNodeConfig, RustNodeId},
27    service::ClusterService,
28    stream::{ClusterStreamExt, MapErrors, TakeDuring},
29    test_node::TestNode,
30};
31
/// Selects where the 32 bytes of a test node's secret key come from.
#[derive(Clone, Debug, Default)]
pub enum PeerIdConfig {
    /// Build the bytes from a per-node-kind fill byte and the node's index
    /// (see `Cluster::secret_key`).
    #[default]
    Derived,
    /// Use exactly these 32 bytes.
    Bytes([u8; 32]),
}
38
39#[derive(Debug, Clone, derive_more::From)]
40pub enum Listener {
41    Rust(RustNodeId),
42    Libp2p(Libp2pNodeId),
43    Multiaddr(Multiaddr),
44    SocketPeerId(SocketAddr, PeerId),
45}
46
47#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, derive_more::From)]
48pub enum NodeId {
49    Rust(RustNodeId),
50    Libp2p(Libp2pNodeId),
51}
52
53pub struct Cluster {
54    chain_id: ChainId,
55    ports: Range<u16>,
56    ip: IpAddr,
57
58    rust_nodes: Vec<RustNode>,
59    libp2p_nodes: Vec<Libp2pNode>,
60    last_idle_instant: Instant,
61    idle_interval: tokio::time::Interval,
62    next_poll: NextPoll,
63    timeouts: BTreeMap<usize, Duration>,
64
65    is_error: fn(&ClusterEvent) -> bool,
66    total_duration: Duration,
67}
68
69enum PortsConfig {
70    Range(Range<u16>),
71    Len(u16),
72    ExactLen(u16),
73}
74
75impl PortsConfig {
76    async fn ports(self) -> Result<Range<u16>> {
77        match self {
78            PortsConfig::Range(range) => Ok(range),
79            PortsConfig::Len(len) => GLOBAL_PORTS.take(len).await,
80            PortsConfig::ExactLen(len) => GLOBAL_PORTS.take_exact(len).await,
81        }
82    }
83}
84
85pub struct ClusterBuilder {
86    chain_id: ChainId,
87    ports: Option<PortsConfig>,
88    ip: IpAddr,
89    idle_duration: Duration,
90    is_error: fn(&ClusterEvent) -> bool,
91    total_duration: Duration,
92}
93
94impl Default for ClusterBuilder {
95    fn default() -> Self {
96        ClusterBuilder {
97            chain_id: DEVNET_CHAIN_ID,
98            ports: None,
99            ip: Ipv4Addr::LOCALHOST.into(),
100            idle_duration: Duration::from_millis(100),
101            is_error: super::event::is_error,
102            total_duration: Duration::from_secs(60),
103        }
104    }
105}
106
107impl ClusterBuilder {
108    pub fn new() -> Self {
109        ClusterBuilder::default()
110    }
111
112    pub fn chain_id(mut self, chain_id: ChainId) -> Self {
113        self.chain_id = chain_id;
114        self
115    }
116
117    pub fn ports(mut self, ports: Range<u16>) -> Self {
118        self.ports = Some(PortsConfig::Range(ports));
119        self
120    }
121
122    pub fn ports_with_len(mut self, len: u16) -> Self {
123        self.ports = Some(PortsConfig::Len(len));
124        self
125    }
126
127    pub fn ports_with_exact_len(mut self, len: u16) -> Self {
128        self.ports = Some(PortsConfig::ExactLen(len));
129        self
130    }
131
132    pub fn ip(mut self, ip: IpAddr) -> Self {
133        self.ip = ip;
134        self
135    }
136
137    pub fn idle_duration(mut self, duration: Duration) -> Self {
138        self.idle_duration = duration;
139        self
140    }
141
142    pub fn is_error(mut self, f: fn(&ClusterEvent) -> bool) -> Self {
143        self.is_error = f;
144        self
145    }
146
147    pub fn total_duration(mut self, duration: Duration) -> Self {
148        self.total_duration = duration;
149        self
150    }
151
152    pub async fn start(self) -> Result<Cluster> {
153        *crate::log::LOG;
154
155        let chain_id = self.chain_id;
156        let ports = self
157            .ports
158            .ok_or_else(|| Error::UninitializedField("ports"))?
159            .ports()
160            .await?;
161        let ip = self.ip;
162        let mut idle_interval = tokio::time::interval(self.idle_duration);
163        let last_idle_instant = idle_interval.tick().await.into_std();
164        let is_error = self.is_error;
165        let total_duration = self.total_duration;
166        openmina_core::info!(openmina_core::log::system_time(); "starting the cluster");
167        let next_poll = Default::default();
168        Ok(Cluster {
169            chain_id,
170            ports,
171            ip,
172            is_error,
173            total_duration,
174            rust_nodes: Default::default(),
175            libp2p_nodes: Default::default(),
176            last_idle_instant,
177            idle_interval,
178            next_poll,
179            timeouts: Default::default(),
180        })
181    }
182}
183
184pub struct Ports {
185    start: tokio::sync::Mutex<u16>,
186    end: u16,
187}
188
189impl Ports {
190    pub fn new(range: Range<u16>) -> Self {
191        Ports {
192            start: range.start.into(),
193            end: range.end,
194        }
195    }
196
197    fn round(u: u16) -> u16 {
198        ((u + 99) / 100) * 100
199    }
200
201    pub async fn take(&self, len: u16) -> Result<Range<u16>> {
202        let mut start = self.start.lock().await;
203        let res = Self::round(*start)..Self::round(*start + len);
204        if res.end > self.end {
205            return Err(Error::NoMorePorts);
206        }
207        *start = res.end;
208        Ok(res)
209    }
210
211    pub async fn take_exact(&self, len: u16) -> Result<Range<u16>> {
212        let mut start = self.start.lock().await;
213        let res = *start..(*start + len);
214        if res.end > self.end {
215            return Err(Error::NoMorePorts);
216        }
217        *start += len;
218        Ok(res)
219    }
220}
221
222impl Default for Ports {
223    fn default() -> Self {
224        Self {
225            start: 10000.into(),
226            end: 20000,
227        }
228    }
229}
230
231/// Declares a shared storage for ports.
232///
233/// ```
234/// ports_store!(GLOBAL_PORTS);
235///
236/// #[tokio::test]
237/// fn test1() {
238///     let cluster = ClusterBuilder::default()
239///         .ports(GLOBAL_PORTS.take(20).await.expect("enough ports"))
240///         .start()
241///         .await;
242/// }
243///
244/// ```
245#[macro_export]
246macro_rules! ports_store {
247    ($name:ident, $range:expr) => {
248        $crate::lazy_static::lazy_static! {
249            static ref $name: $crate::cluster::Ports = $crate::cluster::Ports::new($range);
250        }
251    };
252    ($name:ident) => {
253        $crate::lazy_static::lazy_static! {
254            static ref $name: $crate::cluster::Ports = $crate::cluster::Ports::default();
255        }
256    };
257}
258
259ports_store!(GLOBAL_PORTS);
260
261#[derive(Debug, thiserror::Error)]
262pub enum Error {
263    #[error("No more ports")]
264    NoMorePorts,
265    #[error(transparent)]
266    AddrParse(#[from] P2pConnectionOutgoingInitOptsParseError),
267    #[error("uninitialized field `{0}`")]
268    UninitializedField(&'static str),
269    #[error("swarm creation error: {0}")]
270    Libp2pSwarm(String),
271    #[error(transparent)]
272    Libp2pDial(#[from] DialError),
273    #[error("Error occurred: {0}")]
274    Other(String),
275}
276
277pub type Result<T> = std::result::Result<T, Error>;
278
/// Fill byte for the derived secret-key bytes of Rust nodes.
const RUST_NODE_SIG_BYTE: u8 = 0xF0;
/// Fill byte for the derived secret-key bytes of libp2p nodes.
#[allow(dead_code)]
const LIBP2P_NODE_SIG_BYTE: u8 = 0xF1;
282
283impl Cluster {
284    pub fn next_port(&mut self) -> Result<u16> {
285        self.ports.next().ok_or(Error::NoMorePorts)
286    }
287
288    fn init_opts(&self, listener: Listener) -> Result<P2pConnectionOutgoingInitOpts> {
289        match listener {
290            Listener::Rust(RustNodeId(i)) => {
291                let node = &self.rust_nodes[i];
292                let port = node.libp2p_port();
293                let peer_id = node.peer_id();
294                let host = self.ip.into();
295                Ok(P2pConnectionOutgoingInitOpts::LibP2P(
296                    P2pConnectionOutgoingInitLibp2pOpts {
297                        peer_id,
298                        host,
299                        port,
300                    },
301                ))
302            }
303            Listener::Libp2p(Libp2pNodeId(i)) => {
304                let node = &self.libp2p_nodes[i];
305                let port = node.libp2p_port();
306                let peer_id = node.peer_id();
307                let host = self.ip.into();
308                Ok(P2pConnectionOutgoingInitOpts::LibP2P(
309                    P2pConnectionOutgoingInitLibp2pOpts {
310                        peer_id,
311                        host,
312                        port,
313                    },
314                ))
315            }
316            Listener::Multiaddr(ref maddr) => {
317                Ok(P2pConnectionOutgoingInitOpts::LibP2P(maddr.try_into()?))
318            }
319            Listener::SocketPeerId(socket_addr, peer_id) => Ok(
320                P2pConnectionOutgoingInitOpts::LibP2P((peer_id, socket_addr).into()),
321            ),
322        }
323    }
324
325    fn secret_key(config: PeerIdConfig, index: usize, fill_byte: u8) -> SecretKey {
326        let bytes = match config {
327            PeerIdConfig::Derived => {
328                let mut bytes = [fill_byte; 32];
329                let bytes_len = bytes.len();
330                let i_bytes = index.to_be_bytes();
331                let i = bytes_len - i_bytes.len();
332                bytes[i..bytes_len].copy_from_slice(&i_bytes);
333                bytes
334            }
335            PeerIdConfig::Bytes(bytes) => bytes,
336        };
337        SecretKey::from_bytes(bytes)
338    }
339
340    fn rust_node_config(&mut self, config: RustNodeConfig) -> Result<(P2pConfig, SecretKey)> {
341        let secret_key =
342            Self::secret_key(config.peer_id, self.rust_nodes.len(), RUST_NODE_SIG_BYTE);
343        let libp2p_port = self.next_port()?;
344        let listen_port = self.next_port()?;
345        let initial_peers = config
346            .initial_peers
347            .into_iter()
348            .map(|p| self.init_opts(p))
349            .collect::<Result<_>>()?;
350        let config = P2pConfig {
351            libp2p_port: Some(libp2p_port),
352            listen_port: Some(listen_port),
353            identity_pub_key: secret_key.public_key(),
354            initial_peers,
355            external_addrs: vec![],
356            enabled_channels: p2p::channels::ChannelId::for_libp2p().collect(),
357            peer_discovery: config.discovery,
358            timeouts: config.timeouts,
359            limits: config.limits,
360            meshsub: P2pMeshsubConfig::default(),
361        };
362
363        Ok((config, secret_key))
364    }
365
366    pub fn add_rust_node(&mut self, config: RustNodeConfig) -> Result<RustNodeId> {
367        let override_fn = config.override_fn;
368        let reducer_override_fn = config.override_reducer;
369        let node_idx = self.rust_nodes.len();
370        let (event_sender, event_receiver) = mpsc::unbounded_channel();
371        let (config, secret_key) = self.rust_node_config(config)?;
372        let (cmd_sender, _cmd_receiver) = mpsc::unbounded_channel();
373
374        let service = ClusterService::new(
375            node_idx,
376            secret_key,
377            event_sender,
378            cmd_sender,
379            self.last_idle_instant,
380        );
381
382        let store = crate::redux::Store::new(
383            reducer_override_fn.unwrap_or(|state, action, dispatcher| {
384                let meta = action.meta().clone();
385                let action = action.action();
386
387                log_action(action, &meta, state.0.my_id());
388
389                let time = meta.time();
390                let state_context = Substate::new(state, dispatcher);
391                let result = match action {
392                    Action::P2p(action) => {
393                        P2pState::reducer(state_context, meta.with_action(action.clone()))
394                    }
395                    Action::Idle(_) => P2pState::p2p_timeout_dispatch(state_context, &meta),
396                    Action::P2pEffectful(_) => Ok(()),
397                };
398
399                if let Err(error) = result {
400                    openmina_core::warn!(time; "error = {error}");
401                }
402            }),
403            override_fn.unwrap_or(|store, action| {
404                let (action, meta) = action.split();
405                match action {
406                    Action::P2p(a) => {
407                        event_mapper_effect(store, a);
408                    }
409                    Action::P2pEffectful(a) => a.effects(meta, store),
410                    Action::Idle(_) => {
411                        // handled by reducer
412                    }
413                }
414            }),
415            service,
416            SystemTime::now(),
417            State(P2pState::new(
418                config,
419                P2pCallbacks::default(),
420                &self.chain_id,
421            )),
422        );
423
424        let node_id = RustNodeId(self.rust_nodes.len());
425        self.rust_nodes.push(RustNode::new(store, event_receiver));
426        Ok(node_id)
427    }
428
429    pub fn add_libp2p_node(&mut self, config: Libp2pNodeConfig) -> Result<Libp2pNodeId> {
430        let node_id = Libp2pNodeId(self.libp2p_nodes.len());
431        let secret_key = Self::secret_key(config.peer_id, node_id.0, LIBP2P_NODE_SIG_BYTE);
432        let libp2p_port = self.next_port()?;
433
434        let swarm = create_swarm(secret_key, libp2p_port, config.port_reuse, &self.chain_id)
435            .map_err(|err| Error::Libp2pSwarm(err.to_string()))?;
436        self.libp2p_nodes.push(Libp2pNode::new(swarm));
437
438        Ok(node_id)
439    }
440
441    fn rust_dial_opts(&self, listener: Listener) -> Result<P2pConnectionOutgoingInitOpts> {
442        match listener {
443            Listener::Rust(id) => Ok(self.rust_node(id).rust_dial_opts(self.ip)),
444            Listener::Libp2p(id) => Ok(self.libp2p_node(id).rust_dial_opts(self.ip)),
445            Listener::Multiaddr(maddr) => Ok(maddr.try_into().map_err(Error::AddrParse)?),
446            Listener::SocketPeerId(socket, peer_id) => Ok(P2pConnectionOutgoingInitOpts::LibP2P(
447                (peer_id, socket).into(),
448            )),
449        }
450    }
451
452    fn libp2p_dial_opts(&self, listener: Listener) -> Result<Multiaddr> {
453        match listener {
454            Listener::Rust(id) => Ok(self.rust_node(id).libp2p_dial_opts(self.ip)),
455            Listener::Libp2p(id) => Ok(self.libp2p_node(id).libp2p_dial_opts(self.ip)),
456            Listener::Multiaddr(maddr) => Ok(maddr),
457            Listener::SocketPeerId(socket, peer_id) => {
458                let peer_id: libp2p::PeerId = peer_id
459                    .try_into()
460                    .map_err(|_| Error::Other("Listener: invalid peer_id".to_string()))?;
461
462                match socket {
463                    SocketAddr::V4(ipv4) => {
464                        Ok(multiaddr!(Ip4(*ipv4.ip()), Tcp(ipv4.port()), P2p(peer_id)))
465                    }
466                    SocketAddr::V6(ipv6) => {
467                        Ok(multiaddr!(Ip6(*ipv6.ip()), Tcp(ipv6.port()), P2p(peer_id)))
468                    }
469                }
470            }
471        }
472    }
473
474    pub fn connect<T, U>(&mut self, id: T, other: U) -> Result<()>
475    where
476        T: Into<NodeId>,
477        U: Into<Listener>,
478    {
479        match id.into() {
480            NodeId::Rust(id) => {
481                let dial_opts = self.rust_dial_opts(other.into())?;
482                self.rust_node_mut(id)
483                    .dispatch_action(P2pConnectionOutgoingAction::Init {
484                        opts: dial_opts,
485                        rpc_id: None,
486                        on_success: None,
487                    });
488            }
489            NodeId::Libp2p(id) => {
490                let dial_opts = self.libp2p_dial_opts(other.into())?;
491                self.libp2p_node_mut(id).swarm_mut().dial(dial_opts)?;
492            }
493        }
494        Ok(())
495    }
496
497    pub fn rust_node(&self, id: RustNodeId) -> &RustNode {
498        &self.rust_nodes[id.0]
499    }
500
501    pub fn rust_node_mut(&mut self, id: RustNodeId) -> &mut RustNode {
502        &mut self.rust_nodes[id.0]
503    }
504
505    pub fn libp2p_node(&self, id: Libp2pNodeId) -> &Libp2pNode {
506        &self.libp2p_nodes[id.0]
507    }
508
509    pub fn libp2p_node_mut(&mut self, id: Libp2pNodeId) -> &mut Libp2pNode {
510        &mut self.libp2p_nodes[id.0]
511    }
512
513    pub fn timestamp(&self) -> Instant {
514        self.last_idle_instant
515    }
516
517    pub fn peer_id<T>(&self, id: T) -> PeerId
518    where
519        T: Into<NodeId>,
520    {
521        match id.into() {
522            NodeId::Rust(id) => self.rust_node(id).peer_id(),
523            NodeId::Libp2p(id) => self.libp2p_node(id).peer_id(),
524        }
525    }
526}
527
528#[derive(Debug, thiserror::Error)]
529#[error("error event: {:?}", self)]
530pub enum ClusterEvent {
531    Rust {
532        id: RustNodeId,
533        event: RustNodeEvent,
534    },
535    Libp2p {
536        id: Libp2pNodeId,
537        event: Libp2pEvent,
538    },
539    Idle {
540        // TODO(akoptelov): individual events on timeout?
541        instant: Instant,
542    },
543}
544
545impl ClusterEvent {
546    pub fn rust(&self) -> Option<(&RustNodeId, &RustNodeEvent)> {
547        if let ClusterEvent::Rust { id, event } = self {
548            Some((id, event))
549        } else {
550            None
551        }
552    }
553
554    pub fn idle(&self) -> Option<&Instant> {
555        if let ClusterEvent::Idle { instant } = self {
556            Some(instant)
557        } else {
558            None
559        }
560    }
561}
562
/// An event that may carry the instant at which it occurred.
pub trait TimestampEvent {
    /// Returns the event's instant, or `None` when it carries none.
    fn timestamp(&self) -> Option<Instant>;
}
566
567impl TimestampEvent for ClusterEvent {
568    fn timestamp(&self) -> Option<Instant> {
569        if let ClusterEvent::Idle { instant } = self {
570            Some(*instant)
571        } else {
572            None
573        }
574    }
575}
576
577impl<T> TimestampEvent for std::result::Result<ClusterEvent, T> {
578    fn timestamp(&self) -> Option<Instant> {
579        self.as_ref().ok().and_then(|event| event.timestamp())
580    }
581}
582
/// A source that can always report a current instant.
pub trait TimestampSource {
    /// Returns the source's current instant.
    fn timestamp(&self) -> Instant;
}
586
587#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
588enum NextPoll {
589    #[default]
590    Idle,
591    Rust(RustNodeId),
592    Libp2p(Libp2pNodeId),
593}
594
595impl Cluster {
596    fn next_poll(&mut self) {
597        self.next_poll = match self.next_poll {
598            NextPoll::Rust(RustNodeId(id)) if id + 1 < self.rust_nodes.len() => {
599                NextPoll::Rust(RustNodeId(id + 1))
600            }
601            NextPoll::Rust(_) if !self.libp2p_nodes.is_empty() => NextPoll::Libp2p(Libp2pNodeId(0)),
602            NextPoll::Rust(_) => NextPoll::Idle,
603            NextPoll::Libp2p(Libp2pNodeId(id)) if id + 1 < self.libp2p_nodes.len() => {
604                NextPoll::Libp2p(Libp2pNodeId(id + 1))
605            }
606            NextPoll::Libp2p(_) => NextPoll::Idle,
607            NextPoll::Idle if !self.rust_nodes.is_empty() => NextPoll::Rust(RustNodeId(0)),
608            NextPoll::Idle if !self.libp2p_nodes.is_empty() => NextPoll::Libp2p(Libp2pNodeId(0)),
609            NextPoll::Idle => NextPoll::Idle,
610        }
611    }
612
613    fn poll_idle(&mut self, cx: &mut Context<'_>) -> Poll<Instant> {
614        Poll::Ready({
615            let instant = ready!(self.idle_interval.poll_tick(cx)).into_std();
616            let dur = instant - self.last_idle_instant;
617            for i in 0..self.rust_nodes.len() {
618                self.timeouts
619                    .entry(i)
620                    .and_modify(|d| *d += dur)
621                    .or_insert(dur);
622            }
623            self.last_idle_instant = instant;
624            instant
625        })
626    }
627
628    fn poll_rust_node(
629        &mut self,
630        id: RustNodeId,
631        cx: &mut Context<'_>,
632    ) -> Poll<Option<RustNodeEvent>> {
633        let rust_node = &mut self.rust_nodes[id.0];
634        let res = if let Some(dur) = self.timeouts.remove(&id.0) {
635            // trigger timeout action and return idle event
636            Poll::Ready(Some(rust_node.idle(dur)))
637        } else {
638            // poll next available event from the node
639            rust_node.poll_next_unpin(cx)
640        };
641        if crate::log::ERROR.swap(false, std::sync::atomic::Ordering::Relaxed) {
642            if let Err(err) = self.dump_state() {
643                eprintln!("error dumping state: {err}");
644            }
645            panic!("error detected");
646        }
647        res
648    }
649
650    fn poll_libp2p_node(
651        &mut self,
652        id: Libp2pNodeId,
653        cx: &mut Context<'_>,
654    ) -> Poll<Option<Libp2pEvent>> {
655        let libp2p_node = &mut self.libp2p_nodes[id.0];
656        Poll::Ready(ready!(libp2p_node.swarm_mut().poll_next_unpin(cx)))
657    }
658
659    fn dump_state(&self) -> std::result::Result<(), Box<dyn std::error::Error>> {
660        let path = std::env::temp_dir().join("p2p-test-node.json");
661        eprintln!("saving state of rust nodes to {:?}", path);
662        let file = std::fs::File::create(path)?;
663        let states = serde_json::Map::from_iter(
664            self.rust_nodes
665                .iter()
666                .map(|node| {
667                    Ok((
668                        node.peer_id().to_string(),
669                        serde_json::to_value(node.state())?,
670                    ))
671                })
672                .collect::<std::result::Result<Vec<_>, serde_json::Error>>()?,
673        );
674        serde_json::to_writer(file, &states)?;
675        Ok(())
676    }
677}
678
679impl ::futures::stream::Stream for Cluster {
680    type Item = ClusterEvent;
681
682    fn poll_next(
683        self: std::pin::Pin<&mut Self>,
684        cx: &mut std::task::Context<'_>,
685    ) -> Poll<Option<Self::Item>> {
686        let this = self.get_mut();
687        let np = this.next_poll;
688        loop {
689            let poll = match this.next_poll {
690                NextPoll::Rust(id) => this
691                    .poll_rust_node(id, cx)
692                    .map(|event| event.map(|event| ClusterEvent::Rust { id, event })),
693                NextPoll::Libp2p(id) => this
694                    .poll_libp2p_node(id, cx)
695                    .map(|event| event.map(|event| ClusterEvent::Libp2p { id, event })),
696                NextPoll::Idle => this
697                    .poll_idle(cx)
698                    .map(|instant| Some(ClusterEvent::Idle { instant })),
699            };
700            if poll.is_ready() {
701                return poll;
702            }
703            this.next_poll();
704            if this.next_poll == np {
705                return Poll::Pending;
706            }
707        }
708    }
709}
710
711impl TimestampSource for Cluster {
712    fn timestamp(&self) -> Instant {
713        self.last_idle_instant
714    }
715}
716
717impl TimestampSource for &mut Cluster {
718    fn timestamp(&self) -> Instant {
719        self.last_idle_instant
720    }
721}
722
723impl Cluster {
724    pub fn stream(&mut self) -> TakeDuring<&mut Cluster> {
725        let duration = self.total_duration;
726        self.take_during(duration)
727    }
728
729    pub fn try_stream(&mut self) -> MapErrors<TakeDuring<&mut Cluster>, ClusterEvent> {
730        let is_error = self.is_error;
731        let duration = self.total_duration;
732        self.take_during(duration).map_errors(is_error)
733    }
734}