1use std::{collections::btree_map::Entry, time::Duration};
2
3use binprot::BinProtRead;
4use mina_p2p_messages::{
5 gossip::{self, GossipNetMessageV2},
6 v2::NetworkPoolSnarkPoolDiffVersionedStableV2,
7};
8use openmina_core::{
9 block::BlockWithHash, bug_condition, fuzz_maybe, fuzzed_maybe, snark::Snark, Substate,
10};
11use redux::{Dispatcher, Timestamp};
12
13use crate::{
14 channels::{snark::P2pChannelsSnarkAction, transaction::P2pChannelsTransactionAction},
15 disconnection::{P2pDisconnectionAction, P2pDisconnectionReason},
16 peer::P2pPeerAction,
17 Data, P2pConfig, P2pNetworkYamuxAction, P2pState, PeerId,
18};
19
20use super::{
21 p2p_network_pubsub_state::{
22 source_from_message, P2pNetworkPubsubClientMeshAddingState,
23 P2pNetworkPubsubMessageCacheMessage,
24 },
25 pb::{self, Message},
26 P2pNetworkPubsubAction, P2pNetworkPubsubClientState, P2pNetworkPubsubEffectfulAction,
27 P2pNetworkPubsubMessageCacheId, P2pNetworkPubsubState, TOPIC,
28};
29
/// Maximum age of a cached pubsub message. The `PruneMessages` action removes
/// every `mcache` entry whose timestamp plus this duration is less than or
/// equal to the time of the action.
const MAX_MESSAGE_KEEP_DURATION: Duration = Duration::from_secs(300);
31
32impl P2pNetworkPubsubState {
33 pub fn reducer<Action, State>(
34 mut state_context: Substate<Action, State, Self>,
35 action: redux::ActionWithMeta<P2pNetworkPubsubAction>,
36 ) -> Result<(), String>
37 where
38 State: crate::P2pStateTrait,
39 Action: crate::P2pActionTrait<State>,
40 {
41 let pubsub_state = state_context.get_substate_mut()?;
42 let (action, meta) = action.split();
43 let time = meta.time();
44
45 match action {
46 P2pNetworkPubsubAction::NewStream {
47 incoming: true,
48 peer_id,
49 addr,
50 protocol,
51 ..
52 } => {
53 let entry = pubsub_state.clients.entry(peer_id);
54 let outgoing_stream_id = match &entry {
56 Entry::Occupied(v) => v.get().outgoing_stream_id,
57 Entry::Vacant(_) => None,
58 };
59 let state = entry.or_insert_with(|| P2pNetworkPubsubClientState {
60 protocol,
61 addr,
62 outgoing_stream_id,
63 message: pb::Rpc {
64 subscriptions: vec![],
65 publish: vec![],
66 control: None,
67 },
68 cache: Default::default(),
69 buffer: vec![],
70 incoming_messages: vec![],
71 });
72 state.protocol = protocol;
73 state.addr = addr;
74
75 pubsub_state
76 .topics
77 .entry(super::TOPIC.to_owned())
78 .or_default()
79 .insert(peer_id, Default::default());
80
81 Ok(())
82 }
83 P2pNetworkPubsubAction::NewStream {
84 incoming: false,
85 peer_id,
86 stream_id,
87 addr,
88 protocol,
89 } => {
90 let state = pubsub_state.clients.entry(peer_id).or_insert_with(|| {
91 P2pNetworkPubsubClientState {
92 protocol,
93 addr,
94 outgoing_stream_id: Some(stream_id),
95 message: pb::Rpc {
96 subscriptions: vec![],
97 publish: vec![],
98 control: None,
99 },
100 cache: Default::default(),
101 buffer: vec![],
102 incoming_messages: vec![],
103 }
104 });
105 state.outgoing_stream_id = Some(stream_id);
106 state.protocol = protocol;
107 state.addr = addr;
108
109 pubsub_state
110 .topics
111 .entry(TOPIC.to_owned())
112 .or_default()
113 .insert(peer_id, Default::default());
114
115 if let Some(state) = pubsub_state.clients.get_mut(&peer_id) {
116 state.message.subscriptions.push(pb::rpc::SubOpts {
117 subscribe: Some(true),
118 topic_id: Some(TOPIC.to_owned()),
119 });
120 }
121
122 let (dispatcher, state) = state_context.into_dispatcher_and_state();
123 let config: &P2pConfig = state.substate()?;
124 let state: &P2pNetworkPubsubState = state.substate()?;
125
126 let Some(map) = state.topics.get(TOPIC) else {
127 return Ok(());
129 };
130 dispatcher.push(P2pNetworkPubsubAction::OutgoingMessage { peer_id });
131 let mesh_size = map.values().filter(|s| s.on_mesh()).count();
132 if mesh_size < config.meshsub.outbound_degree_desired {
133 dispatcher.push(P2pNetworkPubsubAction::Graft {
134 peer_id,
135 topic_id: TOPIC.to_owned(),
136 });
137 }
138
139 Ok(())
140 }
141 P2pNetworkPubsubAction::IncomingData {
142 peer_id,
143 data,
144 seen_limit,
145 addr,
146 ..
147 } => {
148 pubsub_state.reduce_incoming_data(&peer_id, data, meta.time())?;
149
150 let dispatcher = state_context.into_dispatcher();
151
152 dispatcher.push(P2pNetworkPubsubAction::ValidateIncomingMessages {
153 peer_id,
154 seen_limit,
155 addr,
156 });
157
158 Ok(())
159 }
160 P2pNetworkPubsubAction::ValidateIncomingMessages {
161 peer_id,
162 seen_limit,
163 addr,
164 } => {
165 let Some(state) = pubsub_state.clients.get_mut(&peer_id) else {
166 return Ok(());
169 };
170 let messages = std::mem::take(&mut state.incoming_messages);
171
172 let dispatcher = state_context.into_dispatcher();
173
174 dispatcher.push(P2pNetworkPubsubEffectfulAction::ValidateIncomingMessages {
175 peer_id,
176 seen_limit,
177 addr,
178 messages,
179 });
180
181 Ok(())
182 }
183 P2pNetworkPubsubAction::IncomingMessage {
184 peer_id,
185 message,
186 seen_limit,
187 } => {
188 if source_from_message(&message).is_err() {
190 let dispatcher = state_context.into_dispatcher();
191 dispatcher.push(P2pNetworkPubsubAction::RejectMessage {
192 message_id: None,
193 peer_id: Some(peer_id),
194 reason: "Invalid originator in message".to_owned(),
195 });
196 return Ok(());
197 }
198
199 let reduce_incoming_result =
201 pubsub_state.reduce_incoming_message(&message, seen_limit);
202
203 let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
204 let p2p_state: &P2pState = global_state.substate()?;
205 let state: &Self = global_state.substate()?;
206
207 dispatcher.push(P2pNetworkPubsubAction::IncomingMessageCleanup { peer_id });
208
209 let message_content = reduce_incoming_result?;
210
211 for (topic_id, map) in &state.topics {
212 let mesh_size = map.values().filter(|s| s.on_mesh()).count();
213 let could_accept = mesh_size < p2p_state.config.meshsub.outbound_degree_high;
214
215 if !could_accept {
216 if let Some(topic_state) = map.get(&peer_id) {
217 if topic_state.on_mesh() {
218 let topic_id = topic_id.clone();
219 dispatcher.push(P2pNetworkPubsubAction::Prune { peer_id, topic_id })
220 }
221 }
222 }
223 }
224
225 if let Some(message_content) = message_content {
227 dispatcher.push(P2pNetworkPubsubAction::HandleIncomingMessage {
228 message,
229 message_content,
230 peer_id,
231 });
232 } else {
233 dispatcher.push(P2pNetworkPubsubAction::IgnoreMessage {
234 message_id: None,
235 reason: "Message already seen".to_owned(),
236 });
237 };
238
239 Ok(())
240 }
241 P2pNetworkPubsubAction::HandleIncomingMessage {
242 message,
243 message_content,
244 peer_id,
245 } => {
246 let Ok(message_id) =
247 pubsub_state
248 .mcache
249 .put(message, message_content, peer_id, time)
250 else {
251 bug_condition!("Unable to add message to `mcache`");
252 return Ok(());
253 };
254
255 let (dispatcher, state) = state_context.into_dispatcher_and_state();
256 let p2p_state: &P2pState = state.substate()?;
257
258 if let Some(callback) = p2p_state.callbacks.on_p2p_pubsub_message_received.clone() {
259 dispatcher.push_callback(callback, message_id);
260 } else {
261 dispatcher.push(P2pNetworkPubsubAction::ValidateIncomingMessage { message_id });
262 }
263 Ok(())
264 }
265 P2pNetworkPubsubAction::IncomingMessageCleanup { peer_id } => {
266 pubsub_state.clear_incoming();
267
268 let Some(client_state) = pubsub_state.clients.get_mut(&peer_id) else {
269 bug_condition!(
270 "State not found for action P2pNetworkPubsubAction::IncomingMessageCleanup"
271 );
272 return Ok(());
273 };
274
275 client_state.clear_incoming();
276
277 Ok(())
278 }
279 P2pNetworkPubsubAction::Graft { peer_id, topic_id } => {
281 let Some(state) = pubsub_state
282 .topics
283 .get_mut(&topic_id)
284 .and_then(|m| m.get_mut(&peer_id))
285 else {
286 return Ok(());
287 };
288 state.mesh = P2pNetworkPubsubClientMeshAddingState::Added;
289
290 if let Some(state) = pubsub_state.clients.get_mut(&peer_id) {
291 let control = state
292 .message
293 .control
294 .get_or_insert_with(|| pb::ControlMessage {
295 ihave: vec![],
296 iwant: vec![],
297 graft: vec![],
298 prune: vec![],
299 });
300 control.graft.push(pb::ControlGraft {
301 topic_id: Some(topic_id),
302 });
303 }
304
305 let dispatcher = state_context.into_dispatcher();
306 dispatcher.push(P2pNetworkPubsubAction::OutgoingMessage { peer_id });
307 Ok(())
308 }
309 P2pNetworkPubsubAction::Prune { peer_id, topic_id } => {
310 let Some(state) = pubsub_state
311 .topics
312 .get_mut(&topic_id)
313 .and_then(|m| m.get_mut(&peer_id))
314 else {
315 bug_condition!("State not found for action: `P2pNetworkPubsubAction::Prune`");
316 return Ok(());
317 };
318 state.mesh = P2pNetworkPubsubClientMeshAddingState::WeRefused;
319
320 if let Some(state) = pubsub_state.clients.get_mut(&peer_id) {
321 let control = state
322 .message
323 .control
324 .get_or_insert_with(|| pb::ControlMessage {
325 ihave: vec![],
326 iwant: vec![],
327 graft: vec![],
328 prune: vec![],
329 });
330 control.prune.push(pb::ControlPrune {
331 topic_id: Some(topic_id),
332 peers: vec![pb::PeerInfo {
333 peer_id: None,
334 signed_peer_record: None,
335 }],
336 backoff: None,
337 });
338 }
339
340 let dispatcher = state_context.into_dispatcher();
341 dispatcher.push(P2pNetworkPubsubAction::OutgoingMessage { peer_id });
342 Ok(())
343 }
344 P2pNetworkPubsubAction::OutgoingMessage { peer_id } => {
345 let msg = if let Some(v) = pubsub_state.clients.get_mut(&peer_id) {
346 &v.message
347 } else {
348 bug_condition!(
349 "Invalid state for action: `P2pNetworkPubsubAction::OutgoingMessage`"
350 );
351 return Ok(());
352 };
353
354 let mut data = vec![];
355 let result = prost::Message::encode_length_delimited(msg, &mut data)
356 .map(|_| data)
357 .map_err(|_| msg.clone());
358
359 let dispatcher = state_context.into_dispatcher();
360
361 match result {
362 Err(msg) => {
363 dispatcher
364 .push(P2pNetworkPubsubAction::OutgoingMessageError { msg, peer_id });
365 }
366 Ok(data) => {
367 dispatcher.push(P2pNetworkPubsubAction::OutgoingData {
368 data: Data::from(data),
369 peer_id,
370 });
371 }
372 }
373
374 dispatcher.push(P2pNetworkPubsubAction::OutgoingMessageClear { peer_id });
376
377 Ok(())
378 }
379 P2pNetworkPubsubAction::OutgoingMessageClear { peer_id } => {
380 if let Some(v) = pubsub_state.clients.get_mut(&peer_id) {
381 v.message = pb::Rpc {
382 subscriptions: vec![],
383 publish: vec![],
384 control: None,
385 };
386 } else {
387 bug_condition!(
388 "Invalid state for action: `P2pNetworkPubsubAction::OutgoingMessageClear`"
389 );
390 };
391 Ok(())
392 }
393 P2pNetworkPubsubAction::OutgoingMessageError { .. } => Ok(()),
394 P2pNetworkPubsubAction::WebRtcRebroadcast { message } => {
395 let data = match super::encode_message(&message) {
396 Err(err) => {
397 bug_condition!("binprot serialization error: {err}");
398 return Ok(());
399 }
400 Ok(data) => data,
401 };
402
403 let mut source_sk = super::webrtc_source_sk_from_bytes(&data[8..]);
404 let source_peer_id = source_sk.public_key().peer_id();
405 let message_id = P2pNetworkPubsubMessageCacheId {
406 source: libp2p_identity::PeerId::try_from(source_peer_id).unwrap(),
407 seqno: 0,
408 };
409 let mut msg = pb::Message {
410 from: Some(message_id.source.to_bytes().to_vec()),
411 data: Some(data),
412 seqno: Some(message_id.seqno.to_be_bytes().to_vec()),
413 topic: super::TOPIC.to_owned(),
414 signature: None,
415 key: None,
416 };
417
418 msg.signature = match source_sk.libp2p_pubsub_pb_message_sign(&msg) {
419 Err(err) => {
420 bug_condition!("pubsub prost encode error: {err}");
421 return Ok(());
422 }
423 Ok(sig) => Some(sig.to_bytes().to_vec()),
424 };
425
426 let message_state = P2pNetworkPubsubMessageCacheMessage::Validated {
427 message: msg,
428 peer_id: source_peer_id,
429 time,
430 };
431
432 pubsub_state.mcache.map.insert(message_id, message_state);
433
434 let dispatcher = state_context.into_dispatcher();
435
436 dispatcher.push(P2pNetworkPubsubAction::BroadcastValidatedMessage {
437 message_id: super::BroadcastMessageId::MessageId { message_id },
438 });
439
440 Ok(())
441 }
442 P2pNetworkPubsubAction::Broadcast { message } => {
443 let data = match super::encode_message(&message) {
444 Err(err) => {
445 bug_condition!("binprot serialization error: {err}");
446 return Ok(());
447 }
448 Ok(data) => data,
449 };
450
451 Self::prepare_to_sign(state_context, data)
452 }
453 P2pNetworkPubsubAction::Sign {
454 seqno,
455 author,
456 data,
457 topic,
458 } => {
459 pubsub_state.seq += 1;
460
461 let libp2p_peer_id =
462 libp2p_identity::PeerId::try_from(author).expect("valid peer_id"); pubsub_state.to_sign.push_back(pb::Message {
464 from: Some(libp2p_peer_id.to_bytes()),
465 data: Some(data.0.into_vec()),
466 seqno: Some(seqno.to_be_bytes().to_vec()),
467 topic: topic.clone(),
468 signature: None,
469 key: None,
470 });
471
472 let to_sign = pubsub_state.to_sign.front().cloned();
473 let Some(message) = to_sign else {
474 bug_condition!("Message not found");
475 return Ok(());
476 };
477
478 let dispatcher = state_context.into_dispatcher();
479 dispatcher.push(P2pNetworkPubsubEffectfulAction::Sign { author, message });
480 Ok(())
481 }
482 P2pNetworkPubsubAction::SignError { .. } => {
483 let _ = pubsub_state.to_sign.pop_front();
484 Ok(())
485 }
486 P2pNetworkPubsubAction::BroadcastSigned { signature } => {
487 if let Some(mut message) = pubsub_state.to_sign.pop_front() {
488 message.signature = Some(signature.0.to_vec());
489 pubsub_state
490 .clients
491 .iter_mut()
492 .for_each(|(_, state)| state.publish(&message));
493 }
494
495 let (dispatcher, state) = state_context.into_dispatcher_and_state();
496 Self::broadcast(dispatcher, state)
497 }
498 P2pNetworkPubsubAction::OutgoingData { mut data, peer_id } => {
499 let (dispatcher, state) = state_context.into_dispatcher_and_state();
500 let state: &Self = state.substate()?;
501
502 let Some(state) = state.clients.get(&peer_id) else {
503 bug_condition!(
504 "Missing state for action: `P2pNetworkPubsubAction::OutgoingData`"
505 );
506 return Ok(());
507 };
508 fuzz_maybe!(&mut data, crate::fuzzer::mutate_pubsub);
509 let flags = fuzzed_maybe!(Default::default(), crate::fuzzer::mutate_yamux_flags);
510
511 if let Some(stream_id) = state.outgoing_stream_id.as_ref().copied() {
512 dispatcher.push(P2pNetworkYamuxAction::OutgoingData {
513 addr: state.addr,
514 stream_id,
515 data,
516 flags,
517 });
518 }
519 Ok(())
520 }
521 P2pNetworkPubsubAction::ValidateIncomingMessage { message_id } => {
522 let Some(message) = pubsub_state.mcache.map.remove(&message_id) else {
523 bug_condition!("Message with id: {:?} not found", message_id);
524 return Ok(());
525 };
526
527 let P2pNetworkPubsubMessageCacheMessage::Init {
528 message,
529 content,
530 time,
531 peer_id,
532 } = message
533 else {
534 bug_condition!(
535 "`P2pNetworkPubsubAction::ValidateIncomingMessage` called on invalid state"
536 );
537 return Ok(());
538 };
539
540 let new_message_state = match &content {
541 GossipNetMessageV2::NewState(block) => {
542 let block_hash = block.try_hash()?;
543 P2pNetworkPubsubMessageCacheMessage::PreValidatedBlockMessage {
544 block_hash,
545 message,
546 peer_id,
547 time,
548 }
549 }
550 GossipNetMessageV2::SnarkPoolDiff {
551 message: NetworkPoolSnarkPoolDiffVersionedStableV2::AddSolvedWork(snark),
552 ..
553 } => {
554 let snark: Snark = snark.1.clone().into();
555 let job_id = snark.job_id();
556 P2pNetworkPubsubMessageCacheMessage::PreValidatedSnark {
557 job_id,
558 message,
559 peer_id,
560 time,
561 }
562 }
563 _ => P2pNetworkPubsubMessageCacheMessage::PreValidated {
564 message,
565 peer_id,
566 time,
567 },
568 };
569 pubsub_state
570 .mcache
571 .map
572 .insert(message_id, new_message_state);
573
574 let dispatcher = state_context.into_dispatcher();
575
576 match content {
578 GossipNetMessageV2::NewState(block) => {
579 let best_tip = BlockWithHash::try_new(block.clone())?;
580 dispatcher.push(P2pPeerAction::BestTipUpdate { peer_id, best_tip });
581 }
582 GossipNetMessageV2::TransactionPoolDiff { message, nonce } => {
583 let nonce = nonce.as_u32();
584 dispatcher.push(P2pChannelsTransactionAction::Libp2pReceived {
585 peer_id,
586 transactions: message.0.into_iter().collect(),
587 nonce,
588 message_id,
589 });
590 }
591 GossipNetMessageV2::SnarkPoolDiff {
592 message: NetworkPoolSnarkPoolDiffVersionedStableV2::AddSolvedWork(work),
593 nonce,
594 } => {
595 let snark: Snark = work.1.into();
596 dispatcher.push(P2pChannelsSnarkAction::Libp2pReceived {
597 peer_id,
598 snark: Box::new(snark),
599 nonce: nonce.as_u32(),
600 });
601 }
602 _ => {}
603 }
604 Ok(())
605 }
606 P2pNetworkPubsubAction::BroadcastValidatedMessage { message_id } => {
607 let Some((message_id, _)) =
608 pubsub_state.mcache.get_message_id_and_message(&message_id)
609 else {
610 bug_condition!("Message with id: {:?} not found", message_id);
611 return Ok(());
612 };
613
614 let Some(message) = pubsub_state.mcache.map.get(&message_id) else {
615 bug_condition!("Message with id: {:?} not found", message_id);
616 return Ok(());
617 };
618
619 let raw_message = message.message().clone();
620 let peer_id = *message.peer_id();
621
622 pubsub_state.reduce_incoming_validated_message(message_id, peer_id, &raw_message);
623
624 let Some(message) = pubsub_state.mcache.map.get_mut(&message_id) else {
625 bug_condition!("Message with id: {:?} not found", message_id);
626 return Ok(());
627 };
628
629 *message = P2pNetworkPubsubMessageCacheMessage::Validated {
630 message: raw_message,
631 peer_id,
632 time: *message.time(),
633 };
634
635 let (dispatcher, state) = state_context.into_dispatcher_and_state();
636 Self::broadcast(dispatcher, state)
637 }
638 P2pNetworkPubsubAction::PruneMessages {} => {
639 let messages = pubsub_state
640 .mcache
641 .map
642 .iter()
643 .filter_map(|(message_id, message)| {
644 if (*message.time() + MAX_MESSAGE_KEEP_DURATION) <= time {
645 Some(message_id.to_owned())
646 } else {
647 None
648 }
649 })
650 .collect::<Vec<_>>();
651
652 for message_id in messages {
653 pubsub_state.mcache.remove_message(message_id);
654 }
655 Ok(())
656 }
657 P2pNetworkPubsubAction::RejectMessage {
658 message_id,
659 peer_id,
660 ..
661 } => {
662 let mut involved_peers = peer_id.into_iter().collect::<Vec<_>>();
663 let mut add_peer = |peer: &PeerId| {
664 if !involved_peers.contains(peer) {
665 involved_peers.push(*peer);
666 }
667 };
668
669 if let Some(message_id) = message_id {
670 let Some((message_id, message)) =
671 pubsub_state.mcache.get_message_id_and_message(&message_id)
672 else {
673 bug_condition!("Message not found for id: {:?}", message_id);
674 return Ok(());
675 };
676
677 add_peer(message.peer_id());
678 pubsub_state.mcache.remove_message(message_id);
679 }
680
681 let dispatcher = state_context.into_dispatcher();
682
683 for peer_id in involved_peers {
684 dispatcher.push(P2pDisconnectionAction::Init {
685 peer_id,
686 reason: P2pDisconnectionReason::InvalidMessage,
687 });
688 }
689
690 Ok(())
691 }
692 P2pNetworkPubsubAction::IgnoreMessage { .. } => Ok(()),
693 }
694 }
695
696 fn prepare_to_sign<Action, State>(
697 mut state_context: Substate<Action, State, Self>,
698 buffer: Vec<u8>,
699 ) -> Result<(), String>
700 where
701 State: crate::P2pStateTrait,
702 Action: crate::P2pActionTrait<State>,
703 {
704 let pubsub_state = state_context.get_substate_mut()?;
705
706 let mut seqno = pubsub_state.seq;
707 let (dispatcher, state) = state_context.into_dispatcher_and_state();
708 let config: &P2pConfig = state.substate()?;
709 seqno += config.meshsub.initial_time.as_nanos() as u64;
710
711 dispatcher.push(P2pNetworkPubsubAction::Sign {
712 seqno,
713 author: config.identity_pub_key.peer_id(),
714 data: buffer.into(),
715 topic: super::TOPIC.to_owned(),
716 });
717
718 Ok(())
719 }
720
721 fn reduce_incoming_validated_message(
726 &mut self,
727 message_id: P2pNetworkPubsubMessageCacheId,
728 peer_id: PeerId,
729 message: &Message,
730 ) {
731 let topic = self.topics.entry(message.topic.clone()).or_default();
732
733 self.clients
734 .iter_mut()
735 .filter(|(c, _)| {
736 *c != &peer_id
738 })
739 .for_each(|(c, state)| {
740 let Some(topic_state) = topic.get(c) else {
741 return;
742 };
743 if topic_state.on_mesh() {
744 state.publish(message)
745 } else {
746 let ctr = state.message.control.get_or_insert_with(Default::default);
747 ctr.ihave.push(pb::ControlIHave {
748 topic_id: Some(message.topic.clone()),
749 message_ids: vec![message_id.to_raw_bytes()],
750 })
751 }
752 });
753 }
754
755 #[inline(never)]
773 fn reduce_incoming_message(
774 &mut self,
775 message: &Message,
776 seen_limit: usize,
777 ) -> Result<Option<GossipNetMessageV2>, String> {
778 let Some(signature) = &message.signature else {
779 bug_condition!("Validation failed: missing signature");
780 return Ok(None);
781 };
782
783 if !self.seen.contains(signature) {
785 self.seen.push_back(signature.clone());
786 if self.seen.len() > seen_limit {
788 self.seen.pop_front();
789 }
790 } else {
791 return Ok(None);
792 }
793
794 match &message.data {
795 Some(data) if data.len() > 8 => {
796 let mut slice = &data[8..];
797 Ok(Some(
798 gossip::GossipNetMessageV2::binprot_read(&mut slice)
799 .map_err(|e| format!("Invalid `GossipNetMessageV2` message, error: {e}"))?,
800 ))
801 }
802 _ => Err("Invalid message".to_owned()),
803 }
804 }
805
806 fn combined_with_pending_buffer<'a>(buffer: &'a mut Vec<u8>, data: &'a [u8]) -> &'a [u8] {
807 if buffer.is_empty() {
808 data
810 } else {
811 buffer.extend_from_slice(data);
812 buffer.as_slice()
813 }
814 }
815
816 fn reduce_incoming_data(
819 &mut self,
820 peer_id: &PeerId,
821 data: Data,
822 timestamp: Timestamp,
823 ) -> Result<(), String> {
824 let Some(client_state) = self.clients.get_mut(peer_id) else {
825 return Ok(());
828 };
829
830 let slice = Self::combined_with_pending_buffer(&mut client_state.buffer, &data);
832
833 match <pb::Rpc as prost::Message>::decode_length_delimited(slice) {
834 Ok(decoded) => {
835 client_state.clear_buffer();
836 client_state.incoming_messages.extend(decoded.publish);
837
838 let subscriptions = decoded.subscriptions;
839 let control = decoded.control.unwrap_or_default();
840
841 self.update_subscriptions(peer_id, subscriptions);
842 self.apply_control_commands(peer_id, &control);
843 self.respond_to_iwant_requests(peer_id, &control.iwant);
844 self.process_ihave_messages(peer_id, control.ihave, timestamp);
845 }
846 Err(err) => {
847 if err.to_string().contains("buffer underflow") && client_state.buffer.is_empty() {
850 client_state.buffer = data.to_vec();
852 } else {
853 client_state.clear_buffer();
856 }
857 }
858 }
859
860 Ok(())
861 }
862
863 fn update_subscriptions(&mut self, peer_id: &PeerId, subscriptions: Vec<pb::rpc::SubOpts>) {
864 for subscription in &subscriptions {
866 let topic_id = subscription.topic_id().to_owned();
867 let topic = self.topics.entry(topic_id).or_default();
868
869 if subscription.subscribe() {
870 topic.entry(*peer_id).or_default();
871 } else {
872 topic.remove(peer_id);
873 }
874 }
875 }
876
877 fn apply_control_commands(&mut self, peer_id: &PeerId, control: &pb::ControlMessage) {
879 for graft in &control.graft {
881 if let Some(mesh_state) = self
882 .topics
883 .get_mut(graft.topic_id())
884 .and_then(|m| m.get_mut(peer_id))
885 {
886 mesh_state.mesh = P2pNetworkPubsubClientMeshAddingState::Added;
887 }
888 }
889
890 for prune in &control.prune {
892 if let Some(mesh_state) = self
893 .topics
894 .get_mut(prune.topic_id())
895 .and_then(|m| m.get_mut(peer_id))
896 {
897 mesh_state.mesh = P2pNetworkPubsubClientMeshAddingState::TheyRefused;
898 }
899 }
900 }
901
902 fn respond_to_iwant_requests(&mut self, peer_id: &PeerId, iwant_requests: &[pb::ControlIWant]) {
903 for iwant in iwant_requests {
905 for msg_id in &iwant.message_ids {
906 if let Some(msg) = self.mcache.get_message_from_raw_message_id(msg_id) {
907 if let Some(client) = self.clients.get_mut(peer_id) {
908 client.publish(msg.message());
909 }
910 }
911 }
912 }
913 }
914
915 fn process_ihave_messages(
916 &mut self,
917 peer_id: &PeerId,
918 ihave_messages: Vec<pb::ControlIHave>,
919 timestamp: Timestamp,
920 ) {
921 for ihave in ihave_messages {
923 if self.clients.contains_key(peer_id) {
924 let message_ids = ihave
925 .message_ids
926 .into_iter()
927 .filter(|message_id| self.filter_iwant_message_ids(message_id, timestamp))
928 .collect::<Vec<_>>();
929
930 let Some(client) = self.clients.get_mut(peer_id) else {
931 bug_condition!("process_ihave_messages: State not found for {}", peer_id);
932 return;
933 };
934
935 let ctr = client.message.control.get_or_insert_with(Default::default);
937 ctr.iwant.push(pb::ControlIWant { message_ids })
938 }
939 }
940 }
941
942 fn broadcast<Action, State>(
943 dispatcher: &mut Dispatcher<Action, State>,
944 state: &State,
945 ) -> Result<(), String>
946 where
947 State: crate::P2pStateTrait,
948 Action: crate::P2pActionTrait<State>,
949 {
950 let state: &P2pNetworkPubsubState = state.substate()?;
951
952 for peer_id in state
953 .clients
954 .iter()
955 .filter(|(_, s)| !s.message_is_empty())
956 .map(|(peer_id, _)| *peer_id)
957 {
958 dispatcher.push(P2pNetworkPubsubAction::OutgoingMessage { peer_id });
959 }
960
961 Ok(())
962 }
963}