1use std::{collections::VecDeque, time::Instant};
2
3use openmina_core::channels::mpsc;
4use p2p::{
5 identity::SecretKey,
6 service_impl::{
7 mio::MioService, webrtc::P2pServiceWebrtc, webrtc_with_libp2p::P2pServiceWebrtcWithLibp2p,
8 },
9 P2pCryptoService, P2pEvent,
10};
11use rand::{rngs::StdRng, seq::SliceRandom, Rng, SeedableRng};
12use redux::{Service, TimeService};
13
14use crate::event::{RustNodeEvent, RustNodeEventStore};
15
16pub struct ClusterService {
17 pub rng: StdRng,
18 pub event_sender: mpsc::UnboundedSender<P2pEvent>,
19 pub cmd_sender: mpsc::TrackedUnboundedSender<p2p::service_impl::webrtc::Cmd>,
20 mio: MioService,
21 peers: std::collections::BTreeMap<p2p::PeerId, p2p::service_impl::webrtc::PeerState>,
22 time: Instant,
23
24 rust_node_events: VecDeque<RustNodeEvent>,
25}
26
27impl ClusterService {
28 pub fn new(
29 node_idx: usize,
30 secret_key: SecretKey,
31 event_sender: mpsc::UnboundedSender<P2pEvent>,
32 cmd_sender: mpsc::TrackedUnboundedSender<p2p::service_impl::webrtc::Cmd>,
33 time: Instant,
34 ) -> Self {
35 let mio = {
36 let event_sender = event_sender.clone();
37 let mut mio = MioService::pending(secret_key.try_into().expect("valid keypair"));
38 mio.run(move |mio_event| {
39 let _ = event_sender.send(mio_event.into());
40 });
42 mio
43 };
44 Self {
45 rng: StdRng::seed_from_u64(node_idx as u64),
46 event_sender,
47 cmd_sender,
48 mio,
49 peers: Default::default(),
50 time,
51
52 rust_node_events: Default::default(),
53 }
54 }
55
56 pub(crate) fn advance_time(&mut self, duration: std::time::Duration) {
57 self.time += duration
58 }
59
60 pub(crate) fn rust_node_event(&mut self) -> Option<RustNodeEvent> {
61 self.rust_node_events.pop_front()
62 }
63}
64
65impl TimeService for ClusterService {
66 fn monotonic_time(&mut self) -> redux::Instant {
67 self.time.into()
68 }
69}
70
71impl Service for ClusterService {}
72
73impl P2pServiceWebrtcWithLibp2p for ClusterService {
74 fn mio(&mut self) -> &mut p2p::service_impl::mio::MioService {
75 &mut self.mio
76 }
77
78 fn connections(&self) -> std::collections::BTreeSet<p2p::PeerId> {
79 Default::default()
80 }
81}
82
83impl P2pServiceWebrtc for ClusterService {
84 type Event = P2pEvent;
85
86 fn random_pick(
87 &mut self,
88 list: &[p2p::connection::outgoing::P2pConnectionOutgoingInitOpts],
89 ) -> Option<p2p::connection::outgoing::P2pConnectionOutgoingInitOpts> {
90 list.choose(&mut self.rng).cloned()
91 }
92
93 fn event_sender(&self) -> &mpsc::UnboundedSender<Self::Event> {
94 &self.event_sender
95 }
96
97 fn cmd_sender(&self) -> &mpsc::TrackedUnboundedSender<p2p::service_impl::webrtc::Cmd> {
98 &self.cmd_sender
99 }
100
101 fn peers(
102 &mut self,
103 ) -> &mut std::collections::BTreeMap<p2p::PeerId, p2p::service_impl::webrtc::PeerState> {
104 &mut self.peers
105 }
106
107 fn encrypt<T: p2p::identity::EncryptableType>(
108 &mut self,
109 _other_pk: &p2p::identity::PublicKey,
110 _message: &T,
111 ) -> Result<T::Encrypted, Box<dyn std::error::Error>> {
112 unreachable!("this is webrtc only and this crate tests libp2p only")
113 }
114
115 fn decrypt<T: p2p::identity::EncryptableType>(
116 &mut self,
117 _other_pub_key: &p2p::identity::PublicKey,
118 _encrypted: &T::Encrypted,
119 ) -> Result<T, Box<dyn std::error::Error>> {
120 unreachable!("this is webrtc only and this crate tests libp2p only")
121 }
122
123 fn auth_encrypt_and_send(
124 &mut self,
125 _peer_id: p2p::PeerId,
126 _other_pub_key: &p2p::identity::PublicKey,
127 _auth: p2p::webrtc::ConnectionAuth,
128 ) {
129 unreachable!("this is webrtc only and this crate tests libp2p only")
130 }
131
132 fn auth_decrypt(
133 &mut self,
134 _other_pub_key: &p2p::identity::PublicKey,
135 _auth: p2p::webrtc::ConnectionAuthEncrypted,
136 ) -> Option<p2p::webrtc::ConnectionAuth> {
137 unreachable!("this is webrtc only and this crate tests libp2p only")
138 }
139}
140
141impl P2pCryptoService for ClusterService {
142 fn generate_random_nonce(&mut self) -> [u8; 24] {
143 self.rng.gen()
144 }
145
146 fn ephemeral_sk(&mut self) -> [u8; 32] {
147 self.rng.gen()
148 }
149
150 fn static_sk(&mut self) -> [u8; 32] {
151 self.rng.gen()
152 }
153
154 fn sign_key(&mut self, key: &[u8; 32]) -> Vec<u8> {
156 let msg = [b"noise-libp2p-static-key:", key.as_ref()].concat();
157 let sig = self
158 .mio
159 .keypair()
160 .sign(&msg)
161 .expect("unable to create signature");
162
163 let mut payload = vec![];
164 payload.extend_from_slice(b"\x0a\x24");
165 payload.extend_from_slice(&self.mio.keypair().public().encode_protobuf());
166 payload.extend_from_slice(b"\x12\x40");
167 payload.extend_from_slice(&sig);
168 payload
169 }
170
171 fn sign_publication(&mut self, publication: &[u8]) -> Vec<u8> {
172 let msg: Vec<u8> = [b"libp2p-pubsub:", publication].concat();
173 self.mio
174 .keypair()
175 .sign(&msg)
176 .expect("unable to create signature")
177 }
178
179 fn verify_publication(
180 &mut self,
181 pk: &libp2p_identity::PublicKey,
182 publication: &[u8],
183 sig: &[u8],
184 ) -> bool {
185 let msg: Vec<u8> = [b"libp2p-pubsub:", publication].concat();
186 pk.verify(&msg, sig)
187 }
188}
189
190impl RustNodeEventStore for ClusterService {
191 fn store_event(&mut self, event: RustNodeEvent) {
192 self.rust_node_events.push_back(event);
193 }
194}