p2p/channels/rpc/
p2p_channels_rpc_state.rs1use std::collections::VecDeque;
2
3use serde::{Deserialize, Serialize};
4
5use crate::P2pTimeouts;
6
7use super::{P2pRpcId, P2pRpcKind, P2pRpcRequest};
8
9#[derive(Serialize, Deserialize, Debug, Clone)]
10pub enum P2pChannelsRpcState {
11 Disabled,
12 Enabled,
13 Init {
14 time: redux::Timestamp,
15 },
16 Pending {
17 time: redux::Timestamp,
18 },
19 Ready {
20 time: redux::Timestamp,
21 local: P2pRpcLocalState,
23 remote: P2pRpcRemoteState,
25 },
26}
27
28#[derive(Serialize, Deserialize, Debug, Clone)]
29pub enum P2pRpcLocalState {
30 WaitingForRequest {
31 time: redux::Timestamp,
32 },
33 Requested {
34 time: redux::Timestamp,
35 id: P2pRpcId,
36 request: Box<P2pRpcRequest>,
37 },
38 Responded {
39 time: redux::Timestamp,
40 id: P2pRpcId,
41 request: Box<P2pRpcRequest>,
42 },
43}
44
45#[derive(Serialize, Deserialize, Debug, Clone)]
46pub struct P2pRpcRemoteState {
47 pub pending_requests: VecDeque<P2pRpcRemotePendingRequestState>,
48 pub last_responded: redux::Timestamp,
49}
50
51static EMPTY_REMOTE_REQUESTS: VecDeque<P2pRpcRemotePendingRequestState> = VecDeque::new();
52
53#[derive(Serialize, Deserialize, Debug, Clone)]
54pub struct P2pRpcRemotePendingRequestState {
55 pub time: redux::Timestamp,
56 pub id: P2pRpcId,
57 pub request: P2pRpcRequest,
58 pub is_pending: bool,
62}
63
64impl P2pChannelsRpcState {
65 pub fn is_ready(&self) -> bool {
66 matches!(self, Self::Ready { .. })
67 }
68
69 pub fn can_send_request(&self) -> bool {
70 match self {
71 Self::Ready { local, .. } => matches!(
72 local,
73 P2pRpcLocalState::WaitingForRequest { .. } | P2pRpcLocalState::Responded { .. }
74 ),
75 _ => false,
76 }
77 }
78
79 pub fn is_timed_out(
80 &self,
81 rpc_id: P2pRpcId,
82 now: redux::Timestamp,
83 config: &P2pTimeouts,
84 ) -> bool {
85 match self {
86 Self::Ready {
87 local: P2pRpcLocalState::Requested { time, id, request },
88 ..
89 } => {
90 rpc_id == *id
91 && request
92 .kind()
93 .timeout(config)
94 .and_then(|timeout| {
95 let dur = now.checked_sub(*time)?;
96 Some(dur >= timeout)
97 })
98 .unwrap_or(false)
99 }
100 _ => false,
101 }
102 }
103
104 pub fn pending_local_rpc_id(&self) -> Option<P2pRpcId> {
105 match self {
106 Self::Ready {
107 local: P2pRpcLocalState::Requested { id, .. },
108 ..
109 } => Some(*id),
110 _ => None,
111 }
112 }
113
114 pub fn pending_local_rpc(&self) -> Option<&P2pRpcRequest> {
115 match self {
116 Self::Ready {
117 local: P2pRpcLocalState::Requested { request, .. },
118 ..
119 } => Some(request),
120 _ => None,
121 }
122 }
123
124 pub fn pending_local_rpc_kind(&self) -> Option<P2pRpcKind> {
125 self.pending_local_rpc().map(|req| req.kind())
126 }
127
128 pub fn local_responded_request(&self) -> Option<(P2pRpcId, &P2pRpcRequest)> {
129 match self {
130 Self::Ready {
131 local: P2pRpcLocalState::Responded { id, request, .. },
132 ..
133 } => Some((*id, request)),
134 _ => None,
135 }
136 }
137
138 fn remote_requests(&self) -> impl Iterator<Item = &P2pRpcRemotePendingRequestState> {
139 match self {
140 Self::Ready { remote, .. } => remote.pending_requests.iter(),
141 _ => EMPTY_REMOTE_REQUESTS.iter(),
142 }
143 }
144
145 pub fn remote_todo_requests_iter(
146 &self,
147 ) -> impl Iterator<Item = &P2pRpcRemotePendingRequestState> {
148 self.remote_requests().filter(|req| !req.is_pending)
149 }
150
151 pub fn remote_pending_requests_iter(
152 &self,
153 ) -> impl Iterator<Item = &P2pRpcRemotePendingRequestState> {
154 self.remote_requests().filter(|req| req.is_pending)
155 }
156
157 pub fn remote_last_responded(&self) -> redux::Timestamp {
158 match self {
159 Self::Ready { remote, .. } => remote.last_responded,
160 _ => redux::Timestamp::ZERO,
161 }
162 }
163}