p2p/network/rpc/
p2p_network_rpc_actions.rs

1use mina_p2p_messages::rpc_kernel::{QueryHeader, QueryID, ResponseHeader};
2use openmina_core::{action_debug, action_trace, ActionEvent};
3use serde::{Deserialize, Serialize};
4
5use super::{super::*, *};
6use crate::{P2pState, PeerId};
7
8#[derive(Serialize, Deserialize, Debug, Clone, ActionEvent)]
9#[action_event(fields(display(addr), display(peer_id), incoming, stream_id, debug(data), fin))]
10pub enum P2pNetworkRpcAction {
11    Init {
12        addr: ConnectionAddr,
13        peer_id: PeerId,
14        stream_id: StreamId,
15        incoming: bool,
16    },
17    #[action_event(level = trace)]
18    IncomingData {
19        addr: ConnectionAddr,
20        peer_id: PeerId,
21        stream_id: StreamId,
22        data: Data,
23    },
24    #[action_event(expr(log_message(context, message, addr, peer_id, stream_id)))]
25    IncomingMessage {
26        addr: ConnectionAddr,
27        peer_id: PeerId,
28        stream_id: StreamId,
29        message: RpcMessage,
30    },
31    PrunePending {
32        peer_id: PeerId,
33        stream_id: StreamId,
34    },
35    HeartbeatSend {
36        addr: ConnectionAddr,
37        peer_id: PeerId,
38        stream_id: StreamId,
39    },
40    OutgoingQuery {
41        peer_id: PeerId,
42        query: QueryHeader,
43        data: Data,
44    },
45    OutgoingResponse {
46        peer_id: PeerId,
47        response: ResponseHeader,
48        data: Data,
49    },
50    OutgoingData {
51        addr: ConnectionAddr,
52        peer_id: PeerId,
53        stream_id: StreamId,
54        data: Data,
55        fin: bool,
56    },
57}
58
59pub enum RpcStreamId {
60    Exact(StreamId),
61    WithQuery(QueryID),
62    AnyIncoming,
63    AnyOutgoing,
64}
65
66impl P2pNetworkRpcAction {
67    pub fn stream_id(&self) -> RpcStreamId {
68        match self {
69            Self::Init { stream_id, .. } => RpcStreamId::Exact(*stream_id),
70            Self::IncomingData { stream_id, .. } => RpcStreamId::Exact(*stream_id),
71            Self::IncomingMessage { stream_id, .. } => RpcStreamId::Exact(*stream_id),
72            Self::PrunePending { stream_id, .. } => RpcStreamId::Exact(*stream_id),
73            Self::HeartbeatSend { stream_id, .. } => RpcStreamId::Exact(*stream_id),
74            Self::OutgoingQuery { .. } => RpcStreamId::AnyOutgoing,
75            Self::OutgoingResponse {
76                response: ResponseHeader { id },
77                ..
78            } => RpcStreamId::WithQuery(*id),
79            Self::OutgoingData { stream_id, .. } => RpcStreamId::Exact(*stream_id),
80        }
81    }
82
83    pub fn peer_id(&self) -> &PeerId {
84        match self {
85            Self::Init { peer_id, .. } => peer_id,
86            Self::IncomingData { peer_id, .. } => peer_id,
87            Self::IncomingMessage { peer_id, .. } => peer_id,
88            Self::PrunePending { peer_id, .. } => peer_id,
89            Self::HeartbeatSend { peer_id, .. } => peer_id,
90            Self::OutgoingQuery { peer_id, .. } => peer_id,
91            Self::OutgoingResponse { peer_id, .. } => peer_id,
92            Self::OutgoingData { peer_id, .. } => peer_id,
93        }
94    }
95}
96impl From<P2pNetworkRpcAction> for crate::P2pAction {
97    fn from(a: P2pNetworkRpcAction) -> Self {
98        Self::Network(a.into())
99    }
100}
101
102impl redux::EnablingCondition<P2pState> for P2pNetworkRpcAction {
103    fn is_enabled(&self, state: &P2pState, time: redux::Timestamp) -> bool {
104        let Some(rpc_state) = state.network.find_rpc_state(self) else {
105            return false;
106        };
107
108        if rpc_state.error.is_some() {
109            return false;
110        }
111
112        #[allow(unused_variables)]
113        match self {
114            P2pNetworkRpcAction::Init {
115                addr,
116                peer_id,
117                stream_id,
118                incoming,
119            } => true,
120            P2pNetworkRpcAction::IncomingData {
121                addr,
122                peer_id,
123                stream_id,
124                data,
125            } => true,
126            P2pNetworkRpcAction::IncomingMessage {
127                addr,
128                peer_id,
129                stream_id,
130                message,
131            } => true,
132            P2pNetworkRpcAction::PrunePending { peer_id, stream_id } => true,
133            P2pNetworkRpcAction::HeartbeatSend {
134                addr,
135                peer_id,
136                stream_id,
137            } => {
138                // TODO: if we have an incoming rpc, for which response
139                // isn't yet fully flushed to the stream, we will end up
140                // adding these heartbeats to the queue. Not necessarily
141                // an issue but not a completely correct behavior either.
142                rpc_state.should_send_heartbeat(time)
143            }
144            P2pNetworkRpcAction::OutgoingQuery {
145                peer_id,
146                query,
147                data,
148            } => true,
149            P2pNetworkRpcAction::OutgoingResponse {
150                peer_id,
151                response,
152                data,
153            } => true,
154            P2pNetworkRpcAction::OutgoingData {
155                addr,
156                peer_id,
157                stream_id,
158                data,
159                fin,
160            } => true,
161        }
162    }
163}
164
165fn log_message<T>(
166    context: &T,
167    message: &RpcMessage,
168    addr: &ConnectionAddr,
169    peer_id: &PeerId,
170    stream_id: &u32,
171) where
172    T: openmina_core::log::EventContext,
173{
174    match message {
175        RpcMessage::Handshake => action_trace!(
176            context,
177            kind = "P2pNetworkRpcIncomingMessage",
178            addr = display(addr),
179            peer_id = display(peer_id),
180            stream_id,
181            message_kind = "handshake"
182        ),
183        RpcMessage::Heartbeat => action_trace!(
184            context,
185            kind = "P2pNetworkRpcIncomingMessage",
186            addr = display(addr),
187            peer_id = display(peer_id),
188            stream_id,
189            message_kind = "heartbeat"
190        ),
191        RpcMessage::Query { header, .. } => action_debug!(
192            context,
193            kind = "P2pNetworkRpcIncomingMessage",
194            addr = display(addr),
195            peer_id = display(peer_id),
196            stream_id,
197            message_kind = "query",
198            message_header = debug(header)
199        ),
200        RpcMessage::Response { header, .. } => action_debug!(
201            context,
202            kind = "P2pNetworkRpcIncomingMessage",
203            addr = display(addr),
204            peer_id = display(peer_id),
205            stream_id,
206            message_kind = "response",
207            message_header = debug(header)
208        ),
209    }
210}