p2p_testing/
libp2p_node.rs

1use std::{collections::BTreeMap, error::Error, time::Duration};
2
3use libp2p::{
4    gossipsub, identify,
5    swarm::{NetworkBehaviour, SwarmEvent, THandlerErr},
6    Transport,
7};
8use mina_p2p_messages::rpc_kernel::RpcTag;
9use openmina_core::ChainId;
10use p2p::PeerId;
11
12use libp2p_rpc_behaviour::StreamId;
13
14use libp2p::kad::{self, record::store::MemoryStore};
15
16use crate::{cluster::PeerIdConfig, test_node::TestNode};
17
/// Opaque identifier of a [`Libp2pNode`], wrapping a `usize`.
///
/// The inner value is only visible to the parent module (`pub(super)`).
// NOTE(review): presumably an index into the cluster's list of libp2p nodes — confirm against
// the cluster code.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Libp2pNodeId(pub(super) usize);
20
/// Configuration used when adding a libp2p node to a test cluster.
#[derive(Debug, Default, Clone)]
pub struct Libp2pNodeConfig {
    /// How the node's peer identity is chosen.
    pub peer_id: PeerIdConfig,
    /// Whether the TCP transport should enable port reuse.
    // NOTE(review): presumably forwarded as `port_reuse` to `create_swarm` — confirm at the
    // call site.
    pub port_reuse: bool,
}
26
/// The libp2p swarm type driven by test nodes, specialised to [`Libp2pBehaviour`].
pub type Swarm = libp2p::Swarm<Libp2pBehaviour>;
28
/// A test node backed by a plain libp2p [`Swarm`].
pub struct Libp2pNode {
    /// The swarm owned by this node.
    swarm: Swarm,
}
32
33impl Libp2pNode {
34    pub(super) fn new(swarm: Swarm) -> Self {
35        Libp2pNode { swarm }
36    }
37
38    pub fn swarm(&self) -> &Swarm {
39        &self.swarm
40    }
41
42    pub fn swarm_mut(&mut self) -> &mut Swarm {
43        &mut self.swarm
44    }
45}
46
47impl TestNode for Libp2pNode {
48    fn peer_id(&self) -> PeerId {
49        (*self.swarm.local_peer_id())
50            .try_into()
51            .expect("Conversion failed")
52    }
53
54    fn libp2p_port(&self) -> u16 {
55        self.swarm.behaviour().port
56    }
57}
58
/// Swarm event type produced by a [`Swarm`]: behaviour events are [`Libp2pBehaviourEvent`] and
/// handler errors are those of [`Libp2pBehaviour`]'s connection handler.
pub type Libp2pEvent = SwarmEvent<Libp2pBehaviourEvent, THandlerErr<Libp2pBehaviour>>;
60
/// Event emitted to the swarm by [`Libp2pBehaviour`]; one variant per sub-behaviour.
///
/// `derive_more::From` generates a `From` conversion for each variant's payload type.
#[derive(Debug, derive_more::From)]
pub enum Libp2pBehaviourEvent {
    /// Event from the gossipsub behaviour.
    Gossipsub(gossipsub::Event),
    /// Event from the RPC behaviour, paired with a libp2p peer id.
    Rpc((libp2p::identity::PeerId, libp2p_rpc_behaviour::Event)),
    /// Event from the identify behaviour.
    Identify(identify::Event),
    /// Event from the Kademlia behaviour.
    Kademlia(kad::Event),
}
69
/// Composite network behaviour used by libp2p test nodes.
///
/// Combines gossipsub, RPC, identify and Kademlia; their events are surfaced to the swarm as
/// [`Libp2pBehaviourEvent`]. Fields marked `#[behaviour(ignore)]` are plain state and take no
/// part in the derived `NetworkBehaviour` implementation.
#[derive(NetworkBehaviour)]
#[behaviour(to_swarm = "Libp2pBehaviourEvent")]
pub struct Libp2pBehaviour {
    /// Gossipsub pub/sub behaviour.
    pub gossipsub: gossipsub::Behaviour,
    /// RPC behaviour.
    pub rpc: libp2p_rpc_behaviour::Behaviour,
    /// Identify behaviour.
    pub identify: identify::Behaviour,
    /// Kademlia behaviour backed by an in-memory record store.
    pub kademlia: kad::Behaviour<MemoryStore>,

    /// Port reported by `TestNode::libp2p_port` for this node.
    #[behaviour(ignore)]
    port: u16,

    /// Map from (peer, msg_id) into (tag, version).
    // NOTE(review): presumably tracks RPC requests sent by this node that still await a
    // response — confirm against the code that fills this map.
    #[behaviour(ignore)]
    pub ongoing: BTreeMap<(PeerId, u64), (RpcTag, u32)>,
    /// Map from (peer, msg_id) into (stream_id, tag, version).
    // NOTE(review): presumably tracks RPC requests received from peers that still await a
    // response — confirm against the code that fills this map.
    #[behaviour(ignore)]
    pub ongoing_incoming: BTreeMap<(PeerId, u64), (StreamId, String, u32)>,
}
89
90pub(crate) fn create_swarm(
91    secret_key: p2p::identity::SecretKey,
92    port: u16,
93    port_reuse: bool,
94    chain_id: &ChainId,
95) -> Result<Swarm, Box<dyn Error>> {
96    let identity_keys = libp2p::identity::Keypair::ed25519_from_bytes(secret_key.to_bytes())
97        .expect("secret key bytes must be valid");
98
99    let psk = libp2p::pnet::PreSharedKey::new(chain_id.preshared_key());
100    let identify = libp2p::identify::Behaviour::new(libp2p::identify::Config::new(
101        "ipfs/0.1.0".to_string(),
102        identity_keys.public(),
103    ));
104
105    let kademlia = {
106        let peer_id = identity_keys.public().to_peer_id();
107        let kad_config = {
108            let mut c = libp2p::kad::Config::default();
109            c.set_protocol_names(vec![libp2p::StreamProtocol::new("/coda/kad/1.0.0")]);
110            c
111        };
112
113        let mut kademlia = libp2p::kad::Behaviour::with_config(
114            peer_id,
115            libp2p::kad::store::MemoryStore::new(peer_id),
116            kad_config,
117        );
118
119        kademlia.set_mode(Some(libp2p::kad::Mode::Server));
120        kademlia
121    };
122
123    // gossipsub
124    let gossipsub = {
125        let message_authenticity = gossipsub::MessageAuthenticity::Signed(identity_keys.clone());
126        let gossipsub_config = gossipsub::ConfigBuilder::default()
127            .max_transmit_size(1024 * 1024 * 32)
128            .validate_messages()
129            .build()
130            .expect("Error building gossipsub");
131        let mut gossipsub: gossipsub::Behaviour =
132            gossipsub::Behaviour::new(message_authenticity, gossipsub_config)
133                .expect("Error creating behaviour");
134
135        gossipsub
136            .subscribe(&gossipsub::IdentTopic::new("coda/consensus-messages/0.0.1"))
137            .expect("subscribe");
138
139        gossipsub
140    };
141
142    // rpc
143    let rpc = {
144        use mina_p2p_messages::rpc::{
145            AnswerSyncLedgerQueryV2, GetAncestryV2, GetBestTipV2,
146            GetStagedLedgerAuxAndPendingCoinbasesAtHashV2, GetTransitionChainProofV1ForV2,
147            GetTransitionChainV2,
148        };
149
150        libp2p_rpc_behaviour::BehaviourBuilder::default()
151            .register_method::<GetBestTipV2>()
152            .register_method::<GetAncestryV2>()
153            .register_method::<GetStagedLedgerAuxAndPendingCoinbasesAtHashV2>()
154            .register_method::<AnswerSyncLedgerQueryV2>()
155            .register_method::<GetTransitionChainV2>()
156            .register_method::<GetTransitionChainProofV1ForV2>()
157            .build()
158    };
159
160    let behaviour = Libp2pBehaviour {
161        gossipsub,
162        identify,
163        kademlia,
164        rpc,
165        port,
166        ongoing: Default::default(),
167        ongoing_incoming: Default::default(),
168    };
169
170    let swarm = libp2p::SwarmBuilder::with_existing_identity(identity_keys)
171        .with_tokio()
172        .with_other_transport(|key| {
173            let noise_config = libp2p::noise::Config::new(key).expect("Error generating noise");
174            let mut yamux_config = libp2p::yamux::Config::default();
175
176            yamux_config.set_protocol_name("/coda/yamux/1.0.0");
177
178            let mut base_transport = libp2p::tcp::tokio::Transport::new(
179                libp2p::tcp::Config::default()
180                    .nodelay(true)
181                    .port_reuse(port_reuse),
182            );
183
184            base_transport
185                .listen_on(
186                    libp2p::core::transport::ListenerId::next(),
187                    libp2p::multiaddr::multiaddr!(Ip4([127, 0, 0, 1]), Tcp(port)),
188                )
189                .expect("listen");
190
191            base_transport
192                .and_then(move |socket, _| libp2p::pnet::PnetConfig::new(psk).handshake(socket))
193                .upgrade(libp2p::core::upgrade::Version::V1)
194                .authenticate(noise_config)
195                .multiplex(yamux_config)
196                .timeout(Duration::from_secs(60))
197        })?
198        .with_dns()?
199        .with_behaviour(|_| behaviour)?
200        .with_swarm_config(|config| {
201            config.with_idle_connection_timeout(Duration::from_millis(1000))
202        })
203        .build();
204
205    //swarm.behaviour_mut().kademlia.set_mode(Some(Mode::Server));
206
207    Ok(swarm)
208}