p2p_testing/
service.rs

1use std::{collections::VecDeque, time::Instant};
2
3use openmina_core::channels::mpsc;
4use p2p::{
5    identity::SecretKey,
6    service_impl::{
7        mio::MioService, webrtc::P2pServiceWebrtc, webrtc_with_libp2p::P2pServiceWebrtcWithLibp2p,
8    },
9    P2pCryptoService, P2pEvent,
10};
11use rand::{rngs::StdRng, seq::SliceRandom, Rng, SeedableRng};
12use redux::{Service, TimeService};
13
14use crate::event::{RustNodeEvent, RustNodeEventStore};
15
16pub struct ClusterService {
17    pub rng: StdRng,
18    pub event_sender: mpsc::UnboundedSender<P2pEvent>,
19    pub cmd_sender: mpsc::TrackedUnboundedSender<p2p::service_impl::webrtc::Cmd>,
20    mio: MioService,
21    peers: std::collections::BTreeMap<p2p::PeerId, p2p::service_impl::webrtc::PeerState>,
22    time: Instant,
23
24    rust_node_events: VecDeque<RustNodeEvent>,
25}
26
27impl ClusterService {
28    pub fn new(
29        node_idx: usize,
30        secret_key: SecretKey,
31        event_sender: mpsc::UnboundedSender<P2pEvent>,
32        cmd_sender: mpsc::TrackedUnboundedSender<p2p::service_impl::webrtc::Cmd>,
33        time: Instant,
34    ) -> Self {
35        let mio = {
36            let event_sender = event_sender.clone();
37            let mut mio = MioService::pending(secret_key.try_into().expect("valid keypair"));
38            mio.run(move |mio_event| {
39                let _ = event_sender.send(mio_event.into());
40                //.expect("cannot send mio event")
41            });
42            mio
43        };
44        Self {
45            rng: StdRng::seed_from_u64(node_idx as u64),
46            event_sender,
47            cmd_sender,
48            mio,
49            peers: Default::default(),
50            time,
51
52            rust_node_events: Default::default(),
53        }
54    }
55
56    pub(crate) fn advance_time(&mut self, duration: std::time::Duration) {
57        self.time += duration
58    }
59
60    pub(crate) fn rust_node_event(&mut self) -> Option<RustNodeEvent> {
61        self.rust_node_events.pop_front()
62    }
63}
64
65impl TimeService for ClusterService {
66    fn monotonic_time(&mut self) -> redux::Instant {
67        self.time.into()
68    }
69}
70
71impl Service for ClusterService {}
72
73impl P2pServiceWebrtcWithLibp2p for ClusterService {
74    fn mio(&mut self) -> &mut p2p::service_impl::mio::MioService {
75        &mut self.mio
76    }
77
78    fn connections(&self) -> std::collections::BTreeSet<p2p::PeerId> {
79        Default::default()
80    }
81}
82
83impl P2pServiceWebrtc for ClusterService {
84    type Event = P2pEvent;
85
86    fn random_pick(
87        &mut self,
88        list: &[p2p::connection::outgoing::P2pConnectionOutgoingInitOpts],
89    ) -> Option<p2p::connection::outgoing::P2pConnectionOutgoingInitOpts> {
90        list.choose(&mut self.rng).cloned()
91    }
92
93    fn event_sender(&self) -> &mpsc::UnboundedSender<Self::Event> {
94        &self.event_sender
95    }
96
97    fn cmd_sender(&self) -> &mpsc::TrackedUnboundedSender<p2p::service_impl::webrtc::Cmd> {
98        &self.cmd_sender
99    }
100
101    fn peers(
102        &mut self,
103    ) -> &mut std::collections::BTreeMap<p2p::PeerId, p2p::service_impl::webrtc::PeerState> {
104        &mut self.peers
105    }
106
107    fn encrypt<T: p2p::identity::EncryptableType>(
108        &mut self,
109        _other_pk: &p2p::identity::PublicKey,
110        _message: &T,
111    ) -> Result<T::Encrypted, Box<dyn std::error::Error>> {
112        unreachable!("this is webrtc only and this crate tests libp2p only")
113    }
114
115    fn decrypt<T: p2p::identity::EncryptableType>(
116        &mut self,
117        _other_pub_key: &p2p::identity::PublicKey,
118        _encrypted: &T::Encrypted,
119    ) -> Result<T, Box<dyn std::error::Error>> {
120        unreachable!("this is webrtc only and this crate tests libp2p only")
121    }
122
123    fn auth_encrypt_and_send(
124        &mut self,
125        _peer_id: p2p::PeerId,
126        _other_pub_key: &p2p::identity::PublicKey,
127        _auth: p2p::webrtc::ConnectionAuth,
128    ) {
129        unreachable!("this is webrtc only and this crate tests libp2p only")
130    }
131
132    fn auth_decrypt(
133        &mut self,
134        _other_pub_key: &p2p::identity::PublicKey,
135        _auth: p2p::webrtc::ConnectionAuthEncrypted,
136    ) -> Option<p2p::webrtc::ConnectionAuth> {
137        unreachable!("this is webrtc only and this crate tests libp2p only")
138    }
139}
140
141impl P2pCryptoService for ClusterService {
142    fn generate_random_nonce(&mut self) -> [u8; 24] {
143        self.rng.gen()
144    }
145
146    fn ephemeral_sk(&mut self) -> [u8; 32] {
147        self.rng.gen()
148    }
149
150    fn static_sk(&mut self) -> [u8; 32] {
151        self.rng.gen()
152    }
153
154    // TODO: move it to statemachine.
155    fn sign_key(&mut self, key: &[u8; 32]) -> Vec<u8> {
156        let msg = [b"noise-libp2p-static-key:", key.as_ref()].concat();
157        let sig = self
158            .mio
159            .keypair()
160            .sign(&msg)
161            .expect("unable to create signature");
162
163        let mut payload = vec![];
164        payload.extend_from_slice(b"\x0a\x24");
165        payload.extend_from_slice(&self.mio.keypair().public().encode_protobuf());
166        payload.extend_from_slice(b"\x12\x40");
167        payload.extend_from_slice(&sig);
168        payload
169    }
170
171    fn sign_publication(&mut self, publication: &[u8]) -> Vec<u8> {
172        let msg: Vec<u8> = [b"libp2p-pubsub:", publication].concat();
173        self.mio
174            .keypair()
175            .sign(&msg)
176            .expect("unable to create signature")
177    }
178
179    fn verify_publication(
180        &mut self,
181        pk: &libp2p_identity::PublicKey,
182        publication: &[u8],
183        sig: &[u8],
184    ) -> bool {
185        let msg: Vec<u8> = [b"libp2p-pubsub:", publication].concat();
186        pk.verify(&msg, sig)
187    }
188}
189
190impl RustNodeEventStore for ClusterService {
191    fn store_event(&mut self, event: RustNodeEvent) {
192        self.rust_node_events.push_back(event);
193    }
194}