p2p_testing/
libp2p_node.rs1use std::{collections::BTreeMap, error::Error, time::Duration};
2
3use libp2p::{
4 gossipsub, identify,
5 swarm::{NetworkBehaviour, SwarmEvent, THandlerErr},
6 Transport,
7};
8use mina_p2p_messages::rpc_kernel::RpcTag;
9use openmina_core::ChainId;
10use p2p::PeerId;
11
12use libp2p_rpc_behaviour::StreamId;
13
14use libp2p::kad::{self, record::store::MemoryStore};
15
16use crate::{cluster::PeerIdConfig, test_node::TestNode};
17
18#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
19pub struct Libp2pNodeId(pub(super) usize);
20
21#[derive(Debug, Default, Clone)]
22pub struct Libp2pNodeConfig {
23 pub peer_id: PeerIdConfig,
24 pub port_reuse: bool,
25}
26
27pub type Swarm = libp2p::Swarm<Libp2pBehaviour>;
28
29pub struct Libp2pNode {
30 swarm: Swarm,
31}
32
33impl Libp2pNode {
34 pub(super) fn new(swarm: Swarm) -> Self {
35 Libp2pNode { swarm }
36 }
37
38 pub fn swarm(&self) -> &Swarm {
39 &self.swarm
40 }
41
42 pub fn swarm_mut(&mut self) -> &mut Swarm {
43 &mut self.swarm
44 }
45}
46
47impl TestNode for Libp2pNode {
48 fn peer_id(&self) -> PeerId {
49 (*self.swarm.local_peer_id())
50 .try_into()
51 .expect("Conversion failed")
52 }
53
54 fn libp2p_port(&self) -> u16 {
55 self.swarm.behaviour().port
56 }
57}
58
59pub type Libp2pEvent = SwarmEvent<Libp2pBehaviourEvent, THandlerErr<Libp2pBehaviour>>;
60
61#[derive(Debug, derive_more::From)]
62pub enum Libp2pBehaviourEvent {
63 Gossipsub(gossipsub::Event),
65 Rpc((libp2p::identity::PeerId, libp2p_rpc_behaviour::Event)),
66 Identify(identify::Event),
67 Kademlia(kad::Event),
68}
69
70#[derive(NetworkBehaviour)]
71#[behaviour(to_swarm = "Libp2pBehaviourEvent")]
72pub struct Libp2pBehaviour {
73 pub gossipsub: gossipsub::Behaviour,
74 pub rpc: libp2p_rpc_behaviour::Behaviour,
75 pub identify: identify::Behaviour,
76 pub kademlia: kad::Behaviour<MemoryStore>,
77
78 #[behaviour(ignore)]
79 port: u16,
80
81 #[behaviour(ignore)]
83 pub ongoing: BTreeMap<(PeerId, u64), (RpcTag, u32)>,
84 #[behaviour(ignore)]
87 pub ongoing_incoming: BTreeMap<(PeerId, u64), (StreamId, String, u32)>,
88}
89
90pub(crate) fn create_swarm(
91 secret_key: p2p::identity::SecretKey,
92 port: u16,
93 port_reuse: bool,
94 chain_id: &ChainId,
95) -> Result<Swarm, Box<dyn Error>> {
96 let identity_keys = libp2p::identity::Keypair::ed25519_from_bytes(secret_key.to_bytes())
97 .expect("secret key bytes must be valid");
98
99 let psk = libp2p::pnet::PreSharedKey::new(chain_id.preshared_key());
100 let identify = libp2p::identify::Behaviour::new(libp2p::identify::Config::new(
101 "ipfs/0.1.0".to_string(),
102 identity_keys.public(),
103 ));
104
105 let kademlia = {
106 let peer_id = identity_keys.public().to_peer_id();
107 let kad_config = {
108 let mut c = libp2p::kad::Config::default();
109 c.set_protocol_names(vec![libp2p::StreamProtocol::new("/coda/kad/1.0.0")]);
110 c
111 };
112
113 let mut kademlia = libp2p::kad::Behaviour::with_config(
114 peer_id,
115 libp2p::kad::store::MemoryStore::new(peer_id),
116 kad_config,
117 );
118
119 kademlia.set_mode(Some(libp2p::kad::Mode::Server));
120 kademlia
121 };
122
123 let gossipsub = {
125 let message_authenticity = gossipsub::MessageAuthenticity::Signed(identity_keys.clone());
126 let gossipsub_config = gossipsub::ConfigBuilder::default()
127 .max_transmit_size(1024 * 1024 * 32)
128 .validate_messages()
129 .build()
130 .expect("Error building gossipsub");
131 let mut gossipsub: gossipsub::Behaviour =
132 gossipsub::Behaviour::new(message_authenticity, gossipsub_config)
133 .expect("Error creating behaviour");
134
135 gossipsub
136 .subscribe(&gossipsub::IdentTopic::new("coda/consensus-messages/0.0.1"))
137 .expect("subscribe");
138
139 gossipsub
140 };
141
142 let rpc = {
144 use mina_p2p_messages::rpc::{
145 AnswerSyncLedgerQueryV2, GetAncestryV2, GetBestTipV2,
146 GetStagedLedgerAuxAndPendingCoinbasesAtHashV2, GetTransitionChainProofV1ForV2,
147 GetTransitionChainV2,
148 };
149
150 libp2p_rpc_behaviour::BehaviourBuilder::default()
151 .register_method::<GetBestTipV2>()
152 .register_method::<GetAncestryV2>()
153 .register_method::<GetStagedLedgerAuxAndPendingCoinbasesAtHashV2>()
154 .register_method::<AnswerSyncLedgerQueryV2>()
155 .register_method::<GetTransitionChainV2>()
156 .register_method::<GetTransitionChainProofV1ForV2>()
157 .build()
158 };
159
160 let behaviour = Libp2pBehaviour {
161 gossipsub,
162 identify,
163 kademlia,
164 rpc,
165 port,
166 ongoing: Default::default(),
167 ongoing_incoming: Default::default(),
168 };
169
170 let swarm = libp2p::SwarmBuilder::with_existing_identity(identity_keys)
171 .with_tokio()
172 .with_other_transport(|key| {
173 let noise_config = libp2p::noise::Config::new(key).expect("Error generating noise");
174 let mut yamux_config = libp2p::yamux::Config::default();
175
176 yamux_config.set_protocol_name("/coda/yamux/1.0.0");
177
178 let mut base_transport = libp2p::tcp::tokio::Transport::new(
179 libp2p::tcp::Config::default()
180 .nodelay(true)
181 .port_reuse(port_reuse),
182 );
183
184 base_transport
185 .listen_on(
186 libp2p::core::transport::ListenerId::next(),
187 libp2p::multiaddr::multiaddr!(Ip4([127, 0, 0, 1]), Tcp(port)),
188 )
189 .expect("listen");
190
191 base_transport
192 .and_then(move |socket, _| libp2p::pnet::PnetConfig::new(psk).handshake(socket))
193 .upgrade(libp2p::core::upgrade::Version::V1)
194 .authenticate(noise_config)
195 .multiplex(yamux_config)
196 .timeout(Duration::from_secs(60))
197 })?
198 .with_dns()?
199 .with_behaviour(|_| behaviour)?
200 .with_swarm_config(|config| {
201 config.with_idle_connection_timeout(Duration::from_millis(1000))
202 })
203 .build();
204
205 Ok(swarm)
208}