mina_node_testing/node/rust/
mod.rs

1mod config;
2pub use config::*;
3
4mod event;
5pub use event::*;
6
7use node::{
8    event_source::EventSourceAction,
9    p2p::{
10        connection::outgoing::{
11            P2pConnectionOutgoingInitLibp2pOpts, P2pConnectionOutgoingInitOpts,
12        },
13        webrtc::SignalingMethod,
14        PeerId,
15    },
16    service::P2pDisconnectionService,
17    Action, CheckTimeoutsAction, State, Store,
18};
19use redux::EnablingCondition;
20use temp_dir::TempDir;
21
22use crate::{
23    cluster::ClusterNodeId,
24    service::{DynEffects, NodeTestingService, PendingEventId},
25};
26
27pub struct Node {
28    work_dir: TempDir,
29    config: RustNodeTestingConfig,
30    store: Store<NodeTestingService>,
31}
32
33impl Node {
34    pub fn new(
35        work_dir: TempDir,
36        config: RustNodeTestingConfig,
37        store: Store<NodeTestingService>,
38    ) -> Self {
39        Self {
40            work_dir,
41            config,
42            store,
43        }
44    }
45
46    pub fn work_dir(&self) -> &TempDir {
47        &self.work_dir
48    }
49
50    pub fn config(&self) -> &RustNodeTestingConfig {
51        &self.config
52    }
53
54    pub fn service(&self) -> &NodeTestingService {
55        &self.store.service
56    }
57
58    fn service_mut(&mut self) -> &mut NodeTestingService {
59        &mut self.store.service
60    }
61
62    pub fn set_dyn_effects(&mut self, effects: DynEffects) {
63        self.service_mut().set_dyn_effects(effects)
64    }
65
66    pub fn remove_dyn_effects(&mut self) -> Option<DynEffects> {
67        self.service_mut().remove_dyn_effects()
68    }
69
70    pub fn dial_addr(&self) -> P2pConnectionOutgoingInitOpts {
71        let peer_id = self.store.state().p2p.my_id();
72        if self.service().rust_to_rust_use_webrtc() {
73            let port = self.store.state().p2p.config().listen_port.unwrap();
74            let signaling = SignalingMethod::Http(([127, 0, 0, 1], port).into());
75            P2pConnectionOutgoingInitOpts::WebRTC { peer_id, signaling }
76        } else {
77            let opts = P2pConnectionOutgoingInitLibp2pOpts {
78                peer_id,
79                host: node::p2p::webrtc::Host::Ipv4([127, 0, 0, 1].into()),
80                port: self.store.state().p2p.config().libp2p_port.unwrap(),
81            };
82            P2pConnectionOutgoingInitOpts::LibP2P(opts)
83        }
84    }
85
86    pub fn state(&self) -> &State {
87        self.store.state()
88    }
89
90    pub fn node_id(&self) -> ClusterNodeId {
91        self.service().node_id()
92    }
93
94    pub fn peer_id(&self) -> PeerId {
95        self.state().p2p.my_id()
96    }
97
98    pub fn pending_events(&mut self, poll: bool) -> impl Iterator<Item = (PendingEventId, &Event)> {
99        self.pending_events_with_state(poll).1
100    }
101
102    pub fn pending_events_with_state(
103        &mut self,
104        poll: bool,
105    ) -> (&State, impl Iterator<Item = (PendingEventId, &Event)>) {
106        (
107            self.store.state.get(),
108            self.store.service.pending_events(poll),
109        )
110    }
111
112    fn dispatch<T>(&mut self, action: T) -> bool
113    where
114        T: Into<Action> + EnablingCondition<State>,
115    {
116        self.store.dispatch(action)
117    }
118
119    pub fn dispatch_event(&mut self, event: Event) -> bool {
120        self.dispatch(EventSourceAction::NewEvent { event })
121    }
122
123    pub fn get_pending_event(&self, event_id: PendingEventId) -> Option<&Event> {
124        self.service().get_pending_event(event_id)
125    }
126
127    pub fn take_pending_event(&mut self, event_id: PendingEventId) -> Option<Event> {
128        self.service_mut().take_pending_event(event_id)
129    }
130
131    pub fn take_event_and_dispatch(&mut self, event_id: PendingEventId) -> bool {
132        let event = self.service_mut().take_pending_event(event_id).unwrap();
133        self.dispatch_event(event)
134    }
135
136    pub fn check_timeouts(&mut self) {
137        self.dispatch(CheckTimeoutsAction {});
138    }
139
140    pub fn advance_time(&mut self, by_nanos: u64) {
141        self.store.service.advance_time(by_nanos)
142    }
143
144    pub async fn wait_for_next_pending_event(&mut self) -> Option<(PendingEventId, &Event)> {
145        self.service_mut().next_pending_event().await
146    }
147
148    pub async fn wait_for_event(&mut self, event_pattern: &str) -> Option<PendingEventId> {
149        let readonly_rpcs = self
150            .service_mut()
151            .pending_events(false)
152            .filter(|(_, event)| {
153                matches!(
154                    NonDeterministicEvent::new(event).as_deref(),
155                    Some(NonDeterministicEvent::RpcReadonly(..))
156                )
157            })
158            .map(|(id, _)| id)
159            .collect::<Vec<_>>();
160
161        for event_id in readonly_rpcs {
162            self.take_event_and_dispatch(event_id);
163        }
164
165        let event_id = self
166            .service_mut()
167            .pending_events(false)
168            .find(|(_, event)| event.to_string().starts_with(event_pattern))
169            .map(|(id, _)| id);
170        match event_id {
171            Some(id) => Some(id),
172            None => loop {
173                let (id, event) = match self.service_mut().next_pending_event().await {
174                    Some(v) => v,
175                    None => break None,
176                };
177                if event.to_string().starts_with(event_pattern) {
178                    break Some(id);
179                } else if matches!(
180                    NonDeterministicEvent::new(event).as_deref(),
181                    Some(NonDeterministicEvent::RpcReadonly(..))
182                ) {
183                    self.take_event_and_dispatch(id);
184                }
185            },
186        }
187    }
188
189    pub async fn wait_for_event_and_dispatch(&mut self, event_pattern: &str) -> bool {
190        if let Some(id) = self.wait_for_event(event_pattern).await {
191            return self.take_event_and_dispatch(id);
192        }
193        false
194    }
195
196    pub fn p2p_disconnect(&mut self, peer_id: PeerId) -> bool {
197        self.service_mut().disconnect(peer_id)
198    }
199}