p2p/network/select/
p2p_network_select_state.rs

1use std::collections::VecDeque;
2
3use malloc_size_of_derive::MallocSizeOf;
4use redux::Timestamp;
5use serde::{Deserialize, Serialize};
6use token::Token;
7
8use crate::{ConnectionAddr, Data, P2pTimeouts};
9
10use super::*;
11
12#[derive(Default, Serialize, Deserialize, Debug, Clone, MallocSizeOf)]
13pub struct P2pNetworkSelectState {
14    #[ignore_malloc_size_of = "doesn't allocate"]
15    pub time: Option<Timestamp>,
16    pub recv: token::State,
17    pub tokens: VecDeque<token::Token>,
18
19    pub negotiated: Option<Option<token::Protocol>>,
20
21    pub inner: P2pNetworkSelectStateInner,
22    pub to_send: Option<token::Token>,
23}
24
25impl P2pNetworkSelectState {
26    pub fn initiator_auth(kind: token::AuthKind, time: Timestamp) -> Self {
27        P2pNetworkSelectState {
28            time: Some(time),
29            inner: P2pNetworkSelectStateInner::Uncertain {
30                proposing: token::Protocol::Auth(kind),
31            },
32            ..Default::default()
33        }
34    }
35
36    pub fn initiator_mux(kind: token::MuxKind, time: Timestamp) -> Self {
37        P2pNetworkSelectState {
38            time: Some(time),
39            inner: P2pNetworkSelectStateInner::Initiator {
40                proposing: token::Protocol::Mux(kind),
41            },
42            ..Default::default()
43        }
44    }
45
46    pub fn initiator_stream(kind: token::StreamKind, time: Timestamp) -> Self {
47        P2pNetworkSelectState {
48            time: Some(time),
49            inner: P2pNetworkSelectStateInner::Initiator {
50                proposing: token::Protocol::Stream(kind),
51            },
52            ..Default::default()
53        }
54    }
55
56    pub fn default_timed(time: Timestamp) -> Self {
57        P2pNetworkSelectState {
58            time: Some(time),
59            ..Default::default()
60        }
61    }
62
63    pub fn is_timed_out(&self, now: Timestamp, timeouts: &P2pTimeouts) -> bool {
64        if self.negotiated.is_some() {
65            return false;
66        }
67
68        if let Some(time) = self.time {
69            now.checked_sub(time)
70                .and_then(|dur| timeouts.select.map(|to| dur >= to))
71                .unwrap_or(false)
72        } else {
73            false
74        }
75    }
76
77    pub fn is_incoming(&self) -> bool {
78        matches!(&self.inner, P2pNetworkSelectStateInner::Responder)
79    }
80
81    /// Propagates incoming data to corresponding action
82    #[allow(dead_code)]
83    pub(super) fn forward_incoming_data(
84        &self,
85        kind: SelectKind,
86        addr: ConnectionAddr,
87        data: Data,
88        fin: bool,
89    ) -> Vec<P2pNetworkSelectAction> {
90        if self.negotiated.is_some() {
91            vec![kind.forward_data(addr, data, fin)]
92        } else {
93            let mut tokens = vec![];
94            let payload_data = &self.recv.buffer;
95            let mut tokens_parsed = false;
96
97            for token in &self.tokens {
98                if !tokens_parsed {
99                    tokens_parsed =
100                        matches!(token, Token::Protocol(..) | Token::UnknownProtocol(..));
101                }
102
103                tokens.push(P2pNetworkSelectAction::IncomingToken { addr, kind });
104            }
105
106            if tokens_parsed && !payload_data.is_empty() {
107                tokens.push(kind.forward_data(addr, Data::from(payload_data.clone()), fin));
108            }
109
110            tokens
111        }
112    }
113}
114
115#[derive(Serialize, Deserialize, Debug, Clone, MallocSizeOf)]
116pub enum P2pNetworkSelectStateInner {
117    Error(String),
118    Initiator { proposing: token::Protocol },
119    Uncertain { proposing: token::Protocol },
120    Responder,
121}
122
123impl Default for P2pNetworkSelectStateInner {
124    fn default() -> Self {
125        Self::Responder
126    }
127}