mina_node_testing/node/rust/
mod.rs1mod config;
2pub use config::*;
3
4mod event;
5pub use event::*;
6
7use node::{
8 event_source::EventSourceAction,
9 p2p::{
10 connection::outgoing::{
11 P2pConnectionOutgoingInitLibp2pOpts, P2pConnectionOutgoingInitOpts,
12 },
13 webrtc::SignalingMethod,
14 PeerId,
15 },
16 service::P2pDisconnectionService,
17 Action, CheckTimeoutsAction, State, Store,
18};
19use redux::EnablingCondition;
20use temp_dir::TempDir;
21
22use crate::{
23 cluster::ClusterNodeId,
24 service::{DynEffects, NodeTestingService, PendingEventId},
25};
26
27pub struct Node {
28 work_dir: TempDir,
29 config: RustNodeTestingConfig,
30 store: Store<NodeTestingService>,
31}
32
33impl Node {
34 pub fn new(
35 work_dir: TempDir,
36 config: RustNodeTestingConfig,
37 store: Store<NodeTestingService>,
38 ) -> Self {
39 Self {
40 work_dir,
41 config,
42 store,
43 }
44 }
45
46 pub fn work_dir(&self) -> &TempDir {
47 &self.work_dir
48 }
49
50 pub fn config(&self) -> &RustNodeTestingConfig {
51 &self.config
52 }
53
54 pub fn service(&self) -> &NodeTestingService {
55 &self.store.service
56 }
57
58 fn service_mut(&mut self) -> &mut NodeTestingService {
59 &mut self.store.service
60 }
61
62 pub fn set_dyn_effects(&mut self, effects: DynEffects) {
63 self.service_mut().set_dyn_effects(effects)
64 }
65
66 pub fn remove_dyn_effects(&mut self) -> Option<DynEffects> {
67 self.service_mut().remove_dyn_effects()
68 }
69
70 pub fn dial_addr(&self) -> P2pConnectionOutgoingInitOpts {
71 let peer_id = self.store.state().p2p.my_id();
72 if self.service().rust_to_rust_use_webrtc() {
73 let port = self.store.state().p2p.config().listen_port.unwrap();
74 let signaling = SignalingMethod::Http(([127, 0, 0, 1], port).into());
75 P2pConnectionOutgoingInitOpts::WebRTC { peer_id, signaling }
76 } else {
77 let opts = P2pConnectionOutgoingInitLibp2pOpts {
78 peer_id,
79 host: node::p2p::webrtc::Host::Ipv4([127, 0, 0, 1].into()),
80 port: self.store.state().p2p.config().libp2p_port.unwrap(),
81 };
82 P2pConnectionOutgoingInitOpts::LibP2P(opts)
83 }
84 }
85
86 pub fn state(&self) -> &State {
87 self.store.state()
88 }
89
90 pub fn node_id(&self) -> ClusterNodeId {
91 self.service().node_id()
92 }
93
94 pub fn peer_id(&self) -> PeerId {
95 self.state().p2p.my_id()
96 }
97
98 pub fn pending_events(&mut self, poll: bool) -> impl Iterator<Item = (PendingEventId, &Event)> {
99 self.pending_events_with_state(poll).1
100 }
101
102 pub fn pending_events_with_state(
103 &mut self,
104 poll: bool,
105 ) -> (&State, impl Iterator<Item = (PendingEventId, &Event)>) {
106 (
107 self.store.state.get(),
108 self.store.service.pending_events(poll),
109 )
110 }
111
112 fn dispatch<T>(&mut self, action: T) -> bool
113 where
114 T: Into<Action> + EnablingCondition<State>,
115 {
116 self.store.dispatch(action)
117 }
118
119 pub fn dispatch_event(&mut self, event: Event) -> bool {
120 self.dispatch(EventSourceAction::NewEvent { event })
121 }
122
123 pub fn get_pending_event(&self, event_id: PendingEventId) -> Option<&Event> {
124 self.service().get_pending_event(event_id)
125 }
126
127 pub fn take_pending_event(&mut self, event_id: PendingEventId) -> Option<Event> {
128 self.service_mut().take_pending_event(event_id)
129 }
130
131 pub fn take_event_and_dispatch(&mut self, event_id: PendingEventId) -> bool {
132 let event = self.service_mut().take_pending_event(event_id).unwrap();
133 self.dispatch_event(event)
134 }
135
136 pub fn check_timeouts(&mut self) {
137 self.dispatch(CheckTimeoutsAction {});
138 }
139
140 pub fn advance_time(&mut self, by_nanos: u64) {
141 self.store.service.advance_time(by_nanos)
142 }
143
144 pub async fn wait_for_next_pending_event(&mut self) -> Option<(PendingEventId, &Event)> {
145 self.service_mut().next_pending_event().await
146 }
147
148 pub async fn wait_for_event(&mut self, event_pattern: &str) -> Option<PendingEventId> {
149 let readonly_rpcs = self
150 .service_mut()
151 .pending_events(false)
152 .filter(|(_, event)| {
153 matches!(
154 NonDeterministicEvent::new(event).as_deref(),
155 Some(NonDeterministicEvent::RpcReadonly(..))
156 )
157 })
158 .map(|(id, _)| id)
159 .collect::<Vec<_>>();
160
161 for event_id in readonly_rpcs {
162 self.take_event_and_dispatch(event_id);
163 }
164
165 let event_id = self
166 .service_mut()
167 .pending_events(false)
168 .find(|(_, event)| event.to_string().starts_with(event_pattern))
169 .map(|(id, _)| id);
170 match event_id {
171 Some(id) => Some(id),
172 None => loop {
173 let (id, event) = match self.service_mut().next_pending_event().await {
174 Some(v) => v,
175 None => break None,
176 };
177 if event.to_string().starts_with(event_pattern) {
178 break Some(id);
179 } else if matches!(
180 NonDeterministicEvent::new(event).as_deref(),
181 Some(NonDeterministicEvent::RpcReadonly(..))
182 ) {
183 self.take_event_and_dispatch(id);
184 }
185 },
186 }
187 }
188
189 pub async fn wait_for_event_and_dispatch(&mut self, event_pattern: &str) -> bool {
190 if let Some(id) = self.wait_for_event(event_pattern).await {
191 return self.take_event_and_dispatch(id);
192 }
193 false
194 }
195
196 pub fn p2p_disconnect(&mut self, peer_id: PeerId) -> bool {
197 self.service_mut().disconnect(peer_id)
198 }
199}