1use std::{collections::VecDeque, time::Instant};
2
3use mina_core::channels::mpsc;
4use mina_p2p::{
5 identity::SecretKey,
6 service_impl::{
7 mio::MioService, webrtc::P2pServiceWebrtc, webrtc_with_libp2p::P2pServiceWebrtcWithLibp2p,
8 },
9 P2pCryptoService, P2pEvent,
10};
11use rand::{rngs::StdRng, seq::SliceRandom, Rng, SeedableRng};
12use redux::{Service, TimeService};
13
14use crate::event::{RustNodeEvent, RustNodeEventStore};
15
16pub struct ClusterService {
17 pub rng: StdRng,
18 pub event_sender: mpsc::UnboundedSender<P2pEvent>,
19 pub cmd_sender: mpsc::TrackedUnboundedSender<mina_p2p::service_impl::webrtc::Cmd>,
20 mio: MioService,
21 peers: std::collections::BTreeMap<mina_p2p::PeerId, mina_p2p::service_impl::webrtc::PeerState>,
22 time: Instant,
23
24 rust_node_events: VecDeque<RustNodeEvent>,
25}
26
27impl ClusterService {
28 pub fn new(
29 node_idx: usize,
30 secret_key: SecretKey,
31 event_sender: mpsc::UnboundedSender<P2pEvent>,
32 cmd_sender: mpsc::TrackedUnboundedSender<mina_p2p::service_impl::webrtc::Cmd>,
33 time: Instant,
34 ) -> Self {
35 let mio = {
36 let event_sender = event_sender.clone();
37 let mut mio = MioService::pending(secret_key.try_into().expect("valid keypair"));
38 mio.run(move |mio_event| {
39 let _ = event_sender.send(mio_event.into());
40 });
42 mio
43 };
44 Self {
45 rng: StdRng::seed_from_u64(node_idx as u64),
46 event_sender,
47 cmd_sender,
48 mio,
49 peers: Default::default(),
50 time,
51
52 rust_node_events: Default::default(),
53 }
54 }
55
56 pub(crate) fn advance_time(&mut self, duration: std::time::Duration) {
57 self.time += duration
58 }
59
60 pub(crate) fn rust_node_event(&mut self) -> Option<RustNodeEvent> {
61 self.rust_node_events.pop_front()
62 }
63}
64
65impl TimeService for ClusterService {
66 fn monotonic_time(&mut self) -> redux::Instant {
67 self.time.into()
68 }
69}
70
71impl Service for ClusterService {}
72
73impl P2pServiceWebrtcWithLibp2p for ClusterService {
74 fn mio(&mut self) -> &mut mina_p2p::service_impl::mio::MioService {
75 &mut self.mio
76 }
77
78 fn connections(&self) -> std::collections::BTreeSet<mina_p2p::PeerId> {
79 Default::default()
80 }
81}
82
83impl P2pServiceWebrtc for ClusterService {
84 type Event = P2pEvent;
85
86 fn random_pick(
87 &mut self,
88 list: &[mina_p2p::connection::outgoing::P2pConnectionOutgoingInitOpts],
89 ) -> Option<mina_p2p::connection::outgoing::P2pConnectionOutgoingInitOpts> {
90 list.choose(&mut self.rng).cloned()
91 }
92
93 fn event_sender(&self) -> &mpsc::UnboundedSender<Self::Event> {
94 &self.event_sender
95 }
96
97 fn cmd_sender(&self) -> &mpsc::TrackedUnboundedSender<mina_p2p::service_impl::webrtc::Cmd> {
98 &self.cmd_sender
99 }
100
101 fn peers(
102 &mut self,
103 ) -> &mut std::collections::BTreeMap<mina_p2p::PeerId, mina_p2p::service_impl::webrtc::PeerState>
104 {
105 &mut self.peers
106 }
107
108 fn encrypt<T: mina_p2p::identity::EncryptableType>(
109 &mut self,
110 _other_pk: &mina_p2p::identity::PublicKey,
111 _message: &T,
112 ) -> Result<T::Encrypted, Box<dyn std::error::Error>> {
113 unreachable!("this is webrtc only and this crate tests libp2p only")
114 }
115
116 fn decrypt<T: mina_p2p::identity::EncryptableType>(
117 &mut self,
118 _other_pub_key: &mina_p2p::identity::PublicKey,
119 _encrypted: &T::Encrypted,
120 ) -> Result<T, Box<dyn std::error::Error>> {
121 unreachable!("this is webrtc only and this crate tests libp2p only")
122 }
123
124 fn auth_encrypt_and_send(
125 &mut self,
126 _peer_id: mina_p2p::PeerId,
127 _other_pub_key: &mina_p2p::identity::PublicKey,
128 _auth: mina_p2p::webrtc::ConnectionAuth,
129 ) {
130 unreachable!("this is webrtc only and this crate tests libp2p only")
131 }
132
133 fn auth_decrypt(
134 &mut self,
135 _other_pub_key: &mina_p2p::identity::PublicKey,
136 _auth: mina_p2p::webrtc::ConnectionAuthEncrypted,
137 ) -> Option<mina_p2p::webrtc::ConnectionAuth> {
138 unreachable!("this is webrtc only and this crate tests libp2p only")
139 }
140}
141
142impl P2pCryptoService for ClusterService {
143 fn generate_random_nonce(&mut self) -> [u8; 24] {
144 self.rng.gen()
145 }
146
147 fn ephemeral_sk(&mut self) -> [u8; 32] {
148 self.rng.gen()
149 }
150
151 fn static_sk(&mut self) -> [u8; 32] {
152 self.rng.gen()
153 }
154
155 fn sign_key(&mut self, key: &[u8; 32]) -> Vec<u8> {
157 let msg = [b"noise-libp2p-static-key:", key.as_ref()].concat();
158 let sig = self
159 .mio
160 .keypair()
161 .sign(&msg)
162 .expect("unable to create signature");
163
164 let mut payload = vec![];
165 payload.extend_from_slice(b"\x0a\x24");
166 payload.extend_from_slice(&self.mio.keypair().public().encode_protobuf());
167 payload.extend_from_slice(b"\x12\x40");
168 payload.extend_from_slice(&sig);
169 payload
170 }
171
172 fn sign_publication(&mut self, publication: &[u8]) -> Vec<u8> {
173 let msg: Vec<u8> = [b"libp2p-pubsub:", publication].concat();
174 self.mio
175 .keypair()
176 .sign(&msg)
177 .expect("unable to create signature")
178 }
179
180 fn verify_publication(
181 &mut self,
182 pk: &libp2p_identity::PublicKey,
183 publication: &[u8],
184 sig: &[u8],
185 ) -> bool {
186 let msg: Vec<u8> = [b"libp2p-pubsub:", publication].concat();
187 pk.verify(&msg, sig)
188 }
189}
190
191impl RustNodeEventStore for ClusterService {
192 fn store_event(&mut self, event: RustNodeEvent) {
193 self.rust_node_events.push_back(event);
194 }
195}