p2p/network/pubsub/p2p_network_pubsub_actions.rs
1use super::{pb, BroadcastMessageId};
2use crate::{token::BroadcastAlgorithm, ConnectionAddr, Data, P2pState, PeerId, StreamId};
3use mina_p2p_messages::gossip::GossipNetMessageV2;
4use openmina_core::{p2p::P2pNetworkPubsubMessageCacheId, ActionEvent};
5use serde::{Deserialize, Serialize};
6
7/// Actions that can occur within the P2P Network PubSub system.
8///
9/// Managing pubsub streams, handling incoming and outgoing messages,
10/// and maintaining the mesh network topology.
11///
12/// **Common Fields:**
13/// - `peer_id`: The identifier of the peer associated with the action.
14/// - `addr`: The connection address of the peer.
15/// - `stream_id`: The unique identifier of the stream.
16/// - `topic_id`: The identifier of the topic involved in the action.
17#[derive(Serialize, Deserialize, Debug, Clone, ActionEvent)]
18pub enum P2pNetworkPubsubAction {
19 /// Create a new stream, either incoming or outgoing.
20 ///
21 /// **Fields:**
22 /// - `incoming`: Indicates if the stream is incoming (`true`) or outgoing (`false`).
23 /// - `protocol`: The broadcast algorithm used for the stream.
24 NewStream {
25 incoming: bool,
26 peer_id: PeerId,
27 addr: ConnectionAddr,
28 stream_id: StreamId,
29 protocol: BroadcastAlgorithm,
30 },
31
32 /// Process incoming raw data from a peer.
33 ///
34 /// **Fields:**
35 /// - `data`: The raw data payload received.
36 /// - `seen_limit`: The limit for tracking seen messages to prevent duplication.
37 IncomingData {
38 peer_id: PeerId,
39 addr: ConnectionAddr,
40 stream_id: StreamId,
41 data: Data,
42 seen_limit: usize,
43 },
44
45 /// Validate a batch of decoded incoming messages.
46 ValidateIncomingMessages {
47 peer_id: PeerId,
48 seen_limit: usize,
49 addr: ConnectionAddr,
50 },
51
52 /// Handle a fully decoded and validated message received from a peer.
53 ///
54 /// **Fields:**
55 /// - `message`: The decoded protobuf message.
56 /// - `seen_limit`: The limit for tracking seen messages to prevent duplication.
57 IncomingMessage {
58 peer_id: PeerId,
59 message: pb::Message,
60 seen_limit: usize,
61 },
62
63 /// Clean up temporary states after processing an incoming message.
64 IncomingMessageCleanup {
65 peer_id: PeerId,
66 },
67
68 /// Add a peer to the mesh network for a specific topic.
69 Graft {
70 peer_id: PeerId,
71 topic_id: String,
72 },
73
74 /// Remove a peer from the mesh network for a specific topic.
75 Prune {
76 peer_id: PeerId,
77 topic_id: String,
78 },
79
80 /// Rebroadcast message received from WebRTC connection.
81 ///
82 /// Expected to be dispatched after the message has been processed,
83 /// in spite of whether it was received from libp2p or webrtc network.
84 ///
85 /// If received from libp2p network, or if we have already broadcasted
86 /// this message, the message will be in the `mcache` state,
87 /// in which case the action won't be enabled (will be filtered out).
88 WebRtcRebroadcast {
89 message: GossipNetMessageV2,
90 },
91
92 /// Initiate the broadcasting of a message to all subscribed peers.
93 ///
94 /// **Fields:**
95 /// - `message`: The gossip network message to broadcast.
96 Broadcast {
97 message: GossipNetMessageV2,
98 },
99
100 /// Prepare a message for signing before broadcasting.
101 ///
102 /// **Fields:**
103 /// - `seqno`: The sequence number of the message.
104 /// - `author`: The identifier of the peer authoring the message.
105 /// - `data`: The data payload of the message.
106 /// - `topic`: The topic under which the message is published.
107 Sign {
108 seqno: u64,
109 author: PeerId,
110 data: Data,
111 topic: String,
112 },
113
114 /// An error occured during the signing process.
115 #[action_event(level = warn, fields(display(author), display(topic)))]
116 SignError {
117 author: PeerId,
118 topic: String,
119 },
120
121 /// Finalize the broadcasting of a signed message by attaching the signature.
122 ///
123 /// **Fields:**
124 /// - `signature`: The cryptographic signature of the message.
125 BroadcastSigned {
126 signature: Data,
127 },
128
129 /// Prepare an outgoing message to send to a specific peer.
130 OutgoingMessage {
131 peer_id: PeerId,
132 },
133
134 /// Clear the outgoing message state for a specific peer after sending.
135 OutgoingMessageClear {
136 peer_id: PeerId,
137 },
138
139 /// An error occured during the sending of an outgoing message.
140 ///
141 /// **Fields:**
142 /// - `msg`: The protobuf message that failed to send.
143 #[action_event(level = warn, fields(display(peer_id), debug(msg)))]
144 OutgoingMessageError {
145 msg: pb::Rpc,
146 peer_id: PeerId,
147 },
148
149 /// Send encoded data over an outgoing stream to a specific peer.
150 ///
151 /// **Fields:**
152 /// - `data`: The encoded data to be sent.
153 OutgoingData {
154 data: Data,
155 peer_id: PeerId,
156 },
157
158 HandleIncomingMessage {
159 message: pb::Message,
160 message_content: GossipNetMessageV2,
161 peer_id: PeerId,
162 },
163
164 ValidateIncomingMessage {
165 message_id: P2pNetworkPubsubMessageCacheId,
166 },
167
168 /// Delete expired messages from state
169 PruneMessages {},
170
171 RejectMessage {
172 message_id: Option<BroadcastMessageId>,
173 peer_id: Option<PeerId>,
174 reason: String,
175 },
176 IgnoreMessage {
177 message_id: Option<BroadcastMessageId>,
178 reason: String,
179 },
180
181 // After message is fully validated, broadcast it to other peers
182 BroadcastValidatedMessage {
183 message_id: BroadcastMessageId,
184 },
185}
186
187impl From<P2pNetworkPubsubAction> for crate::P2pAction {
188 fn from(value: P2pNetworkPubsubAction) -> Self {
189 crate::P2pAction::Network(value.into())
190 }
191}
192
193impl redux::EnablingCondition<P2pState> for P2pNetworkPubsubAction {
194 fn is_enabled(&self, state: &P2pState, _time: redux::Timestamp) -> bool {
195 let pubsub = &state.network.scheduler.broadcast_state;
196 match self {
197 P2pNetworkPubsubAction::OutgoingMessage { peer_id } => pubsub
198 .clients
199 .get(peer_id)
200 .is_some_and(|s| !s.message_is_empty()),
201 P2pNetworkPubsubAction::Prune { peer_id, topic_id } => pubsub
202 .topics
203 .get(topic_id)
204 .is_some_and(|topics| topics.contains_key(peer_id)),
205 P2pNetworkPubsubAction::WebRtcRebroadcast { message } => {
206 let source = super::webrtc_source_sk(message)
207 .public_key()
208 .peer_id()
209 .try_into()
210 .unwrap();
211 pubsub
212 .mcache
213 .get_message(&P2pNetworkPubsubMessageCacheId { source, seqno: 0 })
214 .is_none()
215 }
216 P2pNetworkPubsubAction::BroadcastValidatedMessage { message_id }
217 | P2pNetworkPubsubAction::RejectMessage {
218 message_id: Some(message_id),
219 ..
220 } => pubsub.mcache.contains_broadcast_id(message_id),
221 _ => true,
222 }
223 }
224}