p2p/network/kad/
p2p_network_kad_state.rs

1use std::{collections::BTreeMap, net::SocketAddr};
2
3use malloc_size_of_derive::MallocSizeOf;
4use redux::Timestamp;
5use serde::{Deserialize, Serialize};
6
7use super::{
8    bootstrap::P2pNetworkKadBootstrapState, request::P2pNetworkKadRequestState,
9    stream::P2pNetworkKadStreamState, P2pNetworkKadRoutingTable,
10};
11use crate::{
12    bootstrap::{P2pNetworkKadBootstrapRequestStat, P2pNetworkKadBootstrapStats},
13    is_time_passed, P2pTimeouts, PeerId, StreamId, StreamState,
14};
15
16/// Kademlia status.
17#[derive(Clone, Debug, Default, Serialize, Deserialize, MallocSizeOf)]
18#[serde(tag = "type")]
19pub enum P2pNetworkKadStatus {
20    /// Initial state.
21    #[default]
22    Init,
23    /// Bootstrap is in progress.
24    Bootstrapping(super::bootstrap::P2pNetworkKadBootstrapState),
25    /// Kademlia is bootstrapped.
26    Bootstrapped {
27        /// Timestamp of the bootstrap.
28        #[ignore_malloc_size_of = "doesn't allocate"]
29        time: Timestamp,
30        /// Stats for the latest bootstrap process.
31        stats: P2pNetworkKadBootstrapStats,
32    },
33}
34
35impl P2pNetworkKadStatus {
36    pub(crate) fn can_bootstrap(&self, now: Timestamp, timeouts: &P2pTimeouts) -> bool {
37        match self {
38            P2pNetworkKadStatus::Init => true,
39            P2pNetworkKadStatus::Bootstrapping(_) => false,
40            P2pNetworkKadStatus::Bootstrapped { time, stats } => {
41                let timeout = if stats.requests.iter().any(|req| {
42                        matches!(req, P2pNetworkKadBootstrapRequestStat::Successful(req) if !req.closest_peers.is_empty())
43                    }) {
44                        timeouts.kademlia_bootstrap
45                    } else {
46                        timeouts.kademlia_initial_bootstrap
47                    };
48                is_time_passed(now, *time, timeout)
49            }
50        }
51    }
52}
53
54#[derive(Clone, Debug, Serialize, Deserialize, MallocSizeOf)]
55pub struct P2pNetworkKadState {
56    pub routing_table: P2pNetworkKadRoutingTable,
57    pub latest_request_peers: P2pNetworkKadLatestRequestPeers,
58    #[with_malloc_size_of_func = "measurement::requests_map"]
59    pub requests: BTreeMap<PeerId, P2pNetworkKadRequestState>,
60    pub streams: StreamState<P2pNetworkKadStreamState>,
61    pub status: P2pNetworkKadStatus,
62    pub filter_addrs: bool,
63}
64
65impl Default for P2pNetworkKadState {
66    fn default() -> Self {
67        Self {
68            routing_table: Default::default(),
69            latest_request_peers: Default::default(),
70            requests: Default::default(),
71            streams: Default::default(),
72            status: Default::default(),
73            filter_addrs: std::env::var("OPENMINA_DISCOVERY_FILTER_ADDR")
74                .ok()
75                .and_then(|s| s.parse().ok())
76                .unwrap_or(true),
77        }
78    }
79}
80
81impl P2pNetworkKadState {
82    pub fn is_bootstrapped(&self) -> bool {
83        matches!(&self.status, P2pNetworkKadStatus::Bootstrapped { .. })
84    }
85
86    pub fn bootstrap_state(&self) -> Option<&super::bootstrap::P2pNetworkKadBootstrapState> {
87        if let P2pNetworkKadStatus::Bootstrapping(state) = &self.status {
88            Some(state)
89        } else {
90            None
91        }
92    }
93
94    pub fn bootstrap_state_mut(&mut self) -> Option<&mut P2pNetworkKadBootstrapState> {
95        if let P2pNetworkKadStatus::Bootstrapping(state) = &mut self.status {
96            Some(state)
97        } else {
98            None
99        }
100    }
101
102    pub fn bootstrap_stats(&self) -> Option<&P2pNetworkKadBootstrapStats> {
103        match &self.status {
104            P2pNetworkKadStatus::Init => None,
105            P2pNetworkKadStatus::Bootstrapping(state) => Some(&state.stats),
106            P2pNetworkKadStatus::Bootstrapped { stats, .. } => Some(stats),
107        }
108    }
109
110    pub fn request(&self, peer_id: &PeerId) -> Option<&P2pNetworkKadRequestState> {
111        self.requests.get(peer_id)
112    }
113
114    pub fn create_request(
115        &mut self,
116        addr: SocketAddr,
117        peer_id: PeerId,
118        key: PeerId,
119    ) -> Result<&mut P2pNetworkKadRequestState, &P2pNetworkKadRequestState> {
120        match self.requests.entry(peer_id) {
121            std::collections::btree_map::Entry::Vacant(v) => {
122                Ok(v.insert(P2pNetworkKadRequestState {
123                    peer_id,
124                    key,
125                    addr,
126                    status: crate::request::P2pNetworkKadRequestStatus::Default,
127                }))
128            }
129            std::collections::btree_map::Entry::Occupied(o) => Err(o.into_mut()),
130        }
131    }
132
133    pub fn find_kad_stream_state(
134        &self,
135        peer_id: &PeerId,
136        stream_id: &StreamId,
137    ) -> Option<&P2pNetworkKadStreamState> {
138        self.streams.get(peer_id)?.get(stream_id)
139    }
140
141    pub fn create_kad_stream_state(
142        &mut self,
143        incoming: bool,
144        peer_id: &PeerId,
145        stream_id: &StreamId,
146    ) -> Result<&mut P2pNetworkKadStreamState, &P2pNetworkKadStreamState> {
147        match self.streams.entry(*peer_id).or_default().entry(*stream_id) {
148            std::collections::btree_map::Entry::Vacant(e) => {
149                Ok(e.insert(P2pNetworkKadStreamState::new(incoming)))
150            }
151            std::collections::btree_map::Entry::Occupied(e) => Err(e.into_mut()),
152        }
153    }
154
155    pub fn find_kad_stream_state_mut(
156        &mut self,
157        peer_id: &PeerId,
158        stream_id: &StreamId,
159    ) -> Option<&mut P2pNetworkKadStreamState> {
160        self.streams.get_mut(peer_id)?.get_mut(stream_id)
161    }
162
163    pub fn remove_kad_stream_state(&mut self, peer_id: &PeerId, stream_id: &StreamId) -> bool {
164        self.streams
165            .get_mut(peer_id)
166            .is_some_and(|m| m.remove(stream_id).is_some())
167    }
168}
169
170#[derive(
171    Clone,
172    Debug,
173    Default,
174    Serialize,
175    Deserialize,
176    derive_more::Deref,
177    derive_more::From,
178    MallocSizeOf,
179)]
180pub struct P2pNetworkKadLatestRequestPeers(Vec<(PeerId, P2pNetworkKadLatestRequestPeerKind)>);
181
182impl P2pNetworkKadLatestRequestPeers {
183    pub fn get_new_peers(&self) -> impl Iterator<Item = &'_ PeerId> {
184        self.get_peers_of_kind(P2pNetworkKadLatestRequestPeerKind::New)
185    }
186
187    pub fn get_existing_peers(&self) -> impl Iterator<Item = &'_ PeerId> {
188        self.get_peers_of_kind(P2pNetworkKadLatestRequestPeerKind::Existing)
189    }
190
191    pub fn get_discarded_peers(&self) -> impl Iterator<Item = &'_ PeerId> {
192        self.get_peers_of_kind(P2pNetworkKadLatestRequestPeerKind::Discarded)
193    }
194
195    fn get_peers_of_kind(
196        &self,
197        kind: P2pNetworkKadLatestRequestPeerKind,
198    ) -> impl Iterator<Item = &'_ PeerId> {
199        self.iter()
200            .filter_map(move |(peer_id, k)| (kind == *k).then_some(peer_id))
201    }
202}
203
204#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq, MallocSizeOf)]
205pub enum P2pNetworkKadLatestRequestPeerKind {
206    New,
207    Existing,
208    Discarded,
209}
210mod measurement {
211    use std::{collections::BTreeMap, mem};
212
213    use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
214
215    use super::P2pNetworkKadRequestState;
216    use crate::PeerId;
217
218    pub fn requests_map(
219        val: &BTreeMap<PeerId, P2pNetworkKadRequestState>,
220        ops: &mut MallocSizeOfOps,
221    ) -> usize {
222        val.iter()
223            .map(|(k, v)| mem::size_of_val(k) + mem::size_of_val(v) + v.size_of(ops))
224            .sum()
225    }
226}