1use std::{
2 pin::Pin,
3 task::{ready, Context, Poll},
4 time::Duration,
5};
6
7use futures::Stream;
8use openmina_core::channels::mpsc;
9use p2p::{P2pAction, P2pEvent, P2pLimits, P2pState, P2pTimeouts, PeerId};
10use redux::{Effects, EnablingCondition, Reducer, SubStore};
11
12use crate::{
13 cluster::{Listener, PeerIdConfig},
14 event::RustNodeEvent,
15 redux::{Action, IdleAction, State, Store},
16 service::ClusterService,
17 test_node::TestNode,
18};
19
20#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
21pub struct RustNodeId(pub(super) usize);
22
23#[derive(Debug, Default, Clone)]
24pub struct RustNodeConfig {
25 pub peer_id: PeerIdConfig,
26 pub initial_peers: Vec<Listener>,
27 pub timeouts: P2pTimeouts,
28 pub limits: P2pLimits,
29 pub discovery: bool,
30 pub override_fn: Option<Effects<State, ClusterService, Action>>,
31 pub override_reducer: Option<Reducer<State, Action>>,
32}
33
34impl RustNodeConfig {
35 pub fn with_peer_id(mut self, peer_id: PeerIdConfig) -> Self {
36 self.peer_id = peer_id;
37 self
38 }
39
40 pub fn with_initial_peers<T>(mut self, initial_peers: T) -> Self
41 where
42 T: IntoIterator<Item = Listener>,
43 {
44 self.initial_peers = Vec::from_iter(initial_peers);
45 self
46 }
47
48 pub fn with_timeouts(mut self, timeouts: P2pTimeouts) -> Self {
49 self.timeouts = timeouts;
50 self
51 }
52
53 pub fn with_limits(mut self, limits: P2pLimits) -> Self {
54 self.limits = limits;
55 self
56 }
57
58 pub fn with_discovery(mut self, discovery: bool) -> Self {
59 self.discovery = discovery;
60 self
61 }
62
63 pub fn with_override(mut self, override_fn: Effects<State, ClusterService, Action>) -> Self {
64 self.override_fn = Some(override_fn);
65 self
66 }
67
68 pub fn with_override_reducer(mut self, override_fn: Reducer<State, Action>) -> Self {
69 self.override_reducer = Some(override_fn);
70 self
71 }
72}
73
74pub struct RustNode {
75 store: Store,
76 event_receiver: mpsc::RecvStream<P2pEvent>,
77}
78
79impl RustNode {
80 pub(super) fn new(store: Store, event_receiver: mpsc::UnboundedReceiver<P2pEvent>) -> Self {
81 RustNode {
82 store,
83 event_receiver: event_receiver.stream(),
84 }
85 }
86
87 pub fn dispatch_action<A>(&mut self, action: A) -> bool
88 where
89 A: Into<P2pAction> + EnablingCondition<P2pState>,
90 {
91 SubStore::dispatch(&mut self.store, action)
92 }
93
94 pub(super) fn idle(&mut self, duration: Duration) -> RustNodeEvent {
95 self.store.service.advance_time(duration);
96 self.store.dispatch(IdleAction);
97 self.store
98 .service
99 .rust_node_event()
100 .unwrap_or(RustNodeEvent::Idle)
101 }
102
103 pub fn state(&self) -> &P2pState {
104 &self.store.state().0
105 }
106
107 fn next_stored_event(&mut self) -> Option<RustNodeEvent> {
108 self.store.service.rust_node_event()
109 }
110
111 fn poll_event_receiver(&mut self, cx: &mut Context<'_>) -> Poll<Option<RustNodeEvent>> {
112 let event = ready!(Pin::new(&mut self.event_receiver).poll_next(cx));
113 Poll::Ready(event.map(|event| {
114 self.dispatch_event(event.clone());
115 RustNodeEvent::P2p { event }
116 }))
117 }
118
119 pub(crate) fn dispatch_event(&mut self, event: P2pEvent) -> RustNodeEvent {
120 super::redux::event_effect(&mut self.store, event.clone());
121 RustNodeEvent::P2p { event }
122 }
123}
124
125impl TestNode for RustNode {
126 fn peer_id(&self) -> PeerId {
127 self.state().my_id()
128 }
129
130 fn libp2p_port(&self) -> u16 {
131 self.state()
132 .config
133 .libp2p_port
134 .expect("port should be present")
135 }
136}
137
138impl Stream for RustNode {
139 type Item = RustNodeEvent;
140
141 fn poll_next(
142 self: std::pin::Pin<&mut Self>,
143 cx: &mut std::task::Context<'_>,
144 ) -> std::task::Poll<Option<Self::Item>> {
145 let this = self.get_mut();
146 if let Some(event) = this.next_stored_event() {
147 Poll::Ready(Some(event))
148 } else {
149 this.poll_event_receiver(cx)
150 }
151 }
152}