p2p/network/pubsub/
p2p_network_pubsub_reducer.rs

1use std::{collections::btree_map::Entry, time::Duration};
2
3use binprot::BinProtRead;
4use mina_p2p_messages::{
5    gossip::{self, GossipNetMessageV2},
6    v2::NetworkPoolSnarkPoolDiffVersionedStableV2,
7};
8use openmina_core::{
9    block::BlockWithHash, bug_condition, fuzz_maybe, fuzzed_maybe, snark::Snark, Substate,
10};
11use redux::{Dispatcher, Timestamp};
12
13use crate::{
14    channels::{snark::P2pChannelsSnarkAction, transaction::P2pChannelsTransactionAction},
15    disconnection::{P2pDisconnectionAction, P2pDisconnectionReason},
16    peer::P2pPeerAction,
17    Data, P2pConfig, P2pNetworkYamuxAction, P2pState, PeerId,
18};
19
20use super::{
21    p2p_network_pubsub_state::{
22        source_from_message, P2pNetworkPubsubClientMeshAddingState,
23        P2pNetworkPubsubMessageCacheMessage,
24    },
25    pb::{self, Message},
26    P2pNetworkPubsubAction, P2pNetworkPubsubClientState, P2pNetworkPubsubEffectfulAction,
27    P2pNetworkPubsubMessageCacheId, P2pNetworkPubsubState, TOPIC,
28};
29
30const MAX_MESSAGE_KEEP_DURATION: Duration = Duration::from_secs(300);
31
32impl P2pNetworkPubsubState {
33    pub fn reducer<Action, State>(
34        mut state_context: Substate<Action, State, Self>,
35        action: redux::ActionWithMeta<P2pNetworkPubsubAction>,
36    ) -> Result<(), String>
37    where
38        State: crate::P2pStateTrait,
39        Action: crate::P2pActionTrait<State>,
40    {
41        let pubsub_state = state_context.get_substate_mut()?;
42        let (action, meta) = action.split();
43        let time = meta.time();
44
45        match action {
46            P2pNetworkPubsubAction::NewStream {
47                incoming: true,
48                peer_id,
49                addr,
50                protocol,
51                ..
52            } => {
53                let entry = pubsub_state.clients.entry(peer_id);
54                // preserve it
55                let outgoing_stream_id = match &entry {
56                    Entry::Occupied(v) => v.get().outgoing_stream_id,
57                    Entry::Vacant(_) => None,
58                };
59                let state = entry.or_insert_with(|| P2pNetworkPubsubClientState {
60                    protocol,
61                    addr,
62                    outgoing_stream_id,
63                    message: pb::Rpc {
64                        subscriptions: vec![],
65                        publish: vec![],
66                        control: None,
67                    },
68                    cache: Default::default(),
69                    buffer: vec![],
70                    incoming_messages: vec![],
71                });
72                state.protocol = protocol;
73                state.addr = addr;
74
75                pubsub_state
76                    .topics
77                    .entry(super::TOPIC.to_owned())
78                    .or_default()
79                    .insert(peer_id, Default::default());
80
81                Ok(())
82            }
83            P2pNetworkPubsubAction::NewStream {
84                incoming: false,
85                peer_id,
86                stream_id,
87                addr,
88                protocol,
89            } => {
90                let state = pubsub_state.clients.entry(peer_id).or_insert_with(|| {
91                    P2pNetworkPubsubClientState {
92                        protocol,
93                        addr,
94                        outgoing_stream_id: Some(stream_id),
95                        message: pb::Rpc {
96                            subscriptions: vec![],
97                            publish: vec![],
98                            control: None,
99                        },
100                        cache: Default::default(),
101                        buffer: vec![],
102                        incoming_messages: vec![],
103                    }
104                });
105                state.outgoing_stream_id = Some(stream_id);
106                state.protocol = protocol;
107                state.addr = addr;
108
109                pubsub_state
110                    .topics
111                    .entry(TOPIC.to_owned())
112                    .or_default()
113                    .insert(peer_id, Default::default());
114
115                if let Some(state) = pubsub_state.clients.get_mut(&peer_id) {
116                    state.message.subscriptions.push(pb::rpc::SubOpts {
117                        subscribe: Some(true),
118                        topic_id: Some(TOPIC.to_owned()),
119                    });
120                }
121
122                let (dispatcher, state) = state_context.into_dispatcher_and_state();
123                let config: &P2pConfig = state.substate()?;
124                let state: &P2pNetworkPubsubState = state.substate()?;
125
126                let Some(map) = state.topics.get(TOPIC) else {
127                    // must have this topic already
128                    return Ok(());
129                };
130                dispatcher.push(P2pNetworkPubsubAction::OutgoingMessage { peer_id });
131                let mesh_size = map.values().filter(|s| s.on_mesh()).count();
132                if mesh_size < config.meshsub.outbound_degree_desired {
133                    dispatcher.push(P2pNetworkPubsubAction::Graft {
134                        peer_id,
135                        topic_id: TOPIC.to_owned(),
136                    });
137                }
138
139                Ok(())
140            }
141            P2pNetworkPubsubAction::IncomingData {
142                peer_id,
143                data,
144                seen_limit,
145                addr,
146                ..
147            } => {
148                pubsub_state.reduce_incoming_data(&peer_id, data, meta.time())?;
149
150                let dispatcher = state_context.into_dispatcher();
151
152                dispatcher.push(P2pNetworkPubsubAction::ValidateIncomingMessages {
153                    peer_id,
154                    seen_limit,
155                    addr,
156                });
157
158                Ok(())
159            }
160            P2pNetworkPubsubAction::ValidateIncomingMessages {
161                peer_id,
162                seen_limit,
163                addr,
164            } => {
165                let Some(state) = pubsub_state.clients.get_mut(&peer_id) else {
166                    // TODO: investigate, cannot reproduce this
167                    // bug_condition!("{:?} not found in state.clients", peer_id);
168                    return Ok(());
169                };
170                let messages = std::mem::take(&mut state.incoming_messages);
171
172                let dispatcher = state_context.into_dispatcher();
173
174                dispatcher.push(P2pNetworkPubsubEffectfulAction::ValidateIncomingMessages {
175                    peer_id,
176                    seen_limit,
177                    addr,
178                    messages,
179                });
180
181                Ok(())
182            }
183            P2pNetworkPubsubAction::IncomingMessage {
184                peer_id,
185                message,
186                seen_limit,
187            } => {
188                // Check that if we can extract source from message, this is pre check
189                if source_from_message(&message).is_err() {
190                    let dispatcher = state_context.into_dispatcher();
191                    dispatcher.push(P2pNetworkPubsubAction::RejectMessage {
192                        message_id: None,
193                        peer_id: Some(peer_id),
194                        reason: "Invalid originator in message".to_owned(),
195                    });
196                    return Ok(());
197                }
198
199                // Check result later to ensure we always dispatch the cleanup action
200                let reduce_incoming_result =
201                    pubsub_state.reduce_incoming_message(&message, seen_limit);
202
203                let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
204                let p2p_state: &P2pState = global_state.substate()?;
205                let state: &Self = global_state.substate()?;
206
207                dispatcher.push(P2pNetworkPubsubAction::IncomingMessageCleanup { peer_id });
208
209                let message_content = reduce_incoming_result?;
210
211                for (topic_id, map) in &state.topics {
212                    let mesh_size = map.values().filter(|s| s.on_mesh()).count();
213                    let could_accept = mesh_size < p2p_state.config.meshsub.outbound_degree_high;
214
215                    if !could_accept {
216                        if let Some(topic_state) = map.get(&peer_id) {
217                            if topic_state.on_mesh() {
218                                let topic_id = topic_id.clone();
219                                dispatcher.push(P2pNetworkPubsubAction::Prune { peer_id, topic_id })
220                            }
221                        }
222                    }
223                }
224
225                // This happens if message was already seen
226                if let Some(message_content) = message_content {
227                    dispatcher.push(P2pNetworkPubsubAction::HandleIncomingMessage {
228                        message,
229                        message_content,
230                        peer_id,
231                    });
232                } else {
233                    dispatcher.push(P2pNetworkPubsubAction::IgnoreMessage {
234                        message_id: None,
235                        reason: "Message already seen".to_owned(),
236                    });
237                };
238
239                Ok(())
240            }
241            P2pNetworkPubsubAction::HandleIncomingMessage {
242                message,
243                message_content,
244                peer_id,
245            } => {
246                let Ok(message_id) =
247                    pubsub_state
248                        .mcache
249                        .put(message, message_content, peer_id, time)
250                else {
251                    bug_condition!("Unable to add message to `mcache`");
252                    return Ok(());
253                };
254
255                let (dispatcher, state) = state_context.into_dispatcher_and_state();
256                let p2p_state: &P2pState = state.substate()?;
257
258                if let Some(callback) = p2p_state.callbacks.on_p2p_pubsub_message_received.clone() {
259                    dispatcher.push_callback(callback, message_id);
260                } else {
261                    dispatcher.push(P2pNetworkPubsubAction::ValidateIncomingMessage { message_id });
262                }
263                Ok(())
264            }
265            P2pNetworkPubsubAction::IncomingMessageCleanup { peer_id } => {
266                pubsub_state.clear_incoming();
267
268                let Some(client_state) = pubsub_state.clients.get_mut(&peer_id) else {
269                    bug_condition!(
270                        "State not found for action P2pNetworkPubsubAction::IncomingMessageCleanup"
271                    );
272                    return Ok(());
273                };
274
275                client_state.clear_incoming();
276
277                Ok(())
278            }
279            // we want to add peer to our mesh
280            P2pNetworkPubsubAction::Graft { peer_id, topic_id } => {
281                let Some(state) = pubsub_state
282                    .topics
283                    .get_mut(&topic_id)
284                    .and_then(|m| m.get_mut(&peer_id))
285                else {
286                    return Ok(());
287                };
288                state.mesh = P2pNetworkPubsubClientMeshAddingState::Added;
289
290                if let Some(state) = pubsub_state.clients.get_mut(&peer_id) {
291                    let control = state
292                        .message
293                        .control
294                        .get_or_insert_with(|| pb::ControlMessage {
295                            ihave: vec![],
296                            iwant: vec![],
297                            graft: vec![],
298                            prune: vec![],
299                        });
300                    control.graft.push(pb::ControlGraft {
301                        topic_id: Some(topic_id),
302                    });
303                }
304
305                let dispatcher = state_context.into_dispatcher();
306                dispatcher.push(P2pNetworkPubsubAction::OutgoingMessage { peer_id });
307                Ok(())
308            }
309            P2pNetworkPubsubAction::Prune { peer_id, topic_id } => {
310                let Some(state) = pubsub_state
311                    .topics
312                    .get_mut(&topic_id)
313                    .and_then(|m| m.get_mut(&peer_id))
314                else {
315                    bug_condition!("State not found for action: `P2pNetworkPubsubAction::Prune`");
316                    return Ok(());
317                };
318                state.mesh = P2pNetworkPubsubClientMeshAddingState::WeRefused;
319
320                if let Some(state) = pubsub_state.clients.get_mut(&peer_id) {
321                    let control = state
322                        .message
323                        .control
324                        .get_or_insert_with(|| pb::ControlMessage {
325                            ihave: vec![],
326                            iwant: vec![],
327                            graft: vec![],
328                            prune: vec![],
329                        });
330                    control.prune.push(pb::ControlPrune {
331                        topic_id: Some(topic_id),
332                        peers: vec![pb::PeerInfo {
333                            peer_id: None,
334                            signed_peer_record: None,
335                        }],
336                        backoff: None,
337                    });
338                }
339
340                let dispatcher = state_context.into_dispatcher();
341                dispatcher.push(P2pNetworkPubsubAction::OutgoingMessage { peer_id });
342                Ok(())
343            }
344            P2pNetworkPubsubAction::OutgoingMessage { peer_id } => {
345                let msg = if let Some(v) = pubsub_state.clients.get_mut(&peer_id) {
346                    &v.message
347                } else {
348                    bug_condition!(
349                        "Invalid state for action: `P2pNetworkPubsubAction::OutgoingMessage`"
350                    );
351                    return Ok(());
352                };
353
354                let mut data = vec![];
355                let result = prost::Message::encode_length_delimited(msg, &mut data)
356                    .map(|_| data)
357                    .map_err(|_| msg.clone());
358
359                let dispatcher = state_context.into_dispatcher();
360
361                match result {
362                    Err(msg) => {
363                        dispatcher
364                            .push(P2pNetworkPubsubAction::OutgoingMessageError { msg, peer_id });
365                    }
366                    Ok(data) => {
367                        dispatcher.push(P2pNetworkPubsubAction::OutgoingData {
368                            data: Data::from(data),
369                            peer_id,
370                        });
371                    }
372                }
373
374                // Important to avoid leaking state
375                dispatcher.push(P2pNetworkPubsubAction::OutgoingMessageClear { peer_id });
376
377                Ok(())
378            }
379            P2pNetworkPubsubAction::OutgoingMessageClear { peer_id } => {
380                if let Some(v) = pubsub_state.clients.get_mut(&peer_id) {
381                    v.message = pb::Rpc {
382                        subscriptions: vec![],
383                        publish: vec![],
384                        control: None,
385                    };
386                } else {
387                    bug_condition!(
388                        "Invalid state for action: `P2pNetworkPubsubAction::OutgoingMessageClear`"
389                    );
390                };
391                Ok(())
392            }
393            P2pNetworkPubsubAction::OutgoingMessageError { .. } => Ok(()),
394            P2pNetworkPubsubAction::WebRtcRebroadcast { message } => {
395                let data = match super::encode_message(&message) {
396                    Err(err) => {
397                        bug_condition!("binprot serialization error: {err}");
398                        return Ok(());
399                    }
400                    Ok(data) => data,
401                };
402
403                let mut source_sk = super::webrtc_source_sk_from_bytes(&data[8..]);
404                let source_peer_id = source_sk.public_key().peer_id();
405                let message_id = P2pNetworkPubsubMessageCacheId {
406                    source: libp2p_identity::PeerId::try_from(source_peer_id).unwrap(),
407                    seqno: 0,
408                };
409                let mut msg = pb::Message {
410                    from: Some(message_id.source.to_bytes().to_vec()),
411                    data: Some(data),
412                    seqno: Some(message_id.seqno.to_be_bytes().to_vec()),
413                    topic: super::TOPIC.to_owned(),
414                    signature: None,
415                    key: None,
416                };
417
418                msg.signature = match source_sk.libp2p_pubsub_pb_message_sign(&msg) {
419                    Err(err) => {
420                        bug_condition!("pubsub prost encode error: {err}");
421                        return Ok(());
422                    }
423                    Ok(sig) => Some(sig.to_bytes().to_vec()),
424                };
425
426                let message_state = P2pNetworkPubsubMessageCacheMessage::Validated {
427                    message: msg,
428                    peer_id: source_peer_id,
429                    time,
430                };
431
432                pubsub_state.mcache.map.insert(message_id, message_state);
433
434                let dispatcher = state_context.into_dispatcher();
435
436                dispatcher.push(P2pNetworkPubsubAction::BroadcastValidatedMessage {
437                    message_id: super::BroadcastMessageId::MessageId { message_id },
438                });
439
440                Ok(())
441            }
442            P2pNetworkPubsubAction::Broadcast { message } => {
443                let data = match super::encode_message(&message) {
444                    Err(err) => {
445                        bug_condition!("binprot serialization error: {err}");
446                        return Ok(());
447                    }
448                    Ok(data) => data,
449                };
450
451                Self::prepare_to_sign(state_context, data)
452            }
453            P2pNetworkPubsubAction::Sign {
454                seqno,
455                author,
456                data,
457                topic,
458            } => {
459                pubsub_state.seq += 1;
460
461                let libp2p_peer_id =
462                    libp2p_identity::PeerId::try_from(author).expect("valid peer_id"); // This can't happen unless something is broken in the configuration
463                pubsub_state.to_sign.push_back(pb::Message {
464                    from: Some(libp2p_peer_id.to_bytes()),
465                    data: Some(data.0.into_vec()),
466                    seqno: Some(seqno.to_be_bytes().to_vec()),
467                    topic: topic.clone(),
468                    signature: None,
469                    key: None,
470                });
471
472                let to_sign = pubsub_state.to_sign.front().cloned();
473                let Some(message) = to_sign else {
474                    bug_condition!("Message not found");
475                    return Ok(());
476                };
477
478                let dispatcher = state_context.into_dispatcher();
479                dispatcher.push(P2pNetworkPubsubEffectfulAction::Sign { author, message });
480                Ok(())
481            }
482            P2pNetworkPubsubAction::SignError { .. } => {
483                let _ = pubsub_state.to_sign.pop_front();
484                Ok(())
485            }
486            P2pNetworkPubsubAction::BroadcastSigned { signature } => {
487                if let Some(mut message) = pubsub_state.to_sign.pop_front() {
488                    message.signature = Some(signature.0.to_vec());
489                    pubsub_state
490                        .clients
491                        .iter_mut()
492                        .for_each(|(_, state)| state.publish(&message));
493                }
494
495                let (dispatcher, state) = state_context.into_dispatcher_and_state();
496                Self::broadcast(dispatcher, state)
497            }
498            P2pNetworkPubsubAction::OutgoingData { mut data, peer_id } => {
499                let (dispatcher, state) = state_context.into_dispatcher_and_state();
500                let state: &Self = state.substate()?;
501
502                let Some(state) = state.clients.get(&peer_id) else {
503                    bug_condition!(
504                        "Missing state for action: `P2pNetworkPubsubAction::OutgoingData`"
505                    );
506                    return Ok(());
507                };
508                fuzz_maybe!(&mut data, crate::fuzzer::mutate_pubsub);
509                let flags = fuzzed_maybe!(Default::default(), crate::fuzzer::mutate_yamux_flags);
510
511                if let Some(stream_id) = state.outgoing_stream_id.as_ref().copied() {
512                    dispatcher.push(P2pNetworkYamuxAction::OutgoingData {
513                        addr: state.addr,
514                        stream_id,
515                        data,
516                        flags,
517                    });
518                }
519                Ok(())
520            }
521            P2pNetworkPubsubAction::ValidateIncomingMessage { message_id } => {
522                let Some(message) = pubsub_state.mcache.map.remove(&message_id) else {
523                    bug_condition!("Message with id: {:?} not found", message_id);
524                    return Ok(());
525                };
526
527                let P2pNetworkPubsubMessageCacheMessage::Init {
528                    message,
529                    content,
530                    time,
531                    peer_id,
532                } = message
533                else {
534                    bug_condition!(
535                        "`P2pNetworkPubsubAction::ValidateIncomingMessage` called on invalid state"
536                    );
537                    return Ok(());
538                };
539
540                let new_message_state = match &content {
541                    GossipNetMessageV2::NewState(block) => {
542                        let block_hash = block.try_hash()?;
543                        P2pNetworkPubsubMessageCacheMessage::PreValidatedBlockMessage {
544                            block_hash,
545                            message,
546                            peer_id,
547                            time,
548                        }
549                    }
550                    GossipNetMessageV2::SnarkPoolDiff {
551                        message: NetworkPoolSnarkPoolDiffVersionedStableV2::AddSolvedWork(snark),
552                        ..
553                    } => {
554                        let snark: Snark = snark.1.clone().into();
555                        let job_id = snark.job_id();
556                        P2pNetworkPubsubMessageCacheMessage::PreValidatedSnark {
557                            job_id,
558                            message,
559                            peer_id,
560                            time,
561                        }
562                    }
563                    _ => P2pNetworkPubsubMessageCacheMessage::PreValidated {
564                        message,
565                        peer_id,
566                        time,
567                    },
568                };
569                pubsub_state
570                    .mcache
571                    .map
572                    .insert(message_id, new_message_state);
573
574                let dispatcher = state_context.into_dispatcher();
575
576                // TODO: for transaction proof we track source, we should do that for `BestTipUpdate` and for `SnarkPoolDiff`
577                match content {
578                    GossipNetMessageV2::NewState(block) => {
579                        let best_tip = BlockWithHash::try_new(block.clone())?;
580                        dispatcher.push(P2pPeerAction::BestTipUpdate { peer_id, best_tip });
581                    }
582                    GossipNetMessageV2::TransactionPoolDiff { message, nonce } => {
583                        let nonce = nonce.as_u32();
584                        dispatcher.push(P2pChannelsTransactionAction::Libp2pReceived {
585                            peer_id,
586                            transactions: message.0.into_iter().collect(),
587                            nonce,
588                            message_id,
589                        });
590                    }
591                    GossipNetMessageV2::SnarkPoolDiff {
592                        message: NetworkPoolSnarkPoolDiffVersionedStableV2::AddSolvedWork(work),
593                        nonce,
594                    } => {
595                        let snark: Snark = work.1.into();
596                        dispatcher.push(P2pChannelsSnarkAction::Libp2pReceived {
597                            peer_id,
598                            snark: Box::new(snark),
599                            nonce: nonce.as_u32(),
600                        });
601                    }
602                    _ => {}
603                }
604                Ok(())
605            }
606            P2pNetworkPubsubAction::BroadcastValidatedMessage { message_id } => {
607                let Some((message_id, _)) =
608                    pubsub_state.mcache.get_message_id_and_message(&message_id)
609                else {
610                    bug_condition!("Message with id: {:?} not found", message_id);
611                    return Ok(());
612                };
613
614                let Some(message) = pubsub_state.mcache.map.get(&message_id) else {
615                    bug_condition!("Message with id: {:?} not found", message_id);
616                    return Ok(());
617                };
618
619                let raw_message = message.message().clone();
620                let peer_id = *message.peer_id();
621
622                pubsub_state.reduce_incoming_validated_message(message_id, peer_id, &raw_message);
623
624                let Some(message) = pubsub_state.mcache.map.get_mut(&message_id) else {
625                    bug_condition!("Message with id: {:?} not found", message_id);
626                    return Ok(());
627                };
628
629                *message = P2pNetworkPubsubMessageCacheMessage::Validated {
630                    message: raw_message,
631                    peer_id,
632                    time: *message.time(),
633                };
634
635                let (dispatcher, state) = state_context.into_dispatcher_and_state();
636                Self::broadcast(dispatcher, state)
637            }
638            P2pNetworkPubsubAction::PruneMessages {} => {
639                let messages = pubsub_state
640                    .mcache
641                    .map
642                    .iter()
643                    .filter_map(|(message_id, message)| {
644                        if (*message.time() + MAX_MESSAGE_KEEP_DURATION) <= time {
645                            Some(message_id.to_owned())
646                        } else {
647                            None
648                        }
649                    })
650                    .collect::<Vec<_>>();
651
652                for message_id in messages {
653                    pubsub_state.mcache.remove_message(message_id);
654                }
655                Ok(())
656            }
657            P2pNetworkPubsubAction::RejectMessage {
658                message_id,
659                peer_id,
660                ..
661            } => {
662                let mut involved_peers = peer_id.into_iter().collect::<Vec<_>>();
663                let mut add_peer = |peer: &PeerId| {
664                    if !involved_peers.contains(peer) {
665                        involved_peers.push(*peer);
666                    }
667                };
668
669                if let Some(message_id) = message_id {
670                    let Some((message_id, message)) =
671                        pubsub_state.mcache.get_message_id_and_message(&message_id)
672                    else {
673                        bug_condition!("Message not found for id: {:?}", message_id);
674                        return Ok(());
675                    };
676
677                    add_peer(message.peer_id());
678                    pubsub_state.mcache.remove_message(message_id);
679                }
680
681                let dispatcher = state_context.into_dispatcher();
682
683                for peer_id in involved_peers {
684                    dispatcher.push(P2pDisconnectionAction::Init {
685                        peer_id,
686                        reason: P2pDisconnectionReason::InvalidMessage,
687                    });
688                }
689
690                Ok(())
691            }
692            P2pNetworkPubsubAction::IgnoreMessage { .. } => Ok(()),
693        }
694    }
695
696    fn prepare_to_sign<Action, State>(
697        mut state_context: Substate<Action, State, Self>,
698        buffer: Vec<u8>,
699    ) -> Result<(), String>
700    where
701        State: crate::P2pStateTrait,
702        Action: crate::P2pActionTrait<State>,
703    {
704        let pubsub_state = state_context.get_substate_mut()?;
705
706        let mut seqno = pubsub_state.seq;
707        let (dispatcher, state) = state_context.into_dispatcher_and_state();
708        let config: &P2pConfig = state.substate()?;
709        seqno += config.meshsub.initial_time.as_nanos() as u64;
710
711        dispatcher.push(P2pNetworkPubsubAction::Sign {
712            seqno,
713            author: config.identity_pub_key.peer_id(),
714            data: buffer.into(),
715            topic: super::TOPIC.to_owned(),
716        });
717
718        Ok(())
719    }
720
721    /// Queues a validated message for propagation to other peers in the pubsub network.
722    /// For peers that are "on mesh" for the message's topic, queues the full message.
723    /// For other peers, queues an IHAVE control message to notify about message availability.
724    /// The original sender is excluded from propagation.
725    fn reduce_incoming_validated_message(
726        &mut self,
727        message_id: P2pNetworkPubsubMessageCacheId,
728        peer_id: PeerId,
729        message: &Message,
730    ) {
731        let topic = self.topics.entry(message.topic.clone()).or_default();
732
733        self.clients
734            .iter_mut()
735            .filter(|(c, _)| {
736                // don't send back to who sent this
737                *c != &peer_id
738            })
739            .for_each(|(c, state)| {
740                let Some(topic_state) = topic.get(c) else {
741                    return;
742                };
743                if topic_state.on_mesh() {
744                    state.publish(message)
745                } else {
746                    let ctr = state.message.control.get_or_insert_with(Default::default);
747                    ctr.ihave.push(pb::ControlIHave {
748                        topic_id: Some(message.topic.clone()),
749                        message_ids: vec![message_id.to_raw_bytes()],
750                    })
751                }
752            });
753    }
754
755    /// Processes an incoming message by checking for duplicates and deserializing its contents.
756    ///
757    /// This function performs two main operations:
758    /// 1. Deduplication: Tracks recently seen messages using their signatures to avoid processing duplicates
759    /// 2. Deserialization: Converts valid message data into a `GossipNetMessageV2` structure
760    ///
761    /// # Arguments
762    ///
763    /// * `message` - The incoming message to process
764    /// * `seen_limit` - Maximum number of message signatures to keep in the deduplication cache
765    ///
766    /// # Returns
767    ///
768    /// * `Ok(Some(GossipNetMessageV2))` - Successfully processed and deserialized message
769    /// * `Ok(None)` - Message was a duplicate (already seen)
770    /// * `Err(String)` - Error during processing (invalid message format or deserialization failure)
771    ///
772    #[inline(never)]
773    fn reduce_incoming_message(
774        &mut self,
775        message: &Message,
776        seen_limit: usize,
777    ) -> Result<Option<GossipNetMessageV2>, String> {
778        let Some(signature) = &message.signature else {
779            bug_condition!("Validation failed: missing signature");
780            return Ok(None);
781        };
782
783        // skip recently seen message
784        if !self.seen.contains(signature) {
785            self.seen.push_back(signature.clone());
786            // keep only last `n` to avoid memory leak
787            if self.seen.len() > seen_limit {
788                self.seen.pop_front();
789            }
790        } else {
791            return Ok(None);
792        }
793
794        match &message.data {
795            Some(data) if data.len() > 8 => {
796                let mut slice = &data[8..];
797                Ok(Some(
798                    gossip::GossipNetMessageV2::binprot_read(&mut slice)
799                        .map_err(|e| format!("Invalid `GossipNetMessageV2` message, error: {e}"))?,
800                ))
801            }
802            _ => Err("Invalid message".to_owned()),
803        }
804    }
805
806    fn combined_with_pending_buffer<'a>(buffer: &'a mut Vec<u8>, data: &'a [u8]) -> &'a [u8] {
807        if buffer.is_empty() {
808            // Nothing pending, we can use the data directly
809            data
810        } else {
811            buffer.extend_from_slice(data);
812            buffer.as_slice()
813        }
814    }
815
816    /// Processes incoming data from a peer, handling subscriptions, control messages,
817    /// and message broadcasting within the P2P pubsub system.
818    fn reduce_incoming_data(
819        &mut self,
820        peer_id: &PeerId,
821        data: Data,
822        timestamp: Timestamp,
823    ) -> Result<(), String> {
824        let Some(client_state) = self.clients.get_mut(peer_id) else {
825            // TODO: investigate, cannot reproduce this
826            // bug_condition!("State not found for action: P2pNetworkPubsubAction::IncomingData");
827            return Ok(());
828        };
829
830        // Data may be part of a partial message we received before.
831        let slice = Self::combined_with_pending_buffer(&mut client_state.buffer, &data);
832
833        match <pb::Rpc as prost::Message>::decode_length_delimited(slice) {
834            Ok(decoded) => {
835                client_state.clear_buffer();
836                client_state.incoming_messages.extend(decoded.publish);
837
838                let subscriptions = decoded.subscriptions;
839                let control = decoded.control.unwrap_or_default();
840
841                self.update_subscriptions(peer_id, subscriptions);
842                self.apply_control_commands(peer_id, &control);
843                self.respond_to_iwant_requests(peer_id, &control.iwant);
844                self.process_ihave_messages(peer_id, control.ihave, timestamp);
845            }
846            Err(err) => {
847                // NOTE: not the ideal way to check for errors, but `prost` doesn't provide
848                // a better alternative, so we must check the message contents.
849                if err.to_string().contains("buffer underflow") && client_state.buffer.is_empty() {
850                    // Incomplete data, keep in buffer, should be completed later
851                    client_state.buffer = data.to_vec();
852                } else {
853                    // Clear the buffer for other decoding errors, otherwise this will cause issues
854                    // with any data we receive later.
855                    client_state.clear_buffer();
856                }
857            }
858        }
859
860        Ok(())
861    }
862
863    fn update_subscriptions(&mut self, peer_id: &PeerId, subscriptions: Vec<pb::rpc::SubOpts>) {
864        // Update subscription status based on incoming subscription requests.
865        for subscription in &subscriptions {
866            let topic_id = subscription.topic_id().to_owned();
867            let topic = self.topics.entry(topic_id).or_default();
868
869            if subscription.subscribe() {
870                topic.entry(*peer_id).or_default();
871            } else {
872                topic.remove(peer_id);
873            }
874        }
875    }
876
877    /// Applies control commands (`graft` and `prune`) to manage the peer's mesh states within topics.
878    fn apply_control_commands(&mut self, peer_id: &PeerId, control: &pb::ControlMessage) {
879        // Apply graft commands to add the peer to specific topic meshes.
880        for graft in &control.graft {
881            if let Some(mesh_state) = self
882                .topics
883                .get_mut(graft.topic_id())
884                .and_then(|m| m.get_mut(peer_id))
885            {
886                mesh_state.mesh = P2pNetworkPubsubClientMeshAddingState::Added;
887            }
888        }
889
890        // Apply prune commands to remove the peer from specific topic meshes.
891        for prune in &control.prune {
892            if let Some(mesh_state) = self
893                .topics
894                .get_mut(prune.topic_id())
895                .and_then(|m| m.get_mut(peer_id))
896            {
897                mesh_state.mesh = P2pNetworkPubsubClientMeshAddingState::TheyRefused;
898            }
899        }
900    }
901
902    fn respond_to_iwant_requests(&mut self, peer_id: &PeerId, iwant_requests: &[pb::ControlIWant]) {
903        // Respond to iwant requests by publishing available messages from the cache.
904        for iwant in iwant_requests {
905            for msg_id in &iwant.message_ids {
906                if let Some(msg) = self.mcache.get_message_from_raw_message_id(msg_id) {
907                    if let Some(client) = self.clients.get_mut(peer_id) {
908                        client.publish(msg.message());
909                    }
910                }
911            }
912        }
913    }
914
915    fn process_ihave_messages(
916        &mut self,
917        peer_id: &PeerId,
918        ihave_messages: Vec<pb::ControlIHave>,
919        timestamp: Timestamp,
920    ) {
921        // Process ihave messages by determining which available messages the client wants.
922        for ihave in ihave_messages {
923            if self.clients.contains_key(peer_id) {
924                let message_ids = ihave
925                    .message_ids
926                    .into_iter()
927                    .filter(|message_id| self.filter_iwant_message_ids(message_id, timestamp))
928                    .collect::<Vec<_>>();
929
930                let Some(client) = self.clients.get_mut(peer_id) else {
931                    bug_condition!("process_ihave_messages: State not found for {}", peer_id);
932                    return;
933                };
934
935                // Queue the desired message IDs for the client to request.
936                let ctr = client.message.control.get_or_insert_with(Default::default);
937                ctr.iwant.push(pb::ControlIWant { message_ids })
938            }
939        }
940    }
941
942    fn broadcast<Action, State>(
943        dispatcher: &mut Dispatcher<Action, State>,
944        state: &State,
945    ) -> Result<(), String>
946    where
947        State: crate::P2pStateTrait,
948        Action: crate::P2pActionTrait<State>,
949    {
950        let state: &P2pNetworkPubsubState = state.substate()?;
951
952        for peer_id in state
953            .clients
954            .iter()
955            .filter(|(_, s)| !s.message_is_empty())
956            .map(|(peer_id, _)| *peer_id)
957        {
958            dispatcher.push(P2pNetworkPubsubAction::OutgoingMessage { peer_id });
959        }
960
961        Ok(())
962    }
963}