p2p_testing/
rust_node.rs

1use std::{
2    pin::Pin,
3    task::{ready, Context, Poll},
4    time::Duration,
5};
6
7use futures::Stream;
8use openmina_core::channels::mpsc;
9use p2p::{P2pAction, P2pEvent, P2pLimits, P2pState, P2pTimeouts, PeerId};
10use redux::{Effects, EnablingCondition, Reducer, SubStore};
11
12use crate::{
13    cluster::{Listener, PeerIdConfig},
14    event::RustNodeEvent,
15    redux::{Action, IdleAction, State, Store},
16    service::ClusterService,
17    test_node::TestNode,
18};
19
20#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
21pub struct RustNodeId(pub(super) usize);
22
23#[derive(Debug, Default, Clone)]
24pub struct RustNodeConfig {
25    pub peer_id: PeerIdConfig,
26    pub initial_peers: Vec<Listener>,
27    pub timeouts: P2pTimeouts,
28    pub limits: P2pLimits,
29    pub discovery: bool,
30    pub override_fn: Option<Effects<State, ClusterService, Action>>,
31    pub override_reducer: Option<Reducer<State, Action>>,
32}
33
34impl RustNodeConfig {
35    pub fn with_peer_id(mut self, peer_id: PeerIdConfig) -> Self {
36        self.peer_id = peer_id;
37        self
38    }
39
40    pub fn with_initial_peers<T>(mut self, initial_peers: T) -> Self
41    where
42        T: IntoIterator<Item = Listener>,
43    {
44        self.initial_peers = Vec::from_iter(initial_peers);
45        self
46    }
47
48    pub fn with_timeouts(mut self, timeouts: P2pTimeouts) -> Self {
49        self.timeouts = timeouts;
50        self
51    }
52
53    pub fn with_limits(mut self, limits: P2pLimits) -> Self {
54        self.limits = limits;
55        self
56    }
57
58    pub fn with_discovery(mut self, discovery: bool) -> Self {
59        self.discovery = discovery;
60        self
61    }
62
63    pub fn with_override(mut self, override_fn: Effects<State, ClusterService, Action>) -> Self {
64        self.override_fn = Some(override_fn);
65        self
66    }
67
68    pub fn with_override_reducer(mut self, override_fn: Reducer<State, Action>) -> Self {
69        self.override_reducer = Some(override_fn);
70        self
71    }
72}
73
74pub struct RustNode {
75    store: Store,
76    event_receiver: mpsc::RecvStream<P2pEvent>,
77}
78
79impl RustNode {
80    pub(super) fn new(store: Store, event_receiver: mpsc::UnboundedReceiver<P2pEvent>) -> Self {
81        RustNode {
82            store,
83            event_receiver: event_receiver.stream(),
84        }
85    }
86
87    pub fn dispatch_action<A>(&mut self, action: A) -> bool
88    where
89        A: Into<P2pAction> + EnablingCondition<P2pState>,
90    {
91        SubStore::dispatch(&mut self.store, action)
92    }
93
94    pub(super) fn idle(&mut self, duration: Duration) -> RustNodeEvent {
95        self.store.service.advance_time(duration);
96        self.store.dispatch(IdleAction);
97        self.store
98            .service
99            .rust_node_event()
100            .unwrap_or(RustNodeEvent::Idle)
101    }
102
103    pub fn state(&self) -> &P2pState {
104        &self.store.state().0
105    }
106
107    fn next_stored_event(&mut self) -> Option<RustNodeEvent> {
108        self.store.service.rust_node_event()
109    }
110
111    fn poll_event_receiver(&mut self, cx: &mut Context<'_>) -> Poll<Option<RustNodeEvent>> {
112        let event = ready!(Pin::new(&mut self.event_receiver).poll_next(cx));
113        Poll::Ready(event.map(|event| {
114            self.dispatch_event(event.clone());
115            RustNodeEvent::P2p { event }
116        }))
117    }
118
119    pub(crate) fn dispatch_event(&mut self, event: P2pEvent) -> RustNodeEvent {
120        super::redux::event_effect(&mut self.store, event.clone());
121        RustNodeEvent::P2p { event }
122    }
123}
124
125impl TestNode for RustNode {
126    fn peer_id(&self) -> PeerId {
127        self.state().my_id()
128    }
129
130    fn libp2p_port(&self) -> u16 {
131        self.state()
132            .config
133            .libp2p_port
134            .expect("port should be present")
135    }
136}
137
138impl Stream for RustNode {
139    type Item = RustNodeEvent;
140
141    fn poll_next(
142        self: std::pin::Pin<&mut Self>,
143        cx: &mut std::task::Context<'_>,
144    ) -> std::task::Poll<Option<Self::Item>> {
145        let this = self.get_mut();
146        if let Some(event) = this.next_stored_event() {
147            Poll::Ready(Some(event))
148        } else {
149            this.poll_event_receiver(cx)
150        }
151    }
152}