1use mina_p2p_messages::rpc_kernel::{QueryHeader, QueryID, ResponseHeader};
2use openmina_core::{action_debug, action_trace, ActionEvent};
3use serde::{Deserialize, Serialize};
4
5use super::{super::*, *};
6use crate::{P2pState, PeerId};
7
8#[derive(Serialize, Deserialize, Debug, Clone, ActionEvent)]
9#[action_event(fields(display(addr), display(peer_id), incoming, stream_id, debug(data), fin))]
10pub enum P2pNetworkRpcAction {
11 Init {
12 addr: ConnectionAddr,
13 peer_id: PeerId,
14 stream_id: StreamId,
15 incoming: bool,
16 },
17 #[action_event(level = trace)]
18 IncomingData {
19 addr: ConnectionAddr,
20 peer_id: PeerId,
21 stream_id: StreamId,
22 data: Data,
23 },
24 #[action_event(expr(log_message(context, message, addr, peer_id, stream_id)))]
25 IncomingMessage {
26 addr: ConnectionAddr,
27 peer_id: PeerId,
28 stream_id: StreamId,
29 message: RpcMessage,
30 },
31 PrunePending {
32 peer_id: PeerId,
33 stream_id: StreamId,
34 },
35 HeartbeatSend {
36 addr: ConnectionAddr,
37 peer_id: PeerId,
38 stream_id: StreamId,
39 },
40 OutgoingQuery {
41 peer_id: PeerId,
42 query: QueryHeader,
43 data: Data,
44 },
45 OutgoingResponse {
46 peer_id: PeerId,
47 response: ResponseHeader,
48 data: Data,
49 },
50 OutgoingData {
51 addr: ConnectionAddr,
52 peer_id: PeerId,
53 stream_id: StreamId,
54 data: Data,
55 fin: bool,
56 },
57}
58
59pub enum RpcStreamId {
60 Exact(StreamId),
61 WithQuery(QueryID),
62 AnyIncoming,
63 AnyOutgoing,
64}
65
66impl P2pNetworkRpcAction {
67 pub fn stream_id(&self) -> RpcStreamId {
68 match self {
69 Self::Init { stream_id, .. } => RpcStreamId::Exact(*stream_id),
70 Self::IncomingData { stream_id, .. } => RpcStreamId::Exact(*stream_id),
71 Self::IncomingMessage { stream_id, .. } => RpcStreamId::Exact(*stream_id),
72 Self::PrunePending { stream_id, .. } => RpcStreamId::Exact(*stream_id),
73 Self::HeartbeatSend { stream_id, .. } => RpcStreamId::Exact(*stream_id),
74 Self::OutgoingQuery { .. } => RpcStreamId::AnyOutgoing,
75 Self::OutgoingResponse {
76 response: ResponseHeader { id },
77 ..
78 } => RpcStreamId::WithQuery(*id),
79 Self::OutgoingData { stream_id, .. } => RpcStreamId::Exact(*stream_id),
80 }
81 }
82
83 pub fn peer_id(&self) -> &PeerId {
84 match self {
85 Self::Init { peer_id, .. } => peer_id,
86 Self::IncomingData { peer_id, .. } => peer_id,
87 Self::IncomingMessage { peer_id, .. } => peer_id,
88 Self::PrunePending { peer_id, .. } => peer_id,
89 Self::HeartbeatSend { peer_id, .. } => peer_id,
90 Self::OutgoingQuery { peer_id, .. } => peer_id,
91 Self::OutgoingResponse { peer_id, .. } => peer_id,
92 Self::OutgoingData { peer_id, .. } => peer_id,
93 }
94 }
95}
96impl From<P2pNetworkRpcAction> for crate::P2pAction {
97 fn from(a: P2pNetworkRpcAction) -> Self {
98 Self::Network(a.into())
99 }
100}
101
102impl redux::EnablingCondition<P2pState> for P2pNetworkRpcAction {
103 fn is_enabled(&self, state: &P2pState, time: redux::Timestamp) -> bool {
104 let Some(rpc_state) = state.network.find_rpc_state(self) else {
105 return false;
106 };
107
108 if rpc_state.error.is_some() {
109 return false;
110 }
111
112 #[allow(unused_variables)]
113 match self {
114 P2pNetworkRpcAction::Init {
115 addr,
116 peer_id,
117 stream_id,
118 incoming,
119 } => true,
120 P2pNetworkRpcAction::IncomingData {
121 addr,
122 peer_id,
123 stream_id,
124 data,
125 } => true,
126 P2pNetworkRpcAction::IncomingMessage {
127 addr,
128 peer_id,
129 stream_id,
130 message,
131 } => true,
132 P2pNetworkRpcAction::PrunePending { peer_id, stream_id } => true,
133 P2pNetworkRpcAction::HeartbeatSend {
134 addr,
135 peer_id,
136 stream_id,
137 } => {
138 rpc_state.should_send_heartbeat(time)
143 }
144 P2pNetworkRpcAction::OutgoingQuery {
145 peer_id,
146 query,
147 data,
148 } => true,
149 P2pNetworkRpcAction::OutgoingResponse {
150 peer_id,
151 response,
152 data,
153 } => true,
154 P2pNetworkRpcAction::OutgoingData {
155 addr,
156 peer_id,
157 stream_id,
158 data,
159 fin,
160 } => true,
161 }
162 }
163}
164
165fn log_message<T>(
166 context: &T,
167 message: &RpcMessage,
168 addr: &ConnectionAddr,
169 peer_id: &PeerId,
170 stream_id: &u32,
171) where
172 T: openmina_core::log::EventContext,
173{
174 match message {
175 RpcMessage::Handshake => action_trace!(
176 context,
177 kind = "P2pNetworkRpcIncomingMessage",
178 addr = display(addr),
179 peer_id = display(peer_id),
180 stream_id,
181 message_kind = "handshake"
182 ),
183 RpcMessage::Heartbeat => action_trace!(
184 context,
185 kind = "P2pNetworkRpcIncomingMessage",
186 addr = display(addr),
187 peer_id = display(peer_id),
188 stream_id,
189 message_kind = "heartbeat"
190 ),
191 RpcMessage::Query { header, .. } => action_debug!(
192 context,
193 kind = "P2pNetworkRpcIncomingMessage",
194 addr = display(addr),
195 peer_id = display(peer_id),
196 stream_id,
197 message_kind = "query",
198 message_header = debug(header)
199 ),
200 RpcMessage::Response { header, .. } => action_debug!(
201 context,
202 kind = "P2pNetworkRpcIncomingMessage",
203 addr = display(addr),
204 peer_id = display(peer_id),
205 stream_id,
206 message_kind = "response",
207 message_header = debug(header)
208 ),
209 }
210}