1use std::{
2 collections::BTreeMap,
3 net::{IpAddr, Ipv4Addr, SocketAddr},
4 ops::Range,
5 task::{ready, Context, Poll},
6 time::{Duration, Instant},
7};
8
9use futures::StreamExt;
10use libp2p::{multiaddr::multiaddr, swarm::DialError, Multiaddr};
11use openmina_core::{channels::mpsc, ChainId, Substate, DEVNET_CHAIN_ID};
12use p2p::{
13 connection::outgoing::{
14 P2pConnectionOutgoingAction, P2pConnectionOutgoingInitLibp2pOpts,
15 P2pConnectionOutgoingInitOpts, P2pConnectionOutgoingInitOptsParseError,
16 },
17 identity::SecretKey,
18 P2pCallbacks, P2pConfig, P2pMeshsubConfig, P2pState, PeerId,
19};
20use redux::SystemTime;
21
22use crate::{
23 event::{event_mapper_effect, RustNodeEvent},
24 libp2p_node::{create_swarm, Libp2pEvent, Libp2pNode, Libp2pNodeConfig, Libp2pNodeId},
25 redux::{log_action, Action, State},
26 rust_node::{RustNode, RustNodeConfig, RustNodeId},
27 service::ClusterService,
28 stream::{ClusterStreamExt, MapErrors, TakeDuring},
29 test_node::TestNode,
30};
31
32#[derive(Debug, Default, Clone)]
33pub enum PeerIdConfig {
34 #[default]
35 Derived,
36 Bytes([u8; 32]),
37}
38
39#[derive(Debug, Clone, derive_more::From)]
40pub enum Listener {
41 Rust(RustNodeId),
42 Libp2p(Libp2pNodeId),
43 Multiaddr(Multiaddr),
44 SocketPeerId(SocketAddr, PeerId),
45}
46
47#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, derive_more::From)]
48pub enum NodeId {
49 Rust(RustNodeId),
50 Libp2p(Libp2pNodeId),
51}
52
53pub struct Cluster {
54 chain_id: ChainId,
55 ports: Range<u16>,
56 ip: IpAddr,
57
58 rust_nodes: Vec<RustNode>,
59 libp2p_nodes: Vec<Libp2pNode>,
60 last_idle_instant: Instant,
61 idle_interval: tokio::time::Interval,
62 next_poll: NextPoll,
63 timeouts: BTreeMap<usize, Duration>,
64
65 is_error: fn(&ClusterEvent) -> bool,
66 total_duration: Duration,
67}
68
69enum PortsConfig {
70 Range(Range<u16>),
71 Len(u16),
72 ExactLen(u16),
73}
74
75impl PortsConfig {
76 async fn ports(self) -> Result<Range<u16>> {
77 match self {
78 PortsConfig::Range(range) => Ok(range),
79 PortsConfig::Len(len) => GLOBAL_PORTS.take(len).await,
80 PortsConfig::ExactLen(len) => GLOBAL_PORTS.take_exact(len).await,
81 }
82 }
83}
84
85pub struct ClusterBuilder {
86 chain_id: ChainId,
87 ports: Option<PortsConfig>,
88 ip: IpAddr,
89 idle_duration: Duration,
90 is_error: fn(&ClusterEvent) -> bool,
91 total_duration: Duration,
92}
93
94impl Default for ClusterBuilder {
95 fn default() -> Self {
96 ClusterBuilder {
97 chain_id: DEVNET_CHAIN_ID,
98 ports: None,
99 ip: Ipv4Addr::LOCALHOST.into(),
100 idle_duration: Duration::from_millis(100),
101 is_error: super::event::is_error,
102 total_duration: Duration::from_secs(60),
103 }
104 }
105}
106
107impl ClusterBuilder {
108 pub fn new() -> Self {
109 ClusterBuilder::default()
110 }
111
112 pub fn chain_id(mut self, chain_id: ChainId) -> Self {
113 self.chain_id = chain_id;
114 self
115 }
116
117 pub fn ports(mut self, ports: Range<u16>) -> Self {
118 self.ports = Some(PortsConfig::Range(ports));
119 self
120 }
121
122 pub fn ports_with_len(mut self, len: u16) -> Self {
123 self.ports = Some(PortsConfig::Len(len));
124 self
125 }
126
127 pub fn ports_with_exact_len(mut self, len: u16) -> Self {
128 self.ports = Some(PortsConfig::ExactLen(len));
129 self
130 }
131
132 pub fn ip(mut self, ip: IpAddr) -> Self {
133 self.ip = ip;
134 self
135 }
136
137 pub fn idle_duration(mut self, duration: Duration) -> Self {
138 self.idle_duration = duration;
139 self
140 }
141
142 pub fn is_error(mut self, f: fn(&ClusterEvent) -> bool) -> Self {
143 self.is_error = f;
144 self
145 }
146
147 pub fn total_duration(mut self, duration: Duration) -> Self {
148 self.total_duration = duration;
149 self
150 }
151
152 pub async fn start(self) -> Result<Cluster> {
153 *crate::log::LOG;
154
155 let chain_id = self.chain_id;
156 let ports = self
157 .ports
158 .ok_or_else(|| Error::UninitializedField("ports"))?
159 .ports()
160 .await?;
161 let ip = self.ip;
162 let mut idle_interval = tokio::time::interval(self.idle_duration);
163 let last_idle_instant = idle_interval.tick().await.into_std();
164 let is_error = self.is_error;
165 let total_duration = self.total_duration;
166 openmina_core::info!(openmina_core::log::system_time(); "starting the cluster");
167 let next_poll = Default::default();
168 Ok(Cluster {
169 chain_id,
170 ports,
171 ip,
172 is_error,
173 total_duration,
174 rust_nodes: Default::default(),
175 libp2p_nodes: Default::default(),
176 last_idle_instant,
177 idle_interval,
178 next_poll,
179 timeouts: Default::default(),
180 })
181 }
182}
183
184pub struct Ports {
185 start: tokio::sync::Mutex<u16>,
186 end: u16,
187}
188
189impl Ports {
190 pub fn new(range: Range<u16>) -> Self {
191 Ports {
192 start: range.start.into(),
193 end: range.end,
194 }
195 }
196
197 fn round(u: u16) -> u16 {
198 ((u + 99) / 100) * 100
199 }
200
201 pub async fn take(&self, len: u16) -> Result<Range<u16>> {
202 let mut start = self.start.lock().await;
203 let res = Self::round(*start)..Self::round(*start + len);
204 if res.end > self.end {
205 return Err(Error::NoMorePorts);
206 }
207 *start = res.end;
208 Ok(res)
209 }
210
211 pub async fn take_exact(&self, len: u16) -> Result<Range<u16>> {
212 let mut start = self.start.lock().await;
213 let res = *start..(*start + len);
214 if res.end > self.end {
215 return Err(Error::NoMorePorts);
216 }
217 *start += len;
218 Ok(res)
219 }
220}
221
222impl Default for Ports {
223 fn default() -> Self {
224 Self {
225 start: 10000.into(),
226 end: 20000,
227 }
228 }
229}
230
231#[macro_export]
246macro_rules! ports_store {
247 ($name:ident, $range:expr) => {
248 $crate::lazy_static::lazy_static! {
249 static ref $name: $crate::cluster::Ports = $crate::cluster::Ports::new($range);
250 }
251 };
252 ($name:ident) => {
253 $crate::lazy_static::lazy_static! {
254 static ref $name: $crate::cluster::Ports = $crate::cluster::Ports::default();
255 }
256 };
257}
258
259ports_store!(GLOBAL_PORTS);
260
261#[derive(Debug, thiserror::Error)]
262pub enum Error {
263 #[error("No more ports")]
264 NoMorePorts,
265 #[error(transparent)]
266 AddrParse(#[from] P2pConnectionOutgoingInitOptsParseError),
267 #[error("uninitialized field `{0}`")]
268 UninitializedField(&'static str),
269 #[error("swarm creation error: {0}")]
270 Libp2pSwarm(String),
271 #[error(transparent)]
272 Libp2pDial(#[from] DialError),
273 #[error("Error occurred: {0}")]
274 Other(String),
275}
276
277pub type Result<T> = std::result::Result<T, Error>;
278
279const RUST_NODE_SIG_BYTE: u8 = 0xf0;
280#[allow(dead_code)]
281const LIBP2P_NODE_SIG_BYTE: u8 = 0xf1;
282
283impl Cluster {
284 pub fn next_port(&mut self) -> Result<u16> {
285 self.ports.next().ok_or(Error::NoMorePorts)
286 }
287
288 fn init_opts(&self, listener: Listener) -> Result<P2pConnectionOutgoingInitOpts> {
289 match listener {
290 Listener::Rust(RustNodeId(i)) => {
291 let node = &self.rust_nodes[i];
292 let port = node.libp2p_port();
293 let peer_id = node.peer_id();
294 let host = self.ip.into();
295 Ok(P2pConnectionOutgoingInitOpts::LibP2P(
296 P2pConnectionOutgoingInitLibp2pOpts {
297 peer_id,
298 host,
299 port,
300 },
301 ))
302 }
303 Listener::Libp2p(Libp2pNodeId(i)) => {
304 let node = &self.libp2p_nodes[i];
305 let port = node.libp2p_port();
306 let peer_id = node.peer_id();
307 let host = self.ip.into();
308 Ok(P2pConnectionOutgoingInitOpts::LibP2P(
309 P2pConnectionOutgoingInitLibp2pOpts {
310 peer_id,
311 host,
312 port,
313 },
314 ))
315 }
316 Listener::Multiaddr(ref maddr) => {
317 Ok(P2pConnectionOutgoingInitOpts::LibP2P(maddr.try_into()?))
318 }
319 Listener::SocketPeerId(socket_addr, peer_id) => Ok(
320 P2pConnectionOutgoingInitOpts::LibP2P((peer_id, socket_addr).into()),
321 ),
322 }
323 }
324
325 fn secret_key(config: PeerIdConfig, index: usize, fill_byte: u8) -> SecretKey {
326 let bytes = match config {
327 PeerIdConfig::Derived => {
328 let mut bytes = [fill_byte; 32];
329 let bytes_len = bytes.len();
330 let i_bytes = index.to_be_bytes();
331 let i = bytes_len - i_bytes.len();
332 bytes[i..bytes_len].copy_from_slice(&i_bytes);
333 bytes
334 }
335 PeerIdConfig::Bytes(bytes) => bytes,
336 };
337 SecretKey::from_bytes(bytes)
338 }
339
340 fn rust_node_config(&mut self, config: RustNodeConfig) -> Result<(P2pConfig, SecretKey)> {
341 let secret_key =
342 Self::secret_key(config.peer_id, self.rust_nodes.len(), RUST_NODE_SIG_BYTE);
343 let libp2p_port = self.next_port()?;
344 let listen_port = self.next_port()?;
345 let initial_peers = config
346 .initial_peers
347 .into_iter()
348 .map(|p| self.init_opts(p))
349 .collect::<Result<_>>()?;
350 let config = P2pConfig {
351 libp2p_port: Some(libp2p_port),
352 listen_port: Some(listen_port),
353 identity_pub_key: secret_key.public_key(),
354 initial_peers,
355 external_addrs: vec![],
356 enabled_channels: p2p::channels::ChannelId::for_libp2p().collect(),
357 peer_discovery: config.discovery,
358 timeouts: config.timeouts,
359 limits: config.limits,
360 meshsub: P2pMeshsubConfig::default(),
361 };
362
363 Ok((config, secret_key))
364 }
365
366 pub fn add_rust_node(&mut self, config: RustNodeConfig) -> Result<RustNodeId> {
367 let override_fn = config.override_fn;
368 let reducer_override_fn = config.override_reducer;
369 let node_idx = self.rust_nodes.len();
370 let (event_sender, event_receiver) = mpsc::unbounded_channel();
371 let (config, secret_key) = self.rust_node_config(config)?;
372 let (cmd_sender, _cmd_receiver) = mpsc::unbounded_channel();
373
374 let service = ClusterService::new(
375 node_idx,
376 secret_key,
377 event_sender,
378 cmd_sender,
379 self.last_idle_instant,
380 );
381
382 let store = crate::redux::Store::new(
383 reducer_override_fn.unwrap_or(|state, action, dispatcher| {
384 let meta = action.meta().clone();
385 let action = action.action();
386
387 log_action(action, &meta, state.0.my_id());
388
389 let time = meta.time();
390 let state_context = Substate::new(state, dispatcher);
391 let result = match action {
392 Action::P2p(action) => {
393 P2pState::reducer(state_context, meta.with_action(action.clone()))
394 }
395 Action::Idle(_) => P2pState::p2p_timeout_dispatch(state_context, &meta),
396 Action::P2pEffectful(_) => Ok(()),
397 };
398
399 if let Err(error) = result {
400 openmina_core::warn!(time; "error = {error}");
401 }
402 }),
403 override_fn.unwrap_or(|store, action| {
404 let (action, meta) = action.split();
405 match action {
406 Action::P2p(a) => {
407 event_mapper_effect(store, a);
408 }
409 Action::P2pEffectful(a) => a.effects(meta, store),
410 Action::Idle(_) => {
411 }
413 }
414 }),
415 service,
416 SystemTime::now(),
417 State(P2pState::new(
418 config,
419 P2pCallbacks::default(),
420 &self.chain_id,
421 )),
422 );
423
424 let node_id = RustNodeId(self.rust_nodes.len());
425 self.rust_nodes.push(RustNode::new(store, event_receiver));
426 Ok(node_id)
427 }
428
429 pub fn add_libp2p_node(&mut self, config: Libp2pNodeConfig) -> Result<Libp2pNodeId> {
430 let node_id = Libp2pNodeId(self.libp2p_nodes.len());
431 let secret_key = Self::secret_key(config.peer_id, node_id.0, LIBP2P_NODE_SIG_BYTE);
432 let libp2p_port = self.next_port()?;
433
434 let swarm = create_swarm(secret_key, libp2p_port, config.port_reuse, &self.chain_id)
435 .map_err(|err| Error::Libp2pSwarm(err.to_string()))?;
436 self.libp2p_nodes.push(Libp2pNode::new(swarm));
437
438 Ok(node_id)
439 }
440
441 fn rust_dial_opts(&self, listener: Listener) -> Result<P2pConnectionOutgoingInitOpts> {
442 match listener {
443 Listener::Rust(id) => Ok(self.rust_node(id).rust_dial_opts(self.ip)),
444 Listener::Libp2p(id) => Ok(self.libp2p_node(id).rust_dial_opts(self.ip)),
445 Listener::Multiaddr(maddr) => Ok(maddr.try_into().map_err(Error::AddrParse)?),
446 Listener::SocketPeerId(socket, peer_id) => Ok(P2pConnectionOutgoingInitOpts::LibP2P(
447 (peer_id, socket).into(),
448 )),
449 }
450 }
451
452 fn libp2p_dial_opts(&self, listener: Listener) -> Result<Multiaddr> {
453 match listener {
454 Listener::Rust(id) => Ok(self.rust_node(id).libp2p_dial_opts(self.ip)),
455 Listener::Libp2p(id) => Ok(self.libp2p_node(id).libp2p_dial_opts(self.ip)),
456 Listener::Multiaddr(maddr) => Ok(maddr),
457 Listener::SocketPeerId(socket, peer_id) => {
458 let peer_id: libp2p::PeerId = peer_id
459 .try_into()
460 .map_err(|_| Error::Other("Listener: invalid peer_id".to_string()))?;
461
462 match socket {
463 SocketAddr::V4(ipv4) => {
464 Ok(multiaddr!(Ip4(*ipv4.ip()), Tcp(ipv4.port()), P2p(peer_id)))
465 }
466 SocketAddr::V6(ipv6) => {
467 Ok(multiaddr!(Ip6(*ipv6.ip()), Tcp(ipv6.port()), P2p(peer_id)))
468 }
469 }
470 }
471 }
472 }
473
474 pub fn connect<T, U>(&mut self, id: T, other: U) -> Result<()>
475 where
476 T: Into<NodeId>,
477 U: Into<Listener>,
478 {
479 match id.into() {
480 NodeId::Rust(id) => {
481 let dial_opts = self.rust_dial_opts(other.into())?;
482 self.rust_node_mut(id)
483 .dispatch_action(P2pConnectionOutgoingAction::Init {
484 opts: dial_opts,
485 rpc_id: None,
486 on_success: None,
487 });
488 }
489 NodeId::Libp2p(id) => {
490 let dial_opts = self.libp2p_dial_opts(other.into())?;
491 self.libp2p_node_mut(id).swarm_mut().dial(dial_opts)?;
492 }
493 }
494 Ok(())
495 }
496
497 pub fn rust_node(&self, id: RustNodeId) -> &RustNode {
498 &self.rust_nodes[id.0]
499 }
500
501 pub fn rust_node_mut(&mut self, id: RustNodeId) -> &mut RustNode {
502 &mut self.rust_nodes[id.0]
503 }
504
505 pub fn libp2p_node(&self, id: Libp2pNodeId) -> &Libp2pNode {
506 &self.libp2p_nodes[id.0]
507 }
508
509 pub fn libp2p_node_mut(&mut self, id: Libp2pNodeId) -> &mut Libp2pNode {
510 &mut self.libp2p_nodes[id.0]
511 }
512
513 pub fn timestamp(&self) -> Instant {
514 self.last_idle_instant
515 }
516
517 pub fn peer_id<T>(&self, id: T) -> PeerId
518 where
519 T: Into<NodeId>,
520 {
521 match id.into() {
522 NodeId::Rust(id) => self.rust_node(id).peer_id(),
523 NodeId::Libp2p(id) => self.libp2p_node(id).peer_id(),
524 }
525 }
526}
527
528#[derive(Debug, thiserror::Error)]
529#[error("error event: {:?}", self)]
530pub enum ClusterEvent {
531 Rust {
532 id: RustNodeId,
533 event: RustNodeEvent,
534 },
535 Libp2p {
536 id: Libp2pNodeId,
537 event: Libp2pEvent,
538 },
539 Idle {
540 instant: Instant,
542 },
543}
544
545impl ClusterEvent {
546 pub fn rust(&self) -> Option<(&RustNodeId, &RustNodeEvent)> {
547 if let ClusterEvent::Rust { id, event } = self {
548 Some((id, event))
549 } else {
550 None
551 }
552 }
553
554 pub fn idle(&self) -> Option<&Instant> {
555 if let ClusterEvent::Idle { instant } = self {
556 Some(instant)
557 } else {
558 None
559 }
560 }
561}
562
563pub trait TimestampEvent {
564 fn timestamp(&self) -> Option<Instant>;
565}
566
567impl TimestampEvent for ClusterEvent {
568 fn timestamp(&self) -> Option<Instant> {
569 if let ClusterEvent::Idle { instant } = self {
570 Some(*instant)
571 } else {
572 None
573 }
574 }
575}
576
577impl<T> TimestampEvent for std::result::Result<ClusterEvent, T> {
578 fn timestamp(&self) -> Option<Instant> {
579 self.as_ref().ok().and_then(|event| event.timestamp())
580 }
581}
582
583pub trait TimestampSource {
584 fn timestamp(&self) -> Instant;
585}
586
587#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
588enum NextPoll {
589 #[default]
590 Idle,
591 Rust(RustNodeId),
592 Libp2p(Libp2pNodeId),
593}
594
595impl Cluster {
596 fn next_poll(&mut self) {
597 self.next_poll = match self.next_poll {
598 NextPoll::Rust(RustNodeId(id)) if id + 1 < self.rust_nodes.len() => {
599 NextPoll::Rust(RustNodeId(id + 1))
600 }
601 NextPoll::Rust(_) if !self.libp2p_nodes.is_empty() => NextPoll::Libp2p(Libp2pNodeId(0)),
602 NextPoll::Rust(_) => NextPoll::Idle,
603 NextPoll::Libp2p(Libp2pNodeId(id)) if id + 1 < self.libp2p_nodes.len() => {
604 NextPoll::Libp2p(Libp2pNodeId(id + 1))
605 }
606 NextPoll::Libp2p(_) => NextPoll::Idle,
607 NextPoll::Idle if !self.rust_nodes.is_empty() => NextPoll::Rust(RustNodeId(0)),
608 NextPoll::Idle if !self.libp2p_nodes.is_empty() => NextPoll::Libp2p(Libp2pNodeId(0)),
609 NextPoll::Idle => NextPoll::Idle,
610 }
611 }
612
613 fn poll_idle(&mut self, cx: &mut Context<'_>) -> Poll<Instant> {
614 Poll::Ready({
615 let instant = ready!(self.idle_interval.poll_tick(cx)).into_std();
616 let dur = instant - self.last_idle_instant;
617 for i in 0..self.rust_nodes.len() {
618 self.timeouts
619 .entry(i)
620 .and_modify(|d| *d += dur)
621 .or_insert(dur);
622 }
623 self.last_idle_instant = instant;
624 instant
625 })
626 }
627
628 fn poll_rust_node(
629 &mut self,
630 id: RustNodeId,
631 cx: &mut Context<'_>,
632 ) -> Poll<Option<RustNodeEvent>> {
633 let rust_node = &mut self.rust_nodes[id.0];
634 let res = if let Some(dur) = self.timeouts.remove(&id.0) {
635 Poll::Ready(Some(rust_node.idle(dur)))
637 } else {
638 rust_node.poll_next_unpin(cx)
640 };
641 if crate::log::ERROR.swap(false, std::sync::atomic::Ordering::Relaxed) {
642 if let Err(err) = self.dump_state() {
643 eprintln!("error dumping state: {err}");
644 }
645 panic!("error detected");
646 }
647 res
648 }
649
650 fn poll_libp2p_node(
651 &mut self,
652 id: Libp2pNodeId,
653 cx: &mut Context<'_>,
654 ) -> Poll<Option<Libp2pEvent>> {
655 let libp2p_node = &mut self.libp2p_nodes[id.0];
656 Poll::Ready(ready!(libp2p_node.swarm_mut().poll_next_unpin(cx)))
657 }
658
659 fn dump_state(&self) -> std::result::Result<(), Box<dyn std::error::Error>> {
660 let path = std::env::temp_dir().join("p2p-test-node.json");
661 eprintln!("saving state of rust nodes to {:?}", path);
662 let file = std::fs::File::create(path)?;
663 let states = serde_json::Map::from_iter(
664 self.rust_nodes
665 .iter()
666 .map(|node| {
667 Ok((
668 node.peer_id().to_string(),
669 serde_json::to_value(node.state())?,
670 ))
671 })
672 .collect::<std::result::Result<Vec<_>, serde_json::Error>>()?,
673 );
674 serde_json::to_writer(file, &states)?;
675 Ok(())
676 }
677}
678
679impl ::futures::stream::Stream for Cluster {
680 type Item = ClusterEvent;
681
682 fn poll_next(
683 self: std::pin::Pin<&mut Self>,
684 cx: &mut std::task::Context<'_>,
685 ) -> Poll<Option<Self::Item>> {
686 let this = self.get_mut();
687 let np = this.next_poll;
688 loop {
689 let poll = match this.next_poll {
690 NextPoll::Rust(id) => this
691 .poll_rust_node(id, cx)
692 .map(|event| event.map(|event| ClusterEvent::Rust { id, event })),
693 NextPoll::Libp2p(id) => this
694 .poll_libp2p_node(id, cx)
695 .map(|event| event.map(|event| ClusterEvent::Libp2p { id, event })),
696 NextPoll::Idle => this
697 .poll_idle(cx)
698 .map(|instant| Some(ClusterEvent::Idle { instant })),
699 };
700 if poll.is_ready() {
701 return poll;
702 }
703 this.next_poll();
704 if this.next_poll == np {
705 return Poll::Pending;
706 }
707 }
708 }
709}
710
711impl TimestampSource for Cluster {
712 fn timestamp(&self) -> Instant {
713 self.last_idle_instant
714 }
715}
716
717impl TimestampSource for &mut Cluster {
718 fn timestamp(&self) -> Instant {
719 self.last_idle_instant
720 }
721}
722
723impl Cluster {
724 pub fn stream(&mut self) -> TakeDuring<&mut Cluster> {
725 let duration = self.total_duration;
726 self.take_during(duration)
727 }
728
729 pub fn try_stream(&mut self) -> MapErrors<TakeDuring<&mut Cluster>, ClusterEvent> {
730 let is_error = self.is_error;
731 let duration = self.total_duration;
732 self.take_during(duration).map_errors(is_error)
733 }
734}