p2p/network/pubsub/
p2p_network_pubsub_actions.rs

1use super::{pb, BroadcastMessageId};
2use crate::{token::BroadcastAlgorithm, ConnectionAddr, Data, P2pState, PeerId, StreamId};
3use mina_p2p_messages::gossip::GossipNetMessageV2;
4use openmina_core::{p2p::P2pNetworkPubsubMessageCacheId, ActionEvent};
5use serde::{Deserialize, Serialize};
6
7/// Actions that can occur within the P2P Network PubSub system.
8///
9/// Managing pubsub streams, handling incoming and outgoing messages,
10/// and maintaining the mesh network topology.
11///
12/// **Common Fields:**
13/// - `peer_id`: The identifier of the peer associated with the action.
14/// - `addr`: The connection address of the peer.
15/// - `stream_id`: The unique identifier of the stream.
16/// - `topic_id`: The identifier of the topic involved in the action.
17#[derive(Serialize, Deserialize, Debug, Clone, ActionEvent)]
18pub enum P2pNetworkPubsubAction {
19    /// Create a new stream, either incoming or outgoing.
20    ///
21    /// **Fields:**
22    /// - `incoming`: Indicates if the stream is incoming (`true`) or outgoing (`false`).
23    /// - `protocol`: The broadcast algorithm used for the stream.
24    NewStream {
25        incoming: bool,
26        peer_id: PeerId,
27        addr: ConnectionAddr,
28        stream_id: StreamId,
29        protocol: BroadcastAlgorithm,
30    },
31
32    /// Process incoming raw data from a peer.
33    ///
34    /// **Fields:**
35    /// - `data`: The raw data payload received.
36    /// - `seen_limit`: The limit for tracking seen messages to prevent duplication.
37    IncomingData {
38        peer_id: PeerId,
39        addr: ConnectionAddr,
40        stream_id: StreamId,
41        data: Data,
42        seen_limit: usize,
43    },
44
45    /// Validate a batch of decoded incoming messages.
46    ValidateIncomingMessages {
47        peer_id: PeerId,
48        seen_limit: usize,
49        addr: ConnectionAddr,
50    },
51
52    /// Handle a fully decoded and validated message received from a peer.
53    ///
54    /// **Fields:**
55    /// - `message`: The decoded protobuf message.
56    /// - `seen_limit`: The limit for tracking seen messages to prevent duplication.
57    IncomingMessage {
58        peer_id: PeerId,
59        message: pb::Message,
60        seen_limit: usize,
61    },
62
63    /// Clean up temporary states after processing an incoming message.
64    IncomingMessageCleanup {
65        peer_id: PeerId,
66    },
67
68    /// Add a peer to the mesh network for a specific topic.
69    Graft {
70        peer_id: PeerId,
71        topic_id: String,
72    },
73
74    /// Remove a peer from the mesh network for a specific topic.
75    Prune {
76        peer_id: PeerId,
77        topic_id: String,
78    },
79
80    /// Rebroadcast message received from WebRTC connection.
81    ///
82    /// Expected to be dispatched after the message has been processed,
83    /// in spite of whether it was received from libp2p or webrtc network.
84    ///
85    /// If received from libp2p network, or if we have already broadcasted
86    /// this message, the message will be in the `mcache` state,
87    /// in which case the action won't be enabled (will be filtered out).
88    WebRtcRebroadcast {
89        message: GossipNetMessageV2,
90    },
91
92    /// Initiate the broadcasting of a message to all subscribed peers.
93    ///
94    /// **Fields:**
95    /// - `message`: The gossip network message to broadcast.
96    Broadcast {
97        message: GossipNetMessageV2,
98    },
99
100    /// Prepare a message for signing before broadcasting.
101    ///
102    /// **Fields:**
103    /// - `seqno`: The sequence number of the message.
104    /// - `author`: The identifier of the peer authoring the message.
105    /// - `data`: The data payload of the message.
106    /// - `topic`: The topic under which the message is published.
107    Sign {
108        seqno: u64,
109        author: PeerId,
110        data: Data,
111        topic: String,
112    },
113
114    /// An error occured during the signing process.
115    #[action_event(level = warn, fields(display(author), display(topic)))]
116    SignError {
117        author: PeerId,
118        topic: String,
119    },
120
121    /// Finalize the broadcasting of a signed message by attaching the signature.
122    ///
123    /// **Fields:**
124    /// - `signature`: The cryptographic signature of the message.
125    BroadcastSigned {
126        signature: Data,
127    },
128
129    /// Prepare an outgoing message to send to a specific peer.
130    OutgoingMessage {
131        peer_id: PeerId,
132    },
133
134    /// Clear the outgoing message state for a specific peer after sending.
135    OutgoingMessageClear {
136        peer_id: PeerId,
137    },
138
139    /// An error occured during the sending of an outgoing message.
140    ///
141    /// **Fields:**
142    /// - `msg`: The protobuf message that failed to send.
143    #[action_event(level = warn, fields(display(peer_id), debug(msg)))]
144    OutgoingMessageError {
145        msg: pb::Rpc,
146        peer_id: PeerId,
147    },
148
149    /// Send encoded data over an outgoing stream to a specific peer.
150    ///
151    /// **Fields:**
152    /// - `data`: The encoded data to be sent.
153    OutgoingData {
154        data: Data,
155        peer_id: PeerId,
156    },
157
158    HandleIncomingMessage {
159        message: pb::Message,
160        message_content: GossipNetMessageV2,
161        peer_id: PeerId,
162    },
163
164    ValidateIncomingMessage {
165        message_id: P2pNetworkPubsubMessageCacheId,
166    },
167
168    /// Delete expired messages from state
169    PruneMessages {},
170
171    RejectMessage {
172        message_id: Option<BroadcastMessageId>,
173        peer_id: Option<PeerId>,
174        reason: String,
175    },
176    IgnoreMessage {
177        message_id: Option<BroadcastMessageId>,
178        reason: String,
179    },
180
181    // After message is fully validated, broadcast it to other peers
182    BroadcastValidatedMessage {
183        message_id: BroadcastMessageId,
184    },
185}
186
187impl From<P2pNetworkPubsubAction> for crate::P2pAction {
188    fn from(value: P2pNetworkPubsubAction) -> Self {
189        crate::P2pAction::Network(value.into())
190    }
191}
192
193impl redux::EnablingCondition<P2pState> for P2pNetworkPubsubAction {
194    fn is_enabled(&self, state: &P2pState, _time: redux::Timestamp) -> bool {
195        let pubsub = &state.network.scheduler.broadcast_state;
196        match self {
197            P2pNetworkPubsubAction::OutgoingMessage { peer_id } => pubsub
198                .clients
199                .get(peer_id)
200                .is_some_and(|s| !s.message_is_empty()),
201            P2pNetworkPubsubAction::Prune { peer_id, topic_id } => pubsub
202                .topics
203                .get(topic_id)
204                .is_some_and(|topics| topics.contains_key(peer_id)),
205            P2pNetworkPubsubAction::WebRtcRebroadcast { message } => {
206                let source = super::webrtc_source_sk(message)
207                    .public_key()
208                    .peer_id()
209                    .try_into()
210                    .unwrap();
211                pubsub
212                    .mcache
213                    .get_message(&P2pNetworkPubsubMessageCacheId { source, seqno: 0 })
214                    .is_none()
215            }
216            P2pNetworkPubsubAction::BroadcastValidatedMessage { message_id }
217            | P2pNetworkPubsubAction::RejectMessage {
218                message_id: Some(message_id),
219                ..
220            } => pubsub.mcache.contains_broadcast_id(message_id),
221            _ => true,
222        }
223    }
224}