1use std::{
2 collections::BTreeMap,
3 net::{IpAddr, Ipv4Addr, SocketAddr},
4 ops::Range,
5 task::{ready, Context, Poll},
6 time::{Duration, Instant},
7};
8
9use futures::StreamExt;
10use libp2p::{multiaddr::multiaddr, swarm::DialError, Multiaddr};
11use mina_core::{channels::mpsc, ChainId, Substate, DEVNET_CHAIN_ID};
12use mina_p2p::{
13 connection::outgoing::{
14 P2pConnectionOutgoingAction, P2pConnectionOutgoingInitLibp2pOpts,
15 P2pConnectionOutgoingInitOpts, P2pConnectionOutgoingInitOptsParseError,
16 },
17 identity::SecretKey,
18 P2pCallbacks, P2pConfig, P2pMeshsubConfig, P2pState, PeerId,
19};
20use redux::SystemTime;
21
22use crate::{
23 event::{event_mapper_effect, RustNodeEvent},
24 libp2p_node::{create_swarm, Libp2pEvent, Libp2pNode, Libp2pNodeConfig, Libp2pNodeId},
25 redux::{log_action, Action, State},
26 rust_node::{RustNode, RustNodeConfig, RustNodeId},
27 service::ClusterService,
28 stream::{ClusterStreamExt, MapErrors, TakeDuring},
29 test_node::TestNode,
30};
31
32#[derive(Debug, Default, Clone)]
33pub enum PeerIdConfig {
34 #[default]
35 Derived,
36 Bytes([u8; 32]),
37}
38
39#[derive(Debug, Clone, derive_more::From)]
40pub enum Listener {
41 Rust(RustNodeId),
42 Libp2p(Libp2pNodeId),
43 Multiaddr(Multiaddr),
44 SocketPeerId(SocketAddr, PeerId),
45}
46
47#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, derive_more::From)]
48pub enum NodeId {
49 Rust(RustNodeId),
50 Libp2p(Libp2pNodeId),
51}
52
53pub struct Cluster {
54 chain_id: ChainId,
55 ports: Range<u16>,
56 ip: IpAddr,
57
58 rust_nodes: Vec<RustNode>,
59 libp2p_nodes: Vec<Libp2pNode>,
60 last_idle_instant: Instant,
61 idle_interval: tokio::time::Interval,
62 next_poll: NextPoll,
63 timeouts: BTreeMap<usize, Duration>,
64
65 is_error: fn(&ClusterEvent) -> bool,
66 total_duration: Duration,
67}
68
69enum PortsConfig {
70 Range(Range<u16>),
71 Len(u16),
72 ExactLen(u16),
73}
74
75impl PortsConfig {
76 async fn ports(self) -> Result<Range<u16>> {
77 match self {
78 PortsConfig::Range(range) => Ok(range),
79 PortsConfig::Len(len) => GLOBAL_PORTS.take(len).await,
80 PortsConfig::ExactLen(len) => GLOBAL_PORTS.take_exact(len).await,
81 }
82 }
83}
84
85pub struct ClusterBuilder {
86 chain_id: ChainId,
87 ports: Option<PortsConfig>,
88 ip: IpAddr,
89 idle_duration: Duration,
90 is_error: fn(&ClusterEvent) -> bool,
91 total_duration: Duration,
92}
93
94impl Default for ClusterBuilder {
95 fn default() -> Self {
96 ClusterBuilder {
97 chain_id: DEVNET_CHAIN_ID,
98 ports: None,
99 ip: Ipv4Addr::LOCALHOST.into(),
100 idle_duration: Duration::from_millis(100),
101 is_error: super::event::is_error,
102 total_duration: Duration::from_secs(60),
103 }
104 }
105}
106
107impl ClusterBuilder {
108 pub fn new() -> Self {
109 ClusterBuilder::default()
110 }
111
112 pub fn chain_id(mut self, chain_id: ChainId) -> Self {
113 self.chain_id = chain_id;
114 self
115 }
116
117 pub fn ports(mut self, ports: Range<u16>) -> Self {
118 self.ports = Some(PortsConfig::Range(ports));
119 self
120 }
121
122 pub fn ports_with_len(mut self, len: u16) -> Self {
123 self.ports = Some(PortsConfig::Len(len));
124 self
125 }
126
127 pub fn ports_with_exact_len(mut self, len: u16) -> Self {
128 self.ports = Some(PortsConfig::ExactLen(len));
129 self
130 }
131
132 pub fn ip(mut self, ip: IpAddr) -> Self {
133 self.ip = ip;
134 self
135 }
136
137 pub fn idle_duration(mut self, duration: Duration) -> Self {
138 self.idle_duration = duration;
139 self
140 }
141
142 pub fn is_error(mut self, f: fn(&ClusterEvent) -> bool) -> Self {
143 self.is_error = f;
144 self
145 }
146
147 pub fn total_duration(mut self, duration: Duration) -> Self {
148 self.total_duration = duration;
149 self
150 }
151
152 pub async fn start(self) -> Result<Cluster> {
153 *crate::log::LOG;
154
155 let chain_id = self.chain_id;
156 let ports = self
157 .ports
158 .ok_or_else(|| Error::UninitializedField("ports"))?
159 .ports()
160 .await?;
161 let ip = self.ip;
162 let mut idle_interval = tokio::time::interval(self.idle_duration);
163 let last_idle_instant = idle_interval.tick().await.into_std();
164 let is_error = self.is_error;
165 let total_duration = self.total_duration;
166 mina_core::info!(mina_core::log::system_time(); "starting the cluster");
167 let next_poll = Default::default();
168 Ok(Cluster {
169 chain_id,
170 ports,
171 ip,
172 is_error,
173 total_duration,
174 rust_nodes: Default::default(),
175 libp2p_nodes: Default::default(),
176 last_idle_instant,
177 idle_interval,
178 next_poll,
179 timeouts: Default::default(),
180 })
181 }
182}
183
184pub struct Ports {
185 start: tokio::sync::Mutex<u16>,
186 end: u16,
187}
188
189impl Ports {
190 pub fn new(range: Range<u16>) -> Self {
191 Ports {
192 start: range.start.into(),
193 end: range.end,
194 }
195 }
196
197 fn round(u: u16) -> u16 {
198 u.div_ceil(100) * 100
199 }
200
201 pub async fn take(&self, len: u16) -> Result<Range<u16>> {
202 let mut start = self.start.lock().await;
203 let res = Self::round(*start)..Self::round(*start + len);
204 if res.end > self.end {
205 return Err(Error::NoMorePorts);
206 }
207 *start = res.end;
208 Ok(res)
209 }
210
211 pub async fn take_exact(&self, len: u16) -> Result<Range<u16>> {
212 let mut start = self.start.lock().await;
213 let res = *start..(*start + len);
214 if res.end > self.end {
215 return Err(Error::NoMorePorts);
216 }
217 *start += len;
218 Ok(res)
219 }
220}
221
222impl Default for Ports {
223 fn default() -> Self {
224 Self {
225 start: 10000.into(),
226 end: 20000,
227 }
228 }
229}
230
231#[macro_export]
246macro_rules! ports_store {
247 ($name:ident, $range:expr) => {
248 $crate::lazy_static::lazy_static! {
249 static ref $name: $crate::cluster::Ports = $crate::cluster::Ports::new($range);
250 }
251 };
252 ($name:ident) => {
253 $crate::lazy_static::lazy_static! {
254 static ref $name: $crate::cluster::Ports = $crate::cluster::Ports::default();
255 }
256 };
257}
258
259ports_store!(GLOBAL_PORTS);
260
261#[derive(Debug, thiserror::Error)]
262pub enum Error {
263 #[error("No more ports")]
264 NoMorePorts,
265 #[error(transparent)]
266 AddrParse(#[from] P2pConnectionOutgoingInitOptsParseError),
267 #[error("uninitialized field `{0}`")]
268 UninitializedField(&'static str),
269 #[error("swarm creation error: {0}")]
270 Libp2pSwarm(String),
271 #[error(transparent)]
272 Libp2pDial(Box<DialError>),
273 #[error("Error occurred: {0}")]
274 Other(String),
275}
276
277impl From<DialError> for Error {
278 fn from(err: DialError) -> Self {
279 Error::Libp2pDial(Box::new(err))
280 }
281}
282
283pub type Result<T> = std::result::Result<T, Error>;
284
285const RUST_NODE_SIG_BYTE: u8 = 0xf0;
286#[allow(dead_code)]
287const LIBP2P_NODE_SIG_BYTE: u8 = 0xf1;
288
289impl Cluster {
290 pub fn next_port(&mut self) -> Result<u16> {
291 self.ports.next().ok_or(Error::NoMorePorts)
292 }
293
294 fn init_opts(&self, listener: Listener) -> Result<P2pConnectionOutgoingInitOpts> {
295 match listener {
296 Listener::Rust(RustNodeId(i)) => {
297 let node = &self.rust_nodes[i];
298 let port = node.libp2p_port();
299 let peer_id = node.peer_id();
300 let host = self.ip.into();
301 Ok(P2pConnectionOutgoingInitOpts::LibP2P(
302 P2pConnectionOutgoingInitLibp2pOpts {
303 peer_id,
304 host,
305 port,
306 },
307 ))
308 }
309 Listener::Libp2p(Libp2pNodeId(i)) => {
310 let node = &self.libp2p_nodes[i];
311 let port = node.libp2p_port();
312 let peer_id = node.peer_id();
313 let host = self.ip.into();
314 Ok(P2pConnectionOutgoingInitOpts::LibP2P(
315 P2pConnectionOutgoingInitLibp2pOpts {
316 peer_id,
317 host,
318 port,
319 },
320 ))
321 }
322 Listener::Multiaddr(ref maddr) => {
323 Ok(P2pConnectionOutgoingInitOpts::LibP2P(maddr.try_into()?))
324 }
325 Listener::SocketPeerId(socket_addr, peer_id) => Ok(
326 P2pConnectionOutgoingInitOpts::LibP2P((peer_id, socket_addr).into()),
327 ),
328 }
329 }
330
331 fn secret_key(config: PeerIdConfig, index: usize, fill_byte: u8) -> SecretKey {
332 let bytes = match config {
333 PeerIdConfig::Derived => {
334 let mut bytes = [fill_byte; 32];
335 let bytes_len = bytes.len();
336 let i_bytes = index.to_be_bytes();
337 let i = bytes_len - i_bytes.len();
338 bytes[i..bytes_len].copy_from_slice(&i_bytes);
339 bytes
340 }
341 PeerIdConfig::Bytes(bytes) => bytes,
342 };
343 SecretKey::from_bytes(bytes)
344 }
345
346 fn rust_node_config(&mut self, config: RustNodeConfig) -> Result<(P2pConfig, SecretKey)> {
347 let secret_key =
348 Self::secret_key(config.peer_id, self.rust_nodes.len(), RUST_NODE_SIG_BYTE);
349 let libp2p_port = self.next_port()?;
350 let listen_port = self.next_port()?;
351 let initial_peers = config
352 .initial_peers
353 .into_iter()
354 .map(|p| self.init_opts(p))
355 .collect::<Result<_>>()?;
356 let config = P2pConfig {
357 libp2p_port: Some(libp2p_port),
358 listen_port: Some(listen_port),
359 identity_pub_key: secret_key.public_key(),
360 initial_peers,
361 external_addrs: vec![],
362 enabled_channels: mina_p2p::channels::ChannelId::for_libp2p().collect(),
363 peer_discovery: config.discovery,
364 timeouts: config.timeouts,
365 limits: config.limits,
366 meshsub: P2pMeshsubConfig::default(),
367 };
368
369 Ok((config, secret_key))
370 }
371
372 pub fn add_rust_node(&mut self, config: RustNodeConfig) -> Result<RustNodeId> {
373 let override_fn = config.override_fn;
374 let reducer_override_fn = config.override_reducer;
375 let node_idx = self.rust_nodes.len();
376 let (event_sender, event_receiver) = mpsc::unbounded_channel();
377 let (config, secret_key) = self.rust_node_config(config)?;
378 let (cmd_sender, _cmd_receiver) = mpsc::unbounded_channel();
379
380 let service = ClusterService::new(
381 node_idx,
382 secret_key,
383 event_sender,
384 cmd_sender,
385 self.last_idle_instant,
386 );
387
388 let store = crate::redux::Store::new(
389 reducer_override_fn.unwrap_or(|state, action, dispatcher| {
390 let meta = action.meta().clone();
391 let action = action.action();
392
393 log_action(action, &meta, state.0.my_id());
394
395 let time = meta.time();
396 let state_context = Substate::new(state, dispatcher);
397 let result = match action {
398 Action::P2p(action) => {
399 P2pState::reducer(state_context, meta.with_action(action.clone()))
400 }
401 Action::Idle(_) => P2pState::p2p_timeout_dispatch(state_context, &meta),
402 Action::P2pEffectful(_) => Ok(()),
403 };
404
405 if let Err(error) = result {
406 mina_core::warn!(time; "error = {error}");
407 }
408 }),
409 override_fn.unwrap_or(|store, action| {
410 let (action, meta) = action.split();
411 match action {
412 Action::P2p(a) => {
413 event_mapper_effect(store, a);
414 }
415 Action::P2pEffectful(a) => a.effects(meta, store),
416 Action::Idle(_) => {
417 }
419 }
420 }),
421 service,
422 SystemTime::now(),
423 State(P2pState::new(
424 config,
425 P2pCallbacks::default(),
426 &self.chain_id,
427 )),
428 );
429
430 let node_id = RustNodeId(self.rust_nodes.len());
431 self.rust_nodes.push(RustNode::new(store, event_receiver));
432 Ok(node_id)
433 }
434
435 pub fn add_libp2p_node(&mut self, config: Libp2pNodeConfig) -> Result<Libp2pNodeId> {
436 let node_id = Libp2pNodeId(self.libp2p_nodes.len());
437 let secret_key = Self::secret_key(config.peer_id, node_id.0, LIBP2P_NODE_SIG_BYTE);
438 let libp2p_port = self.next_port()?;
439
440 let swarm = create_swarm(secret_key, libp2p_port, config.port_reuse, &self.chain_id)
441 .map_err(|err| Error::Libp2pSwarm(err.to_string()))?;
442 self.libp2p_nodes.push(Libp2pNode::new(swarm));
443
444 Ok(node_id)
445 }
446
447 fn rust_dial_opts(&self, listener: Listener) -> Result<P2pConnectionOutgoingInitOpts> {
448 match listener {
449 Listener::Rust(id) => Ok(self.rust_node(id).rust_dial_opts(self.ip)),
450 Listener::Libp2p(id) => Ok(self.libp2p_node(id).rust_dial_opts(self.ip)),
451 Listener::Multiaddr(maddr) => Ok(maddr.try_into().map_err(Error::AddrParse)?),
452 Listener::SocketPeerId(socket, peer_id) => Ok(P2pConnectionOutgoingInitOpts::LibP2P(
453 (peer_id, socket).into(),
454 )),
455 }
456 }
457
458 fn libp2p_dial_opts(&self, listener: Listener) -> Result<Multiaddr> {
459 match listener {
460 Listener::Rust(id) => Ok(self.rust_node(id).libp2p_dial_opts(self.ip)),
461 Listener::Libp2p(id) => Ok(self.libp2p_node(id).libp2p_dial_opts(self.ip)),
462 Listener::Multiaddr(maddr) => Ok(maddr),
463 Listener::SocketPeerId(socket, peer_id) => {
464 let peer_id: libp2p::PeerId = peer_id
465 .try_into()
466 .map_err(|_| Error::Other("Listener: invalid peer_id".to_string()))?;
467
468 match socket {
469 SocketAddr::V4(ipv4) => {
470 Ok(multiaddr!(Ip4(*ipv4.ip()), Tcp(ipv4.port()), P2p(peer_id)))
471 }
472 SocketAddr::V6(ipv6) => {
473 Ok(multiaddr!(Ip6(*ipv6.ip()), Tcp(ipv6.port()), P2p(peer_id)))
474 }
475 }
476 }
477 }
478 }
479
480 pub fn connect<T, U>(&mut self, id: T, other: U) -> Result<()>
481 where
482 T: Into<NodeId>,
483 U: Into<Listener>,
484 {
485 match id.into() {
486 NodeId::Rust(id) => {
487 let dial_opts = self.rust_dial_opts(other.into())?;
488 self.rust_node_mut(id)
489 .dispatch_action(P2pConnectionOutgoingAction::Init {
490 opts: dial_opts,
491 rpc_id: None,
492 on_success: None,
493 });
494 }
495 NodeId::Libp2p(id) => {
496 let dial_opts = self.libp2p_dial_opts(other.into())?;
497 self.libp2p_node_mut(id).swarm_mut().dial(dial_opts)?;
498 }
499 }
500 Ok(())
501 }
502
503 pub fn rust_node(&self, id: RustNodeId) -> &RustNode {
504 &self.rust_nodes[id.0]
505 }
506
507 pub fn rust_node_mut(&mut self, id: RustNodeId) -> &mut RustNode {
508 &mut self.rust_nodes[id.0]
509 }
510
511 pub fn libp2p_node(&self, id: Libp2pNodeId) -> &Libp2pNode {
512 &self.libp2p_nodes[id.0]
513 }
514
515 pub fn libp2p_node_mut(&mut self, id: Libp2pNodeId) -> &mut Libp2pNode {
516 &mut self.libp2p_nodes[id.0]
517 }
518
519 pub fn timestamp(&self) -> Instant {
520 self.last_idle_instant
521 }
522
523 pub fn peer_id<T>(&self, id: T) -> PeerId
524 where
525 T: Into<NodeId>,
526 {
527 match id.into() {
528 NodeId::Rust(id) => self.rust_node(id).peer_id(),
529 NodeId::Libp2p(id) => self.libp2p_node(id).peer_id(),
530 }
531 }
532}
533
534#[derive(Debug, thiserror::Error)]
535#[error("error event: {:?}", self)]
536pub enum ClusterEvent {
537 Rust {
538 id: RustNodeId,
539 event: RustNodeEvent,
540 },
541 Libp2p {
542 id: Libp2pNodeId,
543 event: Libp2pEvent,
544 },
545 Idle {
546 instant: Instant,
548 },
549}
550
551impl ClusterEvent {
552 pub fn rust(&self) -> Option<(&RustNodeId, &RustNodeEvent)> {
553 if let ClusterEvent::Rust { id, event } = self {
554 Some((id, event))
555 } else {
556 None
557 }
558 }
559
560 pub fn idle(&self) -> Option<&Instant> {
561 if let ClusterEvent::Idle { instant } = self {
562 Some(instant)
563 } else {
564 None
565 }
566 }
567}
568
569pub trait TimestampEvent {
570 fn timestamp(&self) -> Option<Instant>;
571}
572
573impl TimestampEvent for ClusterEvent {
574 fn timestamp(&self) -> Option<Instant> {
575 if let ClusterEvent::Idle { instant } = self {
576 Some(*instant)
577 } else {
578 None
579 }
580 }
581}
582
583impl<T> TimestampEvent for std::result::Result<ClusterEvent, T> {
584 fn timestamp(&self) -> Option<Instant> {
585 self.as_ref().ok().and_then(|event| event.timestamp())
586 }
587}
588
589pub trait TimestampSource {
590 fn timestamp(&self) -> Instant;
591}
592
593#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
594enum NextPoll {
595 #[default]
596 Idle,
597 Rust(RustNodeId),
598 Libp2p(Libp2pNodeId),
599}
600
601impl Cluster {
602 fn next_poll(&mut self) {
603 self.next_poll = match self.next_poll {
604 NextPoll::Rust(RustNodeId(id)) if id + 1 < self.rust_nodes.len() => {
605 NextPoll::Rust(RustNodeId(id + 1))
606 }
607 NextPoll::Rust(_) if !self.libp2p_nodes.is_empty() => NextPoll::Libp2p(Libp2pNodeId(0)),
608 NextPoll::Rust(_) => NextPoll::Idle,
609 NextPoll::Libp2p(Libp2pNodeId(id)) if id + 1 < self.libp2p_nodes.len() => {
610 NextPoll::Libp2p(Libp2pNodeId(id + 1))
611 }
612 NextPoll::Libp2p(_) => NextPoll::Idle,
613 NextPoll::Idle if !self.rust_nodes.is_empty() => NextPoll::Rust(RustNodeId(0)),
614 NextPoll::Idle if !self.libp2p_nodes.is_empty() => NextPoll::Libp2p(Libp2pNodeId(0)),
615 NextPoll::Idle => NextPoll::Idle,
616 }
617 }
618
619 fn poll_idle(&mut self, cx: &mut Context<'_>) -> Poll<Instant> {
620 Poll::Ready({
621 let instant = ready!(self.idle_interval.poll_tick(cx)).into_std();
622 let dur = instant - self.last_idle_instant;
623 for i in 0..self.rust_nodes.len() {
624 self.timeouts
625 .entry(i)
626 .and_modify(|d| *d += dur)
627 .or_insert(dur);
628 }
629 self.last_idle_instant = instant;
630 instant
631 })
632 }
633
634 fn poll_rust_node(
635 &mut self,
636 id: RustNodeId,
637 cx: &mut Context<'_>,
638 ) -> Poll<Option<RustNodeEvent>> {
639 let rust_node = &mut self.rust_nodes[id.0];
640 let res = if let Some(dur) = self.timeouts.remove(&id.0) {
641 Poll::Ready(Some(rust_node.idle(dur)))
643 } else {
644 rust_node.poll_next_unpin(cx)
646 };
647 if crate::log::ERROR.swap(false, std::sync::atomic::Ordering::Relaxed) {
648 if let Err(err) = self.dump_state() {
649 eprintln!("error dumping state: {err}");
650 }
651 panic!("error detected");
652 }
653 res
654 }
655
656 fn poll_libp2p_node(
657 &mut self,
658 id: Libp2pNodeId,
659 cx: &mut Context<'_>,
660 ) -> Poll<Option<Libp2pEvent>> {
661 let libp2p_node = &mut self.libp2p_nodes[id.0];
662 Poll::Ready(ready!(libp2p_node.swarm_mut().poll_next_unpin(cx)))
663 }
664
665 fn dump_state(&self) -> std::result::Result<(), Box<dyn std::error::Error>> {
666 let path = std::env::temp_dir().join("p2p-test-node.json");
667 eprintln!("saving state of rust nodes to {:?}", path);
668 let file = std::fs::File::create(path)?;
669 let states = serde_json::Map::from_iter(
670 self.rust_nodes
671 .iter()
672 .map(|node| {
673 Ok((
674 node.peer_id().to_string(),
675 serde_json::to_value(node.state())?,
676 ))
677 })
678 .collect::<std::result::Result<Vec<_>, serde_json::Error>>()?,
679 );
680 serde_json::to_writer(file, &states)?;
681 Ok(())
682 }
683}
684
685impl ::futures::stream::Stream for Cluster {
686 type Item = ClusterEvent;
687
688 fn poll_next(
689 self: std::pin::Pin<&mut Self>,
690 cx: &mut std::task::Context<'_>,
691 ) -> Poll<Option<Self::Item>> {
692 let this = self.get_mut();
693 let np = this.next_poll;
694 loop {
695 let poll = match this.next_poll {
696 NextPoll::Rust(id) => this
697 .poll_rust_node(id, cx)
698 .map(|event| event.map(|event| ClusterEvent::Rust { id, event })),
699 NextPoll::Libp2p(id) => this
700 .poll_libp2p_node(id, cx)
701 .map(|event| event.map(|event| ClusterEvent::Libp2p { id, event })),
702 NextPoll::Idle => this
703 .poll_idle(cx)
704 .map(|instant| Some(ClusterEvent::Idle { instant })),
705 };
706 if poll.is_ready() {
707 return poll;
708 }
709 this.next_poll();
710 if this.next_poll == np {
711 return Poll::Pending;
712 }
713 }
714 }
715}
716
717impl TimestampSource for Cluster {
718 fn timestamp(&self) -> Instant {
719 self.last_idle_instant
720 }
721}
722
723impl TimestampSource for &mut Cluster {
724 fn timestamp(&self) -> Instant {
725 self.last_idle_instant
726 }
727}
728
729impl Cluster {
730 pub fn stream(&mut self) -> TakeDuring<&mut Cluster> {
731 let duration = self.total_duration;
732 self.take_during(duration)
733 }
734
735 pub fn try_stream(&mut self) -> MapErrors<TakeDuring<&mut Cluster>, ClusterEvent> {
736 let is_error = self.is_error;
737 let duration = self.total_duration;
738 self.take_during(duration).map_errors(is_error)
739 }
740}