1use openmina_core::ActionEvent;
2use redux::Timestamp;
3use serde::{Deserialize, Serialize};
4
5use crate::{P2pState, PeerId};
6
7use super::{P2pChannelsRpcState, P2pRpcId, P2pRpcLocalState, P2pRpcRequest, P2pRpcResponse};
8
9pub type P2pChannelsRpcActionWithMetaRef<'a> = redux::ActionWithMeta<&'a P2pChannelsRpcAction>;
10
11pub const MAX_P2P_RPC_REMOTE_CONCURRENT_REQUESTS: usize = 5;
12
13#[derive(Serialize, Deserialize, Debug, Clone, ActionEvent)]
14#[action_event(fields(display(peer_id)))]
15pub enum P2pChannelsRpcAction {
16 Init {
17 peer_id: PeerId,
18 },
19 Pending {
20 peer_id: PeerId,
21 },
22 Ready {
23 peer_id: PeerId,
24 },
25 RequestSend {
26 peer_id: PeerId,
27 id: P2pRpcId,
28 request: Box<P2pRpcRequest>,
29 on_init: Option<redux::Callback<(PeerId, P2pRpcId, P2pRpcRequest)>>,
30 },
31 Timeout {
32 peer_id: PeerId,
33 id: P2pRpcId,
34 },
35 ResponseReceived {
36 peer_id: PeerId,
37 id: P2pRpcId,
38 response: Option<Box<P2pRpcResponse>>,
39 },
40 RequestReceived {
41 peer_id: PeerId,
42 id: P2pRpcId,
43 request: Box<P2pRpcRequest>,
44 },
45 ResponsePending {
49 peer_id: PeerId,
50 id: P2pRpcId,
51 },
52 ResponseSend {
53 peer_id: PeerId,
54 id: P2pRpcId,
55 response: Option<Box<P2pRpcResponse>>,
56 },
57}
58
59impl P2pChannelsRpcAction {
60 pub fn peer_id(&self) -> &PeerId {
61 match self {
62 Self::Init { peer_id }
63 | Self::Pending { peer_id }
64 | Self::Ready { peer_id }
65 | Self::RequestSend { peer_id, .. }
66 | Self::Timeout { peer_id, .. }
67 | Self::ResponseReceived { peer_id, .. }
68 | Self::RequestReceived { peer_id, .. }
69 | Self::ResponsePending { peer_id, .. }
70 | Self::ResponseSend { peer_id, .. } => peer_id,
71 }
72 }
73}
74
75impl redux::EnablingCondition<P2pState> for P2pChannelsRpcAction {
76 fn is_enabled(&self, state: &P2pState, time: Timestamp) -> bool {
77 match self {
78 P2pChannelsRpcAction::Init { peer_id } => state
79 .get_ready_peer(peer_id)
80 .is_some_and(|p| matches!(p.channels.rpc, P2pChannelsRpcState::Enabled)),
81 P2pChannelsRpcAction::Pending { peer_id } => state
82 .get_ready_peer(peer_id)
83 .is_some_and(|p| matches!(p.channels.rpc, P2pChannelsRpcState::Init { .. })),
84 P2pChannelsRpcAction::Ready { peer_id } => state
85 .get_ready_peer(peer_id)
86 .is_some_and(|p| matches!(p.channels.rpc, P2pChannelsRpcState::Pending { .. })),
87 P2pChannelsRpcAction::RequestSend {
88 peer_id,
89 id,
90 request,
91 on_init: _,
92 } => state
93 .peers
94 .get(peer_id)
95 .filter(|p| !p.is_libp2p() || request.kind().supported_by_libp2p())
96 .and_then(|p| p.status.as_ready())
97 .is_some_and(|p| {
98 matches!(
99 &p.channels.rpc,
100 P2pChannelsRpcState::Ready {
101 local: P2pRpcLocalState::WaitingForRequest { .. }
102 | P2pRpcLocalState::Responded { .. },
103 ..
104 } if p.channels.next_local_rpc_id() == *id
105 )
106 }),
107 P2pChannelsRpcAction::Timeout { peer_id, id } => {
108 state.get_ready_peer(peer_id).is_some_and(|p| {
109 matches!(
110 &p.channels.rpc,
111 P2pChannelsRpcState::Ready {
112 local: P2pRpcLocalState::Requested { id: rpc_id, .. },
113 ..
114 } if rpc_id == id
115 )
116 }) && state.is_peer_rpc_timed_out(peer_id, *id, time)
117 }
118 P2pChannelsRpcAction::ResponseReceived { peer_id, id, .. } => {
119 state.get_ready_peer(peer_id).is_some_and(|p| {
122 match &p.channels.rpc {
123 P2pChannelsRpcState::Ready { local, .. } => {
124 matches!(
126 local,
127 P2pRpcLocalState::Requested { id: rpc_id, .. }
128 if rpc_id == id
129 )
130 }
131 _ => false,
132 }
133 })
134 }
135 P2pChannelsRpcAction::RequestReceived { peer_id, id, .. } => state
136 .get_ready_peer(peer_id)
137 .is_some_and(|p| match &p.channels.rpc {
138 P2pChannelsRpcState::Ready { remote, .. } => {
139 remote.pending_requests.len() < MAX_P2P_RPC_REMOTE_CONCURRENT_REQUESTS
140 && remote.pending_requests.iter().all(|v| v.id != *id)
141 }
142 _ => false,
143 }),
144 P2pChannelsRpcAction::ResponsePending { peer_id, id } => state
145 .get_ready_peer(peer_id)
146 .is_some_and(|p| match &p.channels.rpc {
147 P2pChannelsRpcState::Ready { remote, .. } => remote
148 .pending_requests
149 .iter()
150 .any(|v| v.id == *id && !v.is_pending),
151 _ => false,
152 }),
153 P2pChannelsRpcAction::ResponseSend {
154 peer_id,
155 id,
156 response: _response,
157 } => {
158 #[cfg(feature = "p2p-libp2p")]
159 if state.is_libp2p_peer(peer_id) {
160 let Some(response) = _response.as_ref() else {
161 return false;
162 };
163 return if !response.kind().supported_by_libp2p() {
164 false
165 } else if let Some(streams) =
166 state.network.scheduler.rpc_incoming_streams.get(peer_id)
167 {
168 !streams.is_empty()
169 } else {
170 false
171 };
172 }
173
174 state.get_ready_peer(peer_id).is_some_and(|p| {
175 match &p.channels.rpc {
176 P2pChannelsRpcState::Ready { remote, .. } => {
177 remote.pending_requests.iter().any(|v| v.id == *id)
179 }
180 _ => false,
181 }
182 })
183 }
184 }
185 }
186}
187
188use crate::channels::P2pChannelsAction;
189
190impl From<P2pChannelsRpcAction> for crate::P2pAction {
191 fn from(a: P2pChannelsRpcAction) -> Self {
192 Self::Channels(P2pChannelsAction::Rpc(a))
193 }
194}