p2p/network/pubsub/
p2p_network_pubsub_state.rs

1use super::{pb, BroadcastMessageId};
2use crate::{token::BroadcastAlgorithm, ConnectionAddr, PeerId, StreamId};
3
4use libp2p_identity::ParseError;
5use mina_p2p_messages::gossip::GossipNetMessageV2;
6use openmina_core::{
7    p2p::P2pNetworkPubsubMessageCacheId,
8    snark::{Snark, SnarkJobId},
9    transaction::Transaction,
10};
11use redux::Timestamp;
12use serde::{Deserialize, Serialize};
13use std::{
14    collections::{BTreeMap, BTreeSet, VecDeque},
15    time::Duration,
16};
17
18use malloc_size_of_derive::MallocSizeOf;
19
/// Length of the sliding window used by
/// `P2pNetworkPubsubState::filter_iwant_message_ids`: request timestamps that are
/// this old (or older) relative to the current time are discarded before the
/// per-message request limit is checked.
pub const IWANT_TIMEOUT_DURATION: Duration = Duration::from_secs(5);
21
/// State of the P2P Network PubSub system.
///
/// This struct is the data container for the pubsub layer: it holds the
/// per-peer client state, the local message sequence counter, messages waiting
/// to be signed, the message cache, the per-topic peer (mesh) state and the
/// bookkeeping used to rate-limit `iwant` requests.
#[derive(Default, Serialize, Deserialize, Debug, Clone, MallocSizeOf)]
pub struct P2pNetworkPubsubState {
    /// State of each connected peer, keyed by peer id.
    ///
    /// Entries are removed by `prune_peer_state`.
    #[with_malloc_size_of_func = "measurement::clients"]
    pub clients: BTreeMap<PeerId, P2pNetworkPubsubClientState>,

    /// Current message sequence number.
    ///
    /// Increments with each new message to ensure proper ordering and uniqueness.
    pub seq: u64,

    /// Messages awaiting cryptographic signing.
    pub to_sign: VecDeque<pb::Message>,

    /// Recently seen message identifiers to prevent duplication.
    ///
    /// Keeps a limited history of message signatures to avoid processing
    /// the same message multiple times.
    // NOTE(review): the size limit is not enforced in this file — confirm where
    // the history is trimmed.
    pub seen: VecDeque<Vec<u8>>,

    /// Cache of published messages for efficient retrieval and broadcasting.
    ///
    /// For quick access and reducing redundant data transmission across peers.
    pub mcache: P2pNetworkPubsubMessageCache,

    /// Incoming transactions from peers along with their nonces.
    ///
    /// Emptied by `clear_incoming`.
    pub incoming_transactions: Vec<(Transaction, u32)>,

    /// Incoming snarks from peers along with their nonces.
    ///
    /// Emptied by `clear_incoming`.
    pub incoming_snarks: Vec<(Snark, u32)>,

    /// Topics and their subscribed peers.
    ///
    /// Outer key is the topic name; the inner map holds the per-peer mesh state
    /// for that topic.
    #[with_malloc_size_of_func = "measurement::topics"]
    pub topics: BTreeMap<String, BTreeMap<PeerId, P2pNetworkPubsubClientTopicState>>,

    /// Per-message records of recent `iwant` request timestamps.
    ///
    /// Maintained by `filter_iwant_message_ids`, which keeps at most 10 records
    /// and drops the oldest record first.
    pub iwant: VecDeque<P2pNetworkPubsubIwantRequestCount>,
}
66
/// Record of the recent `iwant` requests accepted for a single message id.
///
/// Created and updated by `P2pNetworkPubsubState::filter_iwant_message_ids`.
#[derive(Default, Serialize, Deserialize, Debug, Clone, MallocSizeOf)]
pub struct P2pNetworkPubsubIwantRequestCount {
    /// Raw message id bytes this record refers to.
    pub message_id: Vec<u8>,
    /// Timestamps of the accepted requests.
    ///
    /// Despite the field name this is a list of times, not a number; its length
    /// is the request count. Timestamps outside the `IWANT_TIMEOUT_DURATION`
    /// window are pruned each time the message id is checked again.
    #[with_malloc_size_of_func = "measurement::timestamps"]
    pub count: Vec<Timestamp>,
}
73
74impl P2pNetworkPubsubState {
75    pub fn prune_peer_state(&mut self, peer_id: &PeerId) {
76        self.clients.remove(peer_id);
77    }
78
79    pub fn filter_iwant_message_ids(&mut self, message_id: &[u8], timestamp: Timestamp) -> bool {
80        if self
81            .mcache
82            .get_message_from_raw_message_id(message_id)
83            .is_some()
84        {
85            return false;
86        }
87
88        let message_count = self
89            .iwant
90            .iter_mut()
91            .find(|message| message.message_id == message_id);
92
93        match message_count {
94            Some(message) => {
95                let message_counts = std::mem::take(&mut message.count);
96
97                message.count = message_counts
98                    .into_iter()
99                    .filter(|time| {
100                        timestamp
101                            .checked_sub(*time)
102                            .is_some_and(|duration| duration < IWANT_TIMEOUT_DURATION)
103                    })
104                    .collect();
105
106                if message.count.len() < 3 {
107                    message.count.push(timestamp);
108                    return true;
109                }
110
111                false
112            }
113            None => {
114                let message_count = P2pNetworkPubsubIwantRequestCount {
115                    message_id: message_id.to_vec(),
116                    count: vec![timestamp],
117                };
118
119                self.iwant.push_back(message_count);
120                if self.iwant.len() > 10 {
121                    self.iwant.pop_front();
122                }
123
124                true
125            }
126        }
127    }
128
129    pub fn clear_incoming(&mut self) {
130        self.incoming_transactions.clear();
131        self.incoming_snarks.clear();
132
133        self.incoming_transactions.shrink_to(0x20);
134        self.incoming_snarks.shrink_to(0x20);
135    }
136}
137
/// State of a pubsub client connected to a peer.
///
/// This struct maintains essential information about the client's protocol,
/// connection details, message buffers, and caching mechanisms. It facilitates
/// efficient message handling and broadcasting within the pubsub system.
#[derive(Serialize, Deserialize, Debug, Clone, MallocSizeOf)]
pub struct P2pNetworkPubsubClientState {
    /// Broadcast algorithm used for this client.
    pub protocol: BroadcastAlgorithm,

    /// Connection address of the peer.
    pub addr: ConnectionAddr,

    /// Outgoing stream identifier, if any.
    ///
    /// - `Some(StreamId)`: Indicates an active outgoing stream.
    /// - `None`: No outgoing stream is currently established.
    pub outgoing_stream_id: Option<StreamId>,

    /// Current RPC message being constructed or processed.
    ///
    /// - `subscriptions`: List of subscription options for various topics.
    /// - `publish`: Messages queued for publishing.
    /// - `control`: Control commands for managing the mesh network.
    ///
    /// `message_is_empty` reports whether all three parts are empty/unset.
    pub message: pb::Rpc,

    /// Cache of recently published messages.
    ///
    /// `publish` consults it so that a message whose id is already recorded here
    /// is not queued into `message.publish` a second time.
    pub cache: P2pNetworkPubsubRecentlyPublishCache,

    /// Buffer for incoming data fragments.
    ///
    /// Stores partial data received from peers, facilitating the assembly of complete
    /// messages when all fragments are received.
    ///
    /// Reset by `clear_buffer`.
    pub buffer: Vec<u8>,

    /// Collection of incoming messages from the peer.
    ///
    /// Holds fully decoded `pb::Message` instances received from the peer,
    /// ready for further handling such as validation, caching, and broadcasting.
    ///
    /// Reset by `clear_incoming`.
    pub incoming_messages: Vec<pb::Message>,
}
179
180impl P2pNetworkPubsubClientState {
181    pub fn publish(&mut self, message: &pb::Message) {
182        let Ok(id) = compute_message_id(message) else {
183            self.message.publish.push(message.clone());
184            return;
185        };
186        if self.cache.map.insert(id) {
187            self.message.publish.push(message.clone());
188        }
189        self.cache.queue.push_back(id);
190        if self.cache.queue.len() > 50 {
191            if let Some(id) = self.cache.queue.pop_front() {
192                self.cache.map.remove(&id);
193            }
194        }
195    }
196
197    pub fn clear_buffer(&mut self) {
198        self.buffer.clear();
199        self.buffer.shrink_to(0x2000);
200    }
201
202    pub fn clear_incoming(&mut self) {
203        self.incoming_messages.clear();
204        self.incoming_messages.shrink_to(0x20)
205    }
206}
207
/// Bounded record of message ids recently queued for publishing to one peer.
///
/// Used by `P2pNetworkPubsubClientState::publish` to skip messages that were
/// already queued.
#[derive(Default, Serialize, Deserialize, Debug, Clone)]
pub struct P2pNetworkPubsubRecentlyPublishCache {
    /// Set of recorded ids, used for the membership check.
    pub map: BTreeSet<P2pNetworkPubsubMessageCacheId>,
    /// Recorded ids in insertion order; the front is the next id to be evicted.
    pub queue: VecDeque<P2pNetworkPubsubMessageCacheId>,
}
213
// TODO: store blocks, snarks and txs separately
/// Bounded cache of pubsub messages together with their validation state.
///
/// `put` adds entries and evicts the oldest one once more than
/// `Self::CAPACITY` ids are queued.
#[derive(Default, Serialize, Deserialize, Debug, Clone)]
pub struct P2pNetworkPubsubMessageCache {
    /// Cached messages keyed by message id.
    pub map: BTreeMap<P2pNetworkPubsubMessageCacheId, P2pNetworkPubsubMessageCacheMessage>,
    /// Message ids in insertion order; the front is the next id to be evicted.
    pub queue: VecDeque<P2pNetworkPubsubMessageCacheId>,
}
220
/// A cached pubsub message tagged with its current processing state.
///
/// Every variant carries the raw `message`, the `peer_id` associated with it
/// and the `time` it was recorded; these are exposed through the `message()`,
/// `peer_id()` and `time()` accessors.
// NOTE(review): the variant names suggest a pipeline
// Init -> PreValidated* -> Validated; the transitions happen outside this
// file — confirm against the reducer.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum P2pNetworkPubsubMessageCacheMessage {
    /// State created by `P2pNetworkPubsubMessageCache::put`.
    ///
    /// The only variant that keeps the decoded `content`, so it is the only one
    /// for which `P2pNetworkPubsubMessageCache::get_message` returns a value.
    Init {
        message: pb::Message,
        content: GossipNetMessageV2,
        peer_id: PeerId,
        time: Timestamp,
    },
    /// Block message; can be looked up by `block_hash` through
    /// `BroadcastMessageId::BlockHash`.
    PreValidatedBlockMessage {
        block_hash: mina_p2p_messages::v2::StateHash,
        message: pb::Message,
        peer_id: PeerId,
        time: Timestamp,
    },
    /// Snark message; can be looked up by `job_id` through
    /// `BroadcastMessageId::Snark`.
    PreValidatedSnark {
        job_id: SnarkJobId,
        message: pb::Message,
        peer_id: PeerId,
        time: Timestamp,
    },
    /// Message without a block hash or snark job id attached.
    // NOTE(review): presumably used for the remaining message kinds
    // (e.g. transactions) — confirm.
    PreValidated {
        message: pb::Message,
        peer_id: PeerId,
        time: Timestamp,
    },
    /// Message in the validated state.
    // NOTE(review): presumably set once validation has finished — confirm.
    Validated {
        message: pb::Message,
        peer_id: PeerId,
        time: Timestamp,
    },
}
252
253// TODO: what if wasm32?
254// How to test it?
255pub fn compute_message_id(
256    message: &pb::Message,
257) -> Result<P2pNetworkPubsubMessageCacheId, ParseError> {
258    let source = source_from_message(message)?;
259
260    let seqno = message
261        .seqno
262        .as_ref()
263        .and_then(|b| <[u8; 8]>::try_from(b.as_slice()).ok())
264        .map(u64::from_be_bytes)
265        .unwrap_or_default();
266
267    Ok(P2pNetworkPubsubMessageCacheId { source, seqno })
268}
269
// Generates a `pub fn <field>(&self) -> &<type>` accessor for a field that is
// present under the same name in every variant of
// `P2pNetworkPubsubMessageCacheMessage`.
//
// The match lists all variants explicitly (no wildcard arm), so adding a new
// variant without the field results in a compile error instead of a silent gap.
macro_rules! enum_field {
    ($field:ident: $field_type:ty) => {
        pub fn $field(&self) -> &$field_type {
            match self {
                Self::Init { $field, .. }
                | Self::PreValidated { $field, .. }
                | Self::PreValidatedBlockMessage { $field, .. }
                | Self::PreValidatedSnark { $field, .. }
                | Self::Validated { $field, .. } => $field,
            }
        }
    };
}
283
// Accessors for the fields shared by every variant, generated by `enum_field!`.
impl P2pNetworkPubsubMessageCacheMessage {
    // `message()`: the raw pubsub message.
    enum_field!(message: pb::Message);
    // `time()`: the timestamp stored with the entry.
    enum_field!(time: Timestamp);
    // `peer_id()`: the peer id stored with the entry.
    enum_field!(peer_id: PeerId);
}
289
290impl P2pNetworkPubsubMessageCache {
291    const CAPACITY: usize = 100;
292
293    pub fn put(
294        &mut self,
295        message: pb::Message,
296        content: GossipNetMessageV2,
297        peer_id: PeerId,
298        time: Timestamp,
299    ) -> Result<P2pNetworkPubsubMessageCacheId, ParseError> {
300        let id = compute_message_id(&message)?;
301        self.map.insert(
302            id,
303            P2pNetworkPubsubMessageCacheMessage::Init {
304                message,
305                content,
306                time,
307                peer_id,
308            },
309        );
310
311        self.queue.push_back(id);
312        if self.queue.len() > Self::CAPACITY {
313            if let Some(id) = self.queue.pop_front() {
314                self.map.remove(&id);
315            }
316        }
317        Ok(id)
318    }
319
320    pub fn get_message(&self, id: &P2pNetworkPubsubMessageCacheId) -> Option<&GossipNetMessageV2> {
321        let message = self.map.get(id)?;
322        match message {
323            P2pNetworkPubsubMessageCacheMessage::Init { content, .. } => Some(content),
324            _ => None,
325        }
326    }
327
328    pub fn contains_broadcast_id(&self, message_id: &BroadcastMessageId) -> bool {
329        match message_id {
330            BroadcastMessageId::BlockHash { hash } => self
331                .map
332                .values()
333                .any(|message| matches!(message, P2pNetworkPubsubMessageCacheMessage::PreValidatedBlockMessage { block_hash, .. } if block_hash == hash)),
334            BroadcastMessageId::MessageId { message_id } => {
335                self.map.contains_key(message_id)
336            },
337            BroadcastMessageId::Snark { job_id: snark_job_id } => {
338                self
339                    .map
340                    .values()
341                    .any(|message| matches!(message, P2pNetworkPubsubMessageCacheMessage::PreValidatedSnark { job_id,.. } if job_id == snark_job_id))
342            },
343        }
344    }
345
346    pub fn get_message_id_and_message(
347        &mut self,
348        message_id: &BroadcastMessageId,
349    ) -> Option<(
350        P2pNetworkPubsubMessageCacheId,
351        &mut P2pNetworkPubsubMessageCacheMessage,
352    )> {
353        match message_id {
354            BroadcastMessageId::BlockHash { hash } => {
355                self.map
356                    .iter_mut()
357                    .find_map(|(message_id, message)| match message {
358                        P2pNetworkPubsubMessageCacheMessage::PreValidatedBlockMessage {
359                            block_hash,
360                            ..
361                        } if block_hash == hash => Some((*message_id, message)),
362                        _ => None,
363                    })
364            }
365            BroadcastMessageId::MessageId { message_id } => self
366                .map
367                .get_mut(message_id)
368                .map(|content| (*message_id, content)),
369            BroadcastMessageId::Snark {
370                job_id: snark_job_id,
371            } => {
372                self.map
373                    .iter_mut()
374                    .find_map(|(message_id, message)| match message {
375                        P2pNetworkPubsubMessageCacheMessage::PreValidatedSnark {
376                            job_id, ..
377                        } if job_id == snark_job_id => Some((*message_id, message)),
378                        _ => None,
379                    })
380            }
381        }
382    }
383
384    pub fn remove_message(
385        &mut self,
386        message_id: P2pNetworkPubsubMessageCacheId,
387    ) -> Option<P2pNetworkPubsubMessageCacheMessage> {
388        let message = self.map.remove(&message_id);
389        if let Some(position) = self.queue.iter().position(|id| id == &message_id) {
390            self.queue.remove(position);
391        }
392        message
393    }
394
395    pub fn get_message_from_raw_message_id(
396        &self,
397        message_id: &[u8],
398    ) -> Option<&P2pNetworkPubsubMessageCacheMessage> {
399        self.map.iter().find_map(|(key, value)| {
400            if key.to_raw_bytes() == message_id {
401                Some(value)
402            } else {
403                None
404            }
405        })
406    }
407}
408
409pub fn source_from_message(message: &pb::Message) -> Result<libp2p_identity::PeerId, ParseError> {
410    let source_bytes = message
411        .from
412        .as_ref()
413        .map(AsRef::as_ref)
414        .unwrap_or(&[0, 1, 0][..]);
415
416    libp2p_identity::PeerId::from_bytes(source_bytes)
417}
418
/// Per-topic state of a single peer.
#[derive(Default, Serialize, Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct P2pNetworkPubsubClientTopicState {
    /// Progress of adding this peer to the topic's mesh; `on_mesh` is true only
    /// when this is `Added`.
    pub mesh: P2pNetworkPubsubClientMeshAddingState,
}
423
/// Status of a peer with respect to a topic's mesh.
// NOTE(review): the per-variant descriptions below are inferred from the
// variant names; the transitions are performed outside this file — confirm.
#[derive(Default, Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum P2pNetworkPubsubClientMeshAddingState {
    /// Default state; presumably no mesh decision has been made yet.
    #[default]
    Initial,
    /// Presumably the remote peer declined to be in the mesh.
    TheyRefused,
    /// Presumably the local node declined to add the peer to the mesh.
    WeRefused,
    /// The peer is in the mesh; the only state for which
    /// `P2pNetworkPubsubClientTopicState::on_mesh` returns `true`.
    Added,
}
432
433impl P2pNetworkPubsubClientState {
434    pub fn message_is_empty(&self) -> bool {
435        self.message.subscriptions.is_empty()
436            && self.message.publish.is_empty()
437            && self.message.control.is_none()
438    }
439}
440
441impl P2pNetworkPubsubClientTopicState {
442    pub fn on_mesh(&self) -> bool {
443        matches!(&self.mesh, P2pNetworkPubsubClientMeshAddingState::Added)
444    }
445}
446
447mod measurement {
448    use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
449    use std::mem;
450
451    use super::*;
452
453    pub fn clients(
454        val: &BTreeMap<PeerId, P2pNetworkPubsubClientState>,
455        ops: &mut MallocSizeOfOps,
456    ) -> usize {
457        val.values().map(|v| v.size_of(ops)).sum()
458    }
459
460    pub fn topics(
461        val: &BTreeMap<String, BTreeMap<PeerId, P2pNetworkPubsubClientTopicState>>,
462        ops: &mut MallocSizeOfOps,
463    ) -> usize {
464        val.iter()
465            .map(|(k, v)| k.size_of(ops) + v.size_of(ops))
466            .sum()
467    }
468
469    pub fn timestamps(val: &Vec<Timestamp>, _ops: &mut MallocSizeOfOps) -> usize {
470        val.capacity() * mem::size_of::<Timestamp>()
471    }
472
473    impl MallocSizeOf for P2pNetworkPubsubRecentlyPublishCache {
474        fn size_of(&self, _ops: &mut malloc_size_of::MallocSizeOfOps) -> usize {
475            let map_size = self.map.len() * size_of::<P2pNetworkPubsubMessageCacheId>();
476            let queue_size = self.queue.capacity() * size_of::<P2pNetworkPubsubMessageCacheId>();
477            map_size + queue_size
478        }
479    }
480
481    impl MallocSizeOf for P2pNetworkPubsubMessageCache {
482        fn size_of(&self, _ops: &mut malloc_size_of::MallocSizeOfOps) -> usize {
483            let map_size = self.map.len()
484                * (size_of::<P2pNetworkPubsubMessageCacheId>()
485                    + size_of::<P2pNetworkPubsubMessageCacheMessage>());
486            let queue_size = self.queue.capacity() * size_of::<P2pNetworkPubsubMessageCacheId>();
487            map_size + queue_size
488        }
489    }
490
491    impl MallocSizeOf for P2pNetworkPubsubClientTopicState {
492        fn size_of(&self, _ops: &mut MallocSizeOfOps) -> usize {
493            0
494        }
495    }
496}