1use super::{pb, BroadcastMessageId};
2use crate::{token::BroadcastAlgorithm, ConnectionAddr, PeerId, StreamId};
3
4use libp2p_identity::ParseError;
5use mina_p2p_messages::gossip::GossipNetMessageV2;
6use openmina_core::{
7 p2p::P2pNetworkPubsubMessageCacheId,
8 snark::{Snark, SnarkJobId},
9 transaction::Transaction,
10};
11use redux::Timestamp;
12use serde::{Deserialize, Serialize};
13use std::{
14 collections::{BTreeMap, BTreeSet, VecDeque},
15 time::Duration,
16};
17
18use malloc_size_of_derive::MallocSizeOf;
19
/// Length of the window used when counting IWANT requests per message:
/// `P2pNetworkPubsubState::filter_iwant_message_ids` only keeps request
/// timestamps that are younger than this duration.
pub const IWANT_TIMEOUT_DURATION: Duration = Duration::from_secs(5);
21
/// Top-level pubsub state: per-peer client state, the message cache,
/// per-topic peer state and the bookkeeping used to rate-limit IWANT requests.
#[derive(Default, Serialize, Deserialize, Debug, Clone, MallocSizeOf)]
pub struct P2pNetworkPubsubState {
    /// Per-peer pubsub state keyed by peer id. Entries are dropped by
    /// `prune_peer_state`.
    #[with_malloc_size_of_func = "measurement::clients"]
    pub clients: BTreeMap<PeerId, P2pNetworkPubsubClientState>,

    /// NOTE(review): presumably the sequence number used for locally
    /// originated messages; it is not read or written in this file — confirm.
    pub seq: u64,

    /// NOTE(review): presumably messages waiting to be signed before being
    /// sent; not touched in this file — confirm against the reducer.
    pub to_sign: VecDeque<pb::Message>,

    /// NOTE(review): presumably raw identifiers of recently seen messages;
    /// not touched in this file — confirm against the reducer.
    pub seen: VecDeque<Vec<u8>>,

    /// Cache of recently received messages. `filter_iwant_message_ids`
    /// consults it so that messages already cached are not requested again.
    pub mcache: P2pNetworkPubsubMessageCache,

    /// Transactions accumulated since the last `clear_incoming` call.
    /// NOTE(review): the meaning of the `u32` is not visible here — confirm.
    pub incoming_transactions: Vec<(Transaction, u32)>,

    /// Snarks accumulated since the last `clear_incoming` call.
    /// NOTE(review): the meaning of the `u32` is not visible here — confirm.
    pub incoming_snarks: Vec<(Snark, u32)>,

    /// Per-peer topic state, grouped by a `String` key (presumably the topic
    /// name — confirm).
    #[with_malloc_size_of_func = "measurement::topics"]
    pub topics: BTreeMap<String, BTreeMap<PeerId, P2pNetworkPubsubClientTopicState>>,

    /// Recent IWANT request records, one per message id. New records are
    /// appended by `filter_iwant_message_ids`, which keeps at most 10 of them.
    pub iwant: VecDeque<P2pNetworkPubsubIwantRequestCount>,
}
66
/// Record of the IWANT requests issued for a single message id.
#[derive(Default, Serialize, Deserialize, Debug, Clone, MallocSizeOf)]
pub struct P2pNetworkPubsubIwantRequestCount {
    /// Raw message id the requests were issued for.
    pub message_id: Vec<u8>,
    /// Timestamps of the requests that were allowed; entries older than
    /// `IWANT_TIMEOUT_DURATION` are pruned by
    /// `P2pNetworkPubsubState::filter_iwant_message_ids`.
    #[with_malloc_size_of_func = "measurement::timestamps"]
    pub count: Vec<Timestamp>,
}
73
74impl P2pNetworkPubsubState {
75 pub fn prune_peer_state(&mut self, peer_id: &PeerId) {
76 self.clients.remove(peer_id);
77 }
78
79 pub fn filter_iwant_message_ids(&mut self, message_id: &[u8], timestamp: Timestamp) -> bool {
80 if self
81 .mcache
82 .get_message_from_raw_message_id(message_id)
83 .is_some()
84 {
85 return false;
86 }
87
88 let message_count = self
89 .iwant
90 .iter_mut()
91 .find(|message| message.message_id == message_id);
92
93 match message_count {
94 Some(message) => {
95 let message_counts = std::mem::take(&mut message.count);
96
97 message.count = message_counts
98 .into_iter()
99 .filter(|time| {
100 timestamp
101 .checked_sub(*time)
102 .is_some_and(|duration| duration < IWANT_TIMEOUT_DURATION)
103 })
104 .collect();
105
106 if message.count.len() < 3 {
107 message.count.push(timestamp);
108 return true;
109 }
110
111 false
112 }
113 None => {
114 let message_count = P2pNetworkPubsubIwantRequestCount {
115 message_id: message_id.to_vec(),
116 count: vec![timestamp],
117 };
118
119 self.iwant.push_back(message_count);
120 if self.iwant.len() > 10 {
121 self.iwant.pop_front();
122 }
123
124 true
125 }
126 }
127 }
128
129 pub fn clear_incoming(&mut self) {
130 self.incoming_transactions.clear();
131 self.incoming_snarks.clear();
132
133 self.incoming_transactions.shrink_to(0x20);
134 self.incoming_snarks.shrink_to(0x20);
135 }
136}
137
/// Pubsub state kept for a single peer.
#[derive(Serialize, Deserialize, Debug, Clone, MallocSizeOf)]
pub struct P2pNetworkPubsubClientState {
    /// Broadcast algorithm associated with this peer.
    pub protocol: BroadcastAlgorithm,

    /// Connection address of the peer.
    pub addr: ConnectionAddr,

    /// Id of the outgoing stream to the peer, if one is set.
    /// NOTE(review): when it is set/cleared is not visible in this file.
    pub outgoing_stream_id: Option<StreamId>,

    /// RPC message being assembled for this peer; `publish` appends to its
    /// `publish` list and `message_is_empty` inspects its `subscriptions`,
    /// `publish` and `control` parts.
    pub message: pb::Rpc,

    /// Ids of messages recently queued by `publish`, used to avoid queueing
    /// the same message twice.
    pub cache: P2pNetworkPubsubRecentlyPublishCache,

    /// Byte buffer reset by `clear_buffer`.
    /// NOTE(review): presumably holds partially received data — confirm.
    pub buffer: Vec<u8>,

    /// Messages accumulated since the last `clear_incoming` call.
    /// NOTE(review): presumably messages received from this peer — confirm.
    pub incoming_messages: Vec<pb::Message>,
}
179
180impl P2pNetworkPubsubClientState {
181 pub fn publish(&mut self, message: &pb::Message) {
182 let Ok(id) = compute_message_id(message) else {
183 self.message.publish.push(message.clone());
184 return;
185 };
186 if self.cache.map.insert(id) {
187 self.message.publish.push(message.clone());
188 }
189 self.cache.queue.push_back(id);
190 if self.cache.queue.len() > 50 {
191 if let Some(id) = self.cache.queue.pop_front() {
192 self.cache.map.remove(&id);
193 }
194 }
195 }
196
197 pub fn clear_buffer(&mut self) {
198 self.buffer.clear();
199 self.buffer.shrink_to(0x2000);
200 }
201
202 pub fn clear_incoming(&mut self) {
203 self.incoming_messages.clear();
204 self.incoming_messages.shrink_to(0x20)
205 }
206}
207
/// Bounded record of message ids recently queued for a peer (see
/// `P2pNetworkPubsubClientState::publish`).
#[derive(Default, Serialize, Deserialize, Debug, Clone)]
pub struct P2pNetworkPubsubRecentlyPublishCache {
    /// Set of remembered ids, used for membership checks.
    pub map: BTreeSet<P2pNetworkPubsubMessageCacheId>,
    /// The same ids in insertion order; the front is evicted first.
    pub queue: VecDeque<P2pNetworkPubsubMessageCacheId>,
}
213
/// Bounded cache of pubsub messages keyed by message id.
#[derive(Default, Serialize, Deserialize, Debug, Clone)]
pub struct P2pNetworkPubsubMessageCache {
    /// Cached messages by id.
    pub map: BTreeMap<P2pNetworkPubsubMessageCacheId, P2pNetworkPubsubMessageCacheMessage>,
    /// Ids in insertion order; `put` evicts from the front once the queue
    /// grows past the cache capacity.
    pub queue: VecDeque<P2pNetworkPubsubMessageCacheId>,
}
220
/// A cached pubsub message together with the peer it is associated with and
/// a timestamp. Every variant carries `message`, `peer_id` and `time`.
///
/// NOTE(review): the variants look like successive validation stages; the
/// transitions between them are not visible in this file — confirm.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum P2pNetworkPubsubMessageCacheMessage {
    /// Variant created by `P2pNetworkPubsubMessageCache::put`; the only one
    /// that keeps the decoded `content`.
    Init {
        message: pb::Message,
        content: GossipNetMessageV2,
        peer_id: PeerId,
        time: Timestamp,
    },
    /// Message tagged with a block's state hash; looked up via
    /// `BroadcastMessageId::BlockHash`.
    PreValidatedBlockMessage {
        block_hash: mina_p2p_messages::v2::StateHash,
        message: pb::Message,
        peer_id: PeerId,
        time: Timestamp,
    },
    /// Message tagged with a snark job id; looked up via
    /// `BroadcastMessageId::Snark`.
    PreValidatedSnark {
        job_id: SnarkJobId,
        message: pb::Message,
        peer_id: PeerId,
        time: Timestamp,
    },
    /// Message without an extra lookup key.
    /// NOTE(review): presumably pre-validated content other than blocks and
    /// snarks — confirm.
    PreValidated {
        message: pb::Message,
        peer_id: PeerId,
        time: Timestamp,
    },
    /// NOTE(review): presumably a message that passed validation — confirm.
    Validated {
        message: pb::Message,
        peer_id: PeerId,
        time: Timestamp,
    },
}
252
253pub fn compute_message_id(
256 message: &pb::Message,
257) -> Result<P2pNetworkPubsubMessageCacheId, ParseError> {
258 let source = source_from_message(message)?;
259
260 let seqno = message
261 .seqno
262 .as_ref()
263 .and_then(|b| <[u8; 8]>::try_from(b.as_slice()).ok())
264 .map(u64::from_be_bytes)
265 .unwrap_or_default();
266
267 Ok(P2pNetworkPubsubMessageCacheId { source, seqno })
268}
269
// Generates a public accessor `fn $field(&self) -> &$field_type` for a field
// that is present, under the same name, in every variant of
// `P2pNetworkPubsubMessageCacheMessage`. Must be invoked inside an `impl`
// block of that enum, since the generated match arms use `Self::<Variant>`.
macro_rules! enum_field {
    ($field:ident: $field_type:ty) => {
        pub fn $field(&self) -> &$field_type {
            // All variants are listed explicitly, so adding a variant without
            // this field is a compile error rather than a silent fallthrough.
            match self {
                Self::Init { $field, .. }
                | Self::PreValidated { $field, .. }
                | Self::PreValidatedBlockMessage { $field, .. }
                | Self::PreValidatedSnark { $field, .. }
                | Self::Validated { $field, .. } => $field,
            }
        }
    };
}
283
impl P2pNetworkPubsubMessageCacheMessage {
    // Accessors for the fields shared by every variant; each invocation
    // expands to `pub fn <field>(&self) -> &<type>`.
    enum_field!(message: pb::Message);
    enum_field!(time: Timestamp);
    enum_field!(peer_id: PeerId);
}
289
290impl P2pNetworkPubsubMessageCache {
291 const CAPACITY: usize = 100;
292
293 pub fn put(
294 &mut self,
295 message: pb::Message,
296 content: GossipNetMessageV2,
297 peer_id: PeerId,
298 time: Timestamp,
299 ) -> Result<P2pNetworkPubsubMessageCacheId, ParseError> {
300 let id = compute_message_id(&message)?;
301 self.map.insert(
302 id,
303 P2pNetworkPubsubMessageCacheMessage::Init {
304 message,
305 content,
306 time,
307 peer_id,
308 },
309 );
310
311 self.queue.push_back(id);
312 if self.queue.len() > Self::CAPACITY {
313 if let Some(id) = self.queue.pop_front() {
314 self.map.remove(&id);
315 }
316 }
317 Ok(id)
318 }
319
320 pub fn get_message(&self, id: &P2pNetworkPubsubMessageCacheId) -> Option<&GossipNetMessageV2> {
321 let message = self.map.get(id)?;
322 match message {
323 P2pNetworkPubsubMessageCacheMessage::Init { content, .. } => Some(content),
324 _ => None,
325 }
326 }
327
328 pub fn contains_broadcast_id(&self, message_id: &BroadcastMessageId) -> bool {
329 match message_id {
330 BroadcastMessageId::BlockHash { hash } => self
331 .map
332 .values()
333 .any(|message| matches!(message, P2pNetworkPubsubMessageCacheMessage::PreValidatedBlockMessage { block_hash, .. } if block_hash == hash)),
334 BroadcastMessageId::MessageId { message_id } => {
335 self.map.contains_key(message_id)
336 },
337 BroadcastMessageId::Snark { job_id: snark_job_id } => {
338 self
339 .map
340 .values()
341 .any(|message| matches!(message, P2pNetworkPubsubMessageCacheMessage::PreValidatedSnark { job_id,.. } if job_id == snark_job_id))
342 },
343 }
344 }
345
346 pub fn get_message_id_and_message(
347 &mut self,
348 message_id: &BroadcastMessageId,
349 ) -> Option<(
350 P2pNetworkPubsubMessageCacheId,
351 &mut P2pNetworkPubsubMessageCacheMessage,
352 )> {
353 match message_id {
354 BroadcastMessageId::BlockHash { hash } => {
355 self.map
356 .iter_mut()
357 .find_map(|(message_id, message)| match message {
358 P2pNetworkPubsubMessageCacheMessage::PreValidatedBlockMessage {
359 block_hash,
360 ..
361 } if block_hash == hash => Some((*message_id, message)),
362 _ => None,
363 })
364 }
365 BroadcastMessageId::MessageId { message_id } => self
366 .map
367 .get_mut(message_id)
368 .map(|content| (*message_id, content)),
369 BroadcastMessageId::Snark {
370 job_id: snark_job_id,
371 } => {
372 self.map
373 .iter_mut()
374 .find_map(|(message_id, message)| match message {
375 P2pNetworkPubsubMessageCacheMessage::PreValidatedSnark {
376 job_id, ..
377 } if job_id == snark_job_id => Some((*message_id, message)),
378 _ => None,
379 })
380 }
381 }
382 }
383
384 pub fn remove_message(
385 &mut self,
386 message_id: P2pNetworkPubsubMessageCacheId,
387 ) -> Option<P2pNetworkPubsubMessageCacheMessage> {
388 let message = self.map.remove(&message_id);
389 if let Some(position) = self.queue.iter().position(|id| id == &message_id) {
390 self.queue.remove(position);
391 }
392 message
393 }
394
395 pub fn get_message_from_raw_message_id(
396 &self,
397 message_id: &[u8],
398 ) -> Option<&P2pNetworkPubsubMessageCacheMessage> {
399 self.map.iter().find_map(|(key, value)| {
400 if key.to_raw_bytes() == message_id {
401 Some(value)
402 } else {
403 None
404 }
405 })
406 }
407}
408
409pub fn source_from_message(message: &pb::Message) -> Result<libp2p_identity::PeerId, ParseError> {
410 let source_bytes = message
411 .from
412 .as_ref()
413 .map(AsRef::as_ref)
414 .unwrap_or(&[0, 1, 0][..]);
415
416 libp2p_identity::PeerId::from_bytes(source_bytes)
417}
418
/// State of a single peer with respect to a single topic.
#[derive(Default, Serialize, Deserialize, Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct P2pNetworkPubsubClientTopicState {
    /// Mesh membership status of the peer for this topic; see `on_mesh`.
    pub mesh: P2pNetworkPubsubClientMeshAddingState,
}
423
/// Mesh membership status of a peer for a topic. Only `Added` counts as being
/// on the mesh (see `P2pNetworkPubsubClientTopicState::on_mesh`).
///
/// NOTE(review): the exact events that lead to each state are not visible in
/// this file; the per-variant notes are inferred from the names — confirm.
#[derive(Default, Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum P2pNetworkPubsubClientMeshAddingState {
    /// Default state.
    #[default]
    Initial,
    /// Presumably: the remote peer declined the mesh link.
    TheyRefused,
    /// Presumably: the local node declined the mesh link.
    WeRefused,
    /// The peer is on the mesh.
    Added,
}
432
433impl P2pNetworkPubsubClientState {
434 pub fn message_is_empty(&self) -> bool {
435 self.message.subscriptions.is_empty()
436 && self.message.publish.is_empty()
437 && self.message.control.is_none()
438 }
439}
440
441impl P2pNetworkPubsubClientTopicState {
442 pub fn on_mesh(&self) -> bool {
443 matches!(&self.mesh, P2pNetworkPubsubClientMeshAddingState::Added)
444 }
445}
446
447mod measurement {
448 use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
449 use std::mem;
450
451 use super::*;
452
453 pub fn clients(
454 val: &BTreeMap<PeerId, P2pNetworkPubsubClientState>,
455 ops: &mut MallocSizeOfOps,
456 ) -> usize {
457 val.values().map(|v| v.size_of(ops)).sum()
458 }
459
460 pub fn topics(
461 val: &BTreeMap<String, BTreeMap<PeerId, P2pNetworkPubsubClientTopicState>>,
462 ops: &mut MallocSizeOfOps,
463 ) -> usize {
464 val.iter()
465 .map(|(k, v)| k.size_of(ops) + v.size_of(ops))
466 .sum()
467 }
468
469 pub fn timestamps(val: &Vec<Timestamp>, _ops: &mut MallocSizeOfOps) -> usize {
470 val.capacity() * mem::size_of::<Timestamp>()
471 }
472
473 impl MallocSizeOf for P2pNetworkPubsubRecentlyPublishCache {
474 fn size_of(&self, _ops: &mut malloc_size_of::MallocSizeOfOps) -> usize {
475 let map_size = self.map.len() * size_of::<P2pNetworkPubsubMessageCacheId>();
476 let queue_size = self.queue.capacity() * size_of::<P2pNetworkPubsubMessageCacheId>();
477 map_size + queue_size
478 }
479 }
480
481 impl MallocSizeOf for P2pNetworkPubsubMessageCache {
482 fn size_of(&self, _ops: &mut malloc_size_of::MallocSizeOfOps) -> usize {
483 let map_size = self.map.len()
484 * (size_of::<P2pNetworkPubsubMessageCacheId>()
485 + size_of::<P2pNetworkPubsubMessageCacheMessage>());
486 let queue_size = self.queue.capacity() * size_of::<P2pNetworkPubsubMessageCacheId>();
487 map_size + queue_size
488 }
489 }
490
491 impl MallocSizeOf for P2pNetworkPubsubClientTopicState {
492 fn size_of(&self, _ops: &mut MallocSizeOfOps) -> usize {
493 0
494 }
495 }
496}