p2p/channels/rpc/
p2p_channels_rpc_state.rs

1use std::collections::VecDeque;
2
3use serde::{Deserialize, Serialize};
4
5use crate::P2pTimeouts;
6
7use super::{P2pRpcId, P2pRpcKind, P2pRpcRequest};
8
/// State of the RPC channel.
///
/// Only the `Ready` variant carries request/response bookkeeping; every
/// accessor implemented on this type reports "nothing in flight" (`false`,
/// `None`, an empty iterator or `redux::Timestamp::ZERO`) for the other
/// variants.
// NOTE(review): the exact meaning of `Disabled`/`Enabled`/`Init`/`Pending` is
// defined by the code that drives the transitions, which is not part of this
// file — confirm there before relying on the names alone.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum P2pChannelsRpcState {
    Disabled,
    Enabled,
    Init {
        /// Timestamp stored with this state (presumably when it was entered —
        /// TODO confirm against the reducer).
        time: redux::Timestamp,
    },
    Pending {
        /// Timestamp stored with this state (presumably when it was entered —
        /// TODO confirm against the reducer).
        time: redux::Timestamp,
    },
    Ready {
        /// Timestamp stored with this state (presumably when the channel
        /// became ready — TODO confirm against the reducer).
        time: redux::Timestamp,
        /// We are the requestors here.
        local: P2pRpcLocalState,
        /// We are the responders here.
        remote: P2pRpcRemoteState,
    },
}
27
/// State of the request side of the channel, i.e. the single request that we
/// may have outstanding towards the remote end.
///
/// A new request may be sent while in `WaitingForRequest` or `Responded`
/// (see `P2pChannelsRpcState::can_send_request`); `Requested` means one is
/// still in flight.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum P2pRpcLocalState {
    /// No request is in flight.
    WaitingForRequest {
        /// Timestamp stored with this state (presumably when it was entered —
        /// TODO confirm against the reducer).
        time: redux::Timestamp,
    },
    /// A request has been issued and is still considered pending.
    Requested {
        /// Reference point for the timeout check: `is_timed_out` compares
        /// `now - time` against the timeout configured for the request kind.
        time: redux::Timestamp,
        /// Identifier of the in-flight request.
        id: P2pRpcId,
        /// The in-flight request itself.
        request: Box<P2pRpcRequest>,
    },
    /// The request identified by `id` is no longer pending; it is still
    /// retrievable through `local_responded_request`.
    Responded {
        /// Timestamp stored with this state (presumably when the response
        /// arrived — TODO confirm against the reducer).
        time: redux::Timestamp,
        /// Identifier of the request this state refers to.
        id: P2pRpcId,
        /// The request this state refers to.
        request: Box<P2pRpcRequest>,
    },
}
44
/// State of the responder side of the channel: requests queued for us to
/// answer, plus the timestamp exposed by
/// `P2pChannelsRpcState::remote_last_responded`.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct P2pRpcRemoteState {
    /// Queue of requests that have not been removed from the state yet; each
    /// entry's `is_pending` flag tells whether work on it has been started.
    pub pending_requests: VecDeque<P2pRpcRemotePendingRequestState>,
    /// Timestamp of the last response (presumably the most recent one we
    /// sent — TODO confirm where this field is written).
    pub last_responded: redux::Timestamp,
}
50
// Shared, permanently empty queue. `P2pChannelsRpcState::remote_requests`
// iterates over it when the channel is not `Ready`, so that both of its
// branches return the same concrete iterator type.
static EMPTY_REMOTE_REQUESTS: VecDeque<P2pRpcRemotePendingRequestState> = VecDeque::new();
52
/// A single entry of `P2pRpcRemoteState::pending_requests`: one request
/// stored on the responder side together with its bookkeeping.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct P2pRpcRemotePendingRequestState {
    /// Timestamp stored with the request (presumably when it was received —
    /// TODO confirm where entries are created).
    pub time: redux::Timestamp,
    /// Identifier of the request.
    pub id: P2pRpcId,
    /// The request itself.
    pub request: P2pRpcRequest,
    /// If a given rpc request requires a response from some async component,
    /// e.g. ledger, then this field will be set to `true` once request
    /// to that async component is initiated.
    pub is_pending: bool,
}
63
64impl P2pChannelsRpcState {
65    pub fn is_ready(&self) -> bool {
66        matches!(self, Self::Ready { .. })
67    }
68
69    pub fn can_send_request(&self) -> bool {
70        match self {
71            Self::Ready { local, .. } => matches!(
72                local,
73                P2pRpcLocalState::WaitingForRequest { .. } | P2pRpcLocalState::Responded { .. }
74            ),
75            _ => false,
76        }
77    }
78
79    pub fn is_timed_out(
80        &self,
81        rpc_id: P2pRpcId,
82        now: redux::Timestamp,
83        config: &P2pTimeouts,
84    ) -> bool {
85        match self {
86            Self::Ready {
87                local: P2pRpcLocalState::Requested { time, id, request },
88                ..
89            } => {
90                rpc_id == *id
91                    && request
92                        .kind()
93                        .timeout(config)
94                        .and_then(|timeout| {
95                            let dur = now.checked_sub(*time)?;
96                            Some(dur >= timeout)
97                        })
98                        .unwrap_or(false)
99            }
100            _ => false,
101        }
102    }
103
104    pub fn pending_local_rpc_id(&self) -> Option<P2pRpcId> {
105        match self {
106            Self::Ready {
107                local: P2pRpcLocalState::Requested { id, .. },
108                ..
109            } => Some(*id),
110            _ => None,
111        }
112    }
113
114    pub fn pending_local_rpc(&self) -> Option<&P2pRpcRequest> {
115        match self {
116            Self::Ready {
117                local: P2pRpcLocalState::Requested { request, .. },
118                ..
119            } => Some(request),
120            _ => None,
121        }
122    }
123
124    pub fn pending_local_rpc_kind(&self) -> Option<P2pRpcKind> {
125        self.pending_local_rpc().map(|req| req.kind())
126    }
127
128    pub fn local_responded_request(&self) -> Option<(P2pRpcId, &P2pRpcRequest)> {
129        match self {
130            Self::Ready {
131                local: P2pRpcLocalState::Responded { id, request, .. },
132                ..
133            } => Some((*id, request)),
134            _ => None,
135        }
136    }
137
138    fn remote_requests(&self) -> impl Iterator<Item = &P2pRpcRemotePendingRequestState> {
139        match self {
140            Self::Ready { remote, .. } => remote.pending_requests.iter(),
141            _ => EMPTY_REMOTE_REQUESTS.iter(),
142        }
143    }
144
145    pub fn remote_todo_requests_iter(
146        &self,
147    ) -> impl Iterator<Item = &P2pRpcRemotePendingRequestState> {
148        self.remote_requests().filter(|req| !req.is_pending)
149    }
150
151    pub fn remote_pending_requests_iter(
152        &self,
153    ) -> impl Iterator<Item = &P2pRpcRemotePendingRequestState> {
154        self.remote_requests().filter(|req| req.is_pending)
155    }
156
157    pub fn remote_last_responded(&self) -> redux::Timestamp {
158        match self {
159            Self::Ready { remote, .. } => remote.last_responded,
160            _ => redux::Timestamp::ZERO,
161        }
162    }
163}