p2p/network/select/
p2p_network_select_state.rs1use std::collections::VecDeque;
2
3use malloc_size_of_derive::MallocSizeOf;
4use redux::Timestamp;
5use serde::{Deserialize, Serialize};
6use token::Token;
7
8use crate::{ConnectionAddr, Data, P2pTimeouts};
9
10use super::*;
11
12#[derive(Default, Serialize, Deserialize, Debug, Clone, MallocSizeOf)]
13pub struct P2pNetworkSelectState {
14 #[ignore_malloc_size_of = "doesn't allocate"]
15 pub time: Option<Timestamp>,
16 pub recv: token::State,
17 pub tokens: VecDeque<token::Token>,
18
19 pub negotiated: Option<Option<token::Protocol>>,
20
21 pub inner: P2pNetworkSelectStateInner,
22 pub to_send: Option<token::Token>,
23}
24
25impl P2pNetworkSelectState {
26 pub fn initiator_auth(kind: token::AuthKind, time: Timestamp) -> Self {
27 P2pNetworkSelectState {
28 time: Some(time),
29 inner: P2pNetworkSelectStateInner::Uncertain {
30 proposing: token::Protocol::Auth(kind),
31 },
32 ..Default::default()
33 }
34 }
35
36 pub fn initiator_mux(kind: token::MuxKind, time: Timestamp) -> Self {
37 P2pNetworkSelectState {
38 time: Some(time),
39 inner: P2pNetworkSelectStateInner::Initiator {
40 proposing: token::Protocol::Mux(kind),
41 },
42 ..Default::default()
43 }
44 }
45
46 pub fn initiator_stream(kind: token::StreamKind, time: Timestamp) -> Self {
47 P2pNetworkSelectState {
48 time: Some(time),
49 inner: P2pNetworkSelectStateInner::Initiator {
50 proposing: token::Protocol::Stream(kind),
51 },
52 ..Default::default()
53 }
54 }
55
56 pub fn default_timed(time: Timestamp) -> Self {
57 P2pNetworkSelectState {
58 time: Some(time),
59 ..Default::default()
60 }
61 }
62
63 pub fn is_timed_out(&self, now: Timestamp, timeouts: &P2pTimeouts) -> bool {
64 if self.negotiated.is_some() {
65 return false;
66 }
67
68 if let Some(time) = self.time {
69 now.checked_sub(time)
70 .and_then(|dur| timeouts.select.map(|to| dur >= to))
71 .unwrap_or(false)
72 } else {
73 false
74 }
75 }
76
77 pub fn is_incoming(&self) -> bool {
78 matches!(&self.inner, P2pNetworkSelectStateInner::Responder)
79 }
80
81 #[allow(dead_code)]
83 pub(super) fn forward_incoming_data(
84 &self,
85 kind: SelectKind,
86 addr: ConnectionAddr,
87 data: Data,
88 fin: bool,
89 ) -> Vec<P2pNetworkSelectAction> {
90 if self.negotiated.is_some() {
91 vec![kind.forward_data(addr, data, fin)]
92 } else {
93 let mut tokens = vec![];
94 let payload_data = &self.recv.buffer;
95 let mut tokens_parsed = false;
96
97 for token in &self.tokens {
98 if !tokens_parsed {
99 tokens_parsed =
100 matches!(token, Token::Protocol(..) | Token::UnknownProtocol(..));
101 }
102
103 tokens.push(P2pNetworkSelectAction::IncomingToken { addr, kind });
104 }
105
106 if tokens_parsed && !payload_data.is_empty() {
107 tokens.push(kind.forward_data(addr, Data::from(payload_data.clone()), fin));
108 }
109
110 tokens
111 }
112 }
113}
114
115#[derive(Serialize, Deserialize, Debug, Clone, MallocSizeOf)]
116pub enum P2pNetworkSelectStateInner {
117 Error(String),
118 Initiator { proposing: token::Protocol },
119 Uncertain { proposing: token::Protocol },
120 Responder,
121}
122
123impl Default for P2pNetworkSelectStateInner {
124 fn default() -> Self {
125 Self::Responder
126 }
127}