1use std::{
2 collections::{BTreeSet, HashSet},
3 future::{ready, Ready},
4};
5
6use libp2p::swarm::SwarmEvent;
7use p2p::PeerId;
8
9use crate::{
10 cluster::{ClusterEvent, NodeId},
11 event::RustNodeEvent,
12 libp2p_node::Libp2pEvent,
13 rust_node::RustNodeId,
14};
15
16pub fn async_fn<T, F>(mut f: F) -> impl FnMut(ClusterEvent) -> Ready<T>
18where
19 F: FnMut(ClusterEvent) -> T,
20{
21 move |event| ready(f(event))
22}
23
24pub fn listener_is_ready(id: RustNodeId) -> impl FnMut(ClusterEvent) -> Ready<bool> {
26 move |event| {
27 ready(
28 matches!(event.rust(), Some((event_id, RustNodeEvent::ListenerReady { .. })) if *event_id == id),
29 )
30 }
31}
32
33pub fn kad_finished_bootstrap(id: RustNodeId) -> impl FnMut(ClusterEvent) -> Ready<bool> {
35 move |event| {
36 ready(matches!(
37 event.rust(),
38 Some((event_id, RustNodeEvent::KadBootstrapFinished)) if *event_id == id
39 ))
40 }
41}
42
43pub fn listeners_are_ready<I>(ids: I) -> impl FnMut(ClusterEvent) -> Ready<bool>
45where
46 I: IntoIterator<Item = RustNodeId>,
47{
48 let mut ids: HashSet<RustNodeId> = HashSet::from_iter(ids);
49 move |event| {
50 ready(
51 if let Some((event_id, RustNodeEvent::ListenerReady { .. })) = event.rust() {
52 ids.remove(event_id) && ids.is_empty()
53 } else {
54 false
55 },
56 )
57 }
58}
59
60pub fn all_listeners_are_ready<T, I>(ids: I) -> impl FnMut(ClusterEvent) -> Ready<bool>
62where
63 I: IntoIterator<Item = T>,
64 T: Into<NodeId>,
65{
66 let mut ids: HashSet<NodeId> = HashSet::from_iter(ids.into_iter().map(Into::into));
67 move |event| {
68 ready(
69 match event {
70 ClusterEvent::Rust {
71 id,
72 event: RustNodeEvent::ListenerReady { .. },
73 } => ids.remove(&NodeId::Rust(id)),
74 ClusterEvent::Libp2p {
75 id,
76 event: SwarmEvent::NewListenAddr { address, .. },
77 } => {
78 println!("{id:?}: new listen addr: {address}");
79 ids.remove(&NodeId::Libp2p(id))
80 }
81 _ => false,
82 } && ids.is_empty(),
83 )
84 }
85}
86
87pub fn nodes_peers_are_ready<I>(nodes_peers: I) -> impl FnMut(ClusterEvent) -> Ready<bool>
88where
89 I: IntoIterator<Item = (RustNodeId, PeerId)>,
90{
91 let mut nodes_peers = BTreeSet::from_iter(nodes_peers);
92 move |event| {
93 ready(
94 if let ClusterEvent::Rust {
95 id,
96 event: RustNodeEvent::PeerConnected { peer_id, .. },
97 } = event
98 {
99 nodes_peers.remove(&(id, peer_id)) && nodes_peers.is_empty()
100 } else {
101 false
102 },
103 )
104 }
105}
106
107pub fn all_nodes_peers_are_ready<I>(nodes_peers: I) -> impl FnMut(ClusterEvent) -> Ready<bool>
111where
112 I: IntoIterator<Item = (NodeId, PeerId)>,
113{
114 let mut nodes_peers = BTreeSet::from_iter(nodes_peers);
115 move |event| {
116 ready(match event {
117 ClusterEvent::Rust {
118 id,
119 event: RustNodeEvent::PeerConnected { peer_id, .. },
120 } => nodes_peers.remove(&(id.into(), peer_id)) && nodes_peers.is_empty(),
121 ClusterEvent::Libp2p {
122 id,
123 event: Libp2pEvent::ConnectionEstablished { peer_id, .. },
124 } => {
125 nodes_peers.remove(&(id.into(), peer_id.try_into().expect("Conversion failed")))
126 && nodes_peers.is_empty()
127 }
128 _ => false,
129 })
130 }
131}
132
133pub fn peer_is_connected(
135 id: RustNodeId,
136 peer_id: PeerId,
137) -> impl FnMut(ClusterEvent) -> Ready<bool> {
138 move |event| {
139 ready(
140 matches!(event.rust(), Some((event_id, RustNodeEvent::PeerConnected { peer_id: pid, .. })) if *event_id == id && pid == &peer_id),
141 )
142 }
143}
144
145pub fn default_errors(event: &ClusterEvent) -> bool {
147 match &event {
148 ClusterEvent::Rust { event: e, .. } => match e {
149 RustNodeEvent::ListenerError { .. } => true,
150 RustNodeEvent::PeerConnectionError { .. } => true,
151 RustNodeEvent::PeerDisconnected { .. } => true,
152 RustNodeEvent::P2p { event: e } => match e {
153 p2p::P2pEvent::Connection(_) => false,
154 p2p::P2pEvent::Channel(e) => matches!(
155 e,
156 p2p::P2pChannelEvent::Opened(_, _, Err(_))
157 | p2p::P2pChannelEvent::Sent(_, _, _, Err(_))
158 | p2p::P2pChannelEvent::Received(_, Err(_))
159 ),
160 p2p::P2pEvent::MioEvent(e) => matches!(
161 e,
162 p2p::MioEvent::ListenerError { .. }
163 | p2p::MioEvent::IncomingConnectionDidAccept(_, Err(_))
164 | p2p::MioEvent::IncomingDataDidReceive(_, Err(_))
165 | p2p::MioEvent::OutgoingConnectionDidConnect(_, Err(_))
166 | p2p::MioEvent::OutgoingDataDidSend(_, Err(_))
167 | p2p::MioEvent::ConnectionDidClose(_, Err(_))
168 ),
169 },
170 _ => false,
171 },
172 _ => false,
173 }
174}
175
176pub fn all_nodes_with_value<T, I, F>(
180 nodes_items: I,
181 mut f: F,
182) -> impl FnMut(ClusterEvent) -> Ready<bool>
183where
184 T: PartialEq + Eq,
185 I: IntoIterator<Item = (RustNodeId, T)>,
186 F: FnMut(RustNodeEvent) -> Option<T>,
187{
188 let mut nodes_items = Vec::from_iter(nodes_items);
189 move |event| {
190 ready(if let ClusterEvent::Rust { id, event } = event {
191 f(event)
192 .and_then(|v| {
193 nodes_items
194 .iter()
195 .position(|(_id, _v)| _id == &id && _v == &v)
196 })
197 .is_some_and(|i| {
198 nodes_items.swap_remove(i);
199 nodes_items.is_empty()
200 })
201 } else {
202 false
203 })
204 }
205}