p2p/channels/rpc/
p2p_channels_rpc_actions.rs

1use openmina_core::ActionEvent;
2use redux::Timestamp;
3use serde::{Deserialize, Serialize};
4
5use crate::{P2pState, PeerId};
6
7use super::{P2pChannelsRpcState, P2pRpcId, P2pRpcLocalState, P2pRpcRequest, P2pRpcResponse};
8
9pub type P2pChannelsRpcActionWithMetaRef<'a> = redux::ActionWithMeta<&'a P2pChannelsRpcAction>;
10
11pub const MAX_P2P_RPC_REMOTE_CONCURRENT_REQUESTS: usize = 5;
12
13#[derive(Serialize, Deserialize, Debug, Clone, ActionEvent)]
14#[action_event(fields(display(peer_id)))]
15pub enum P2pChannelsRpcAction {
16    Init {
17        peer_id: PeerId,
18    },
19    Pending {
20        peer_id: PeerId,
21    },
22    Ready {
23        peer_id: PeerId,
24    },
25    RequestSend {
26        peer_id: PeerId,
27        id: P2pRpcId,
28        request: Box<P2pRpcRequest>,
29        on_init: Option<redux::Callback<(PeerId, P2pRpcId, P2pRpcRequest)>>,
30    },
31    Timeout {
32        peer_id: PeerId,
33        id: P2pRpcId,
34    },
35    ResponseReceived {
36        peer_id: PeerId,
37        id: P2pRpcId,
38        response: Option<Box<P2pRpcResponse>>,
39    },
40    RequestReceived {
41        peer_id: PeerId,
42        id: P2pRpcId,
43        request: Box<P2pRpcRequest>,
44    },
45    /// Response for the request sent by peer is pending. Dispatched when
46    /// we need data from an async component, like ledger, for constructing
47    /// the response.
48    ResponsePending {
49        peer_id: PeerId,
50        id: P2pRpcId,
51    },
52    ResponseSend {
53        peer_id: PeerId,
54        id: P2pRpcId,
55        response: Option<Box<P2pRpcResponse>>,
56    },
57}
58
59impl P2pChannelsRpcAction {
60    pub fn peer_id(&self) -> &PeerId {
61        match self {
62            Self::Init { peer_id }
63            | Self::Pending { peer_id }
64            | Self::Ready { peer_id }
65            | Self::RequestSend { peer_id, .. }
66            | Self::Timeout { peer_id, .. }
67            | Self::ResponseReceived { peer_id, .. }
68            | Self::RequestReceived { peer_id, .. }
69            | Self::ResponsePending { peer_id, .. }
70            | Self::ResponseSend { peer_id, .. } => peer_id,
71        }
72    }
73}
74
75impl redux::EnablingCondition<P2pState> for P2pChannelsRpcAction {
76    fn is_enabled(&self, state: &P2pState, time: Timestamp) -> bool {
77        match self {
78            P2pChannelsRpcAction::Init { peer_id } => state
79                .get_ready_peer(peer_id)
80                .is_some_and(|p| matches!(p.channels.rpc, P2pChannelsRpcState::Enabled)),
81            P2pChannelsRpcAction::Pending { peer_id } => state
82                .get_ready_peer(peer_id)
83                .is_some_and(|p| matches!(p.channels.rpc, P2pChannelsRpcState::Init { .. })),
84            P2pChannelsRpcAction::Ready { peer_id } => state
85                .get_ready_peer(peer_id)
86                .is_some_and(|p| matches!(p.channels.rpc, P2pChannelsRpcState::Pending { .. })),
87            P2pChannelsRpcAction::RequestSend {
88                peer_id,
89                id,
90                request,
91                on_init: _,
92            } => state
93                .peers
94                .get(peer_id)
95                .filter(|p| !p.is_libp2p() || request.kind().supported_by_libp2p())
96                .and_then(|p| p.status.as_ready())
97                .is_some_and(|p| {
98                    matches!(
99                        &p.channels.rpc,
100                        P2pChannelsRpcState::Ready {
101                            local: P2pRpcLocalState::WaitingForRequest { .. }
102                                | P2pRpcLocalState::Responded { .. },
103                            ..
104                        } if p.channels.next_local_rpc_id() == *id
105                    )
106                }),
107            P2pChannelsRpcAction::Timeout { peer_id, id } => {
108                state.get_ready_peer(peer_id).is_some_and(|p| {
109                    matches!(
110                        &p.channels.rpc,
111                        P2pChannelsRpcState::Ready {
112                            local: P2pRpcLocalState::Requested { id: rpc_id, .. },
113                            ..
114                        } if rpc_id == id
115                    )
116                }) && state.is_peer_rpc_timed_out(peer_id, *id, time)
117            }
118            P2pChannelsRpcAction::ResponseReceived { peer_id, id, .. } => {
119                // TODO(binier): use consensus to enforce that peer doesn't send
120                // us inferior block than it has in the past.
121                state.get_ready_peer(peer_id).is_some_and(|p| {
122                    match &p.channels.rpc {
123                        P2pChannelsRpcState::Ready { local, .. } => {
124                            // TODO(binier): validate that response corresponds to request.
125                            matches!(
126                                local,
127                                P2pRpcLocalState::Requested { id: rpc_id, .. }
128                                    if rpc_id == id
129                            )
130                        }
131                        _ => false,
132                    }
133                })
134            }
135            P2pChannelsRpcAction::RequestReceived { peer_id, id, .. } => state
136                .get_ready_peer(peer_id)
137                .is_some_and(|p| match &p.channels.rpc {
138                    P2pChannelsRpcState::Ready { remote, .. } => {
139                        remote.pending_requests.len() < MAX_P2P_RPC_REMOTE_CONCURRENT_REQUESTS
140                            && remote.pending_requests.iter().all(|v| v.id != *id)
141                    }
142                    _ => false,
143                }),
144            P2pChannelsRpcAction::ResponsePending { peer_id, id } => state
145                .get_ready_peer(peer_id)
146                .is_some_and(|p| match &p.channels.rpc {
147                    P2pChannelsRpcState::Ready { remote, .. } => remote
148                        .pending_requests
149                        .iter()
150                        .any(|v| v.id == *id && !v.is_pending),
151                    _ => false,
152                }),
153            P2pChannelsRpcAction::ResponseSend {
154                peer_id,
155                id,
156                response: _response,
157            } => {
158                #[cfg(feature = "p2p-libp2p")]
159                if state.is_libp2p_peer(peer_id) {
160                    let Some(response) = _response.as_ref() else {
161                        return false;
162                    };
163                    return if !response.kind().supported_by_libp2p() {
164                        false
165                    } else if let Some(streams) =
166                        state.network.scheduler.rpc_incoming_streams.get(peer_id)
167                    {
168                        !streams.is_empty()
169                    } else {
170                        false
171                    };
172                }
173
174                state.get_ready_peer(peer_id).is_some_and(|p| {
175                    match &p.channels.rpc {
176                        P2pChannelsRpcState::Ready { remote, .. } => {
177                            // TODO(binier): validate that response corresponds to request.
178                            remote.pending_requests.iter().any(|v| v.id == *id)
179                        }
180                        _ => false,
181                    }
182                })
183            }
184        }
185    }
186}
187
188use crate::channels::P2pChannelsAction;
189
190impl From<P2pChannelsRpcAction> for crate::P2pAction {
191    fn from(a: P2pChannelsRpcAction) -> Self {
192        Self::Channels(P2pChannelsAction::Rpc(a))
193    }
194}