// mina_p2p/network/kad/p2p_network_kad_state.rs

1use std::{collections::BTreeMap, net::SocketAddr};
2
3use malloc_size_of_derive::MallocSizeOf;
4use redux::Timestamp;
5use serde::{Deserialize, Serialize};
6
7use super::{
8    bootstrap::P2pNetworkKadBootstrapState, request::P2pNetworkKadRequestState,
9    stream::P2pNetworkKadStreamState, P2pNetworkKadRoutingTable,
10};
11use crate::{
12    bootstrap::{P2pNetworkKadBootstrapRequestStat, P2pNetworkKadBootstrapStats},
13    is_time_passed, P2pTimeouts, PeerId, StreamId, StreamState,
14};
15
16/// Kademlia status.
17#[derive(Clone, Debug, Default, Serialize, Deserialize, MallocSizeOf)]
18#[serde(tag = "type")]
19pub enum P2pNetworkKadStatus {
20    /// Initial state.
21    #[default]
22    Init,
23    /// Bootstrap is in progress.
24    Bootstrapping(super::bootstrap::P2pNetworkKadBootstrapState),
25    /// Kademlia is bootstrapped.
26    Bootstrapped {
27        /// Timestamp of the bootstrap.
28        #[ignore_malloc_size_of = "doesn't allocate"]
29        time: Timestamp,
30        /// Stats for the latest bootstrap process.
31        stats: P2pNetworkKadBootstrapStats,
32    },
33}
34
35impl P2pNetworkKadStatus {
36    pub(crate) fn can_bootstrap(&self, now: Timestamp, timeouts: &P2pTimeouts) -> bool {
37        match self {
38            P2pNetworkKadStatus::Init => true,
39            P2pNetworkKadStatus::Bootstrapping(_) => false,
40            P2pNetworkKadStatus::Bootstrapped { time, stats } => {
41                let timeout = if stats.requests.iter().any(|req| {
42                        matches!(req, P2pNetworkKadBootstrapRequestStat::Successful(req) if !req.closest_peers.is_empty())
43                    }) {
44                        timeouts.kademlia_bootstrap
45                    } else {
46                        timeouts.kademlia_initial_bootstrap
47                    };
48                is_time_passed(now, *time, timeout)
49            }
50        }
51    }
52}
53
54#[derive(Clone, Debug, Serialize, Deserialize, MallocSizeOf)]
55pub struct P2pNetworkKadState {
56    pub routing_table: P2pNetworkKadRoutingTable,
57    pub latest_request_peers: P2pNetworkKadLatestRequestPeers,
58    #[with_malloc_size_of_func = "measurement::requests_map"]
59    pub requests: BTreeMap<PeerId, P2pNetworkKadRequestState>,
60    pub streams: StreamState<P2pNetworkKadStreamState>,
61    pub status: P2pNetworkKadStatus,
62    pub filter_addrs: bool,
63    pub has_bootstrapped: bool,
64}
65
66impl Default for P2pNetworkKadState {
67    fn default() -> Self {
68        Self {
69            routing_table: Default::default(),
70            latest_request_peers: Default::default(),
71            requests: Default::default(),
72            streams: Default::default(),
73            status: Default::default(),
74            filter_addrs: std::env::var("MINA_DISCOVERY_FILTER_ADDR")
75                .ok()
76                .and_then(|s| s.parse().ok())
77                .unwrap_or(true),
78            has_bootstrapped: false,
79        }
80    }
81}
82
83impl P2pNetworkKadState {
84    pub fn is_bootstrapped(&self) -> bool {
85        matches!(&self.status, P2pNetworkKadStatus::Bootstrapped { .. })
86    }
87
88    pub fn bootstrap_state(&self) -> Option<&super::bootstrap::P2pNetworkKadBootstrapState> {
89        if let P2pNetworkKadStatus::Bootstrapping(state) = &self.status {
90            Some(state)
91        } else {
92            None
93        }
94    }
95
96    pub fn bootstrap_state_mut(&mut self) -> Option<&mut P2pNetworkKadBootstrapState> {
97        if let P2pNetworkKadStatus::Bootstrapping(state) = &mut self.status {
98            Some(state)
99        } else {
100            None
101        }
102    }
103
104    pub fn bootstrap_stats(&self) -> Option<&P2pNetworkKadBootstrapStats> {
105        match &self.status {
106            P2pNetworkKadStatus::Init => None,
107            P2pNetworkKadStatus::Bootstrapping(state) => Some(&state.stats),
108            P2pNetworkKadStatus::Bootstrapped { stats, .. } => Some(stats),
109        }
110    }
111
112    pub fn request(&self, peer_id: &PeerId) -> Option<&P2pNetworkKadRequestState> {
113        self.requests.get(peer_id)
114    }
115
116    pub fn create_request(
117        &mut self,
118        addr: SocketAddr,
119        peer_id: PeerId,
120        key: PeerId,
121    ) -> Result<&mut P2pNetworkKadRequestState, &P2pNetworkKadRequestState> {
122        match self.requests.entry(peer_id) {
123            std::collections::btree_map::Entry::Vacant(v) => {
124                Ok(v.insert(P2pNetworkKadRequestState {
125                    peer_id,
126                    key,
127                    addr,
128                    status: crate::request::P2pNetworkKadRequestStatus::Default,
129                }))
130            }
131            std::collections::btree_map::Entry::Occupied(o) => Err(o.into_mut()),
132        }
133    }
134
135    pub fn find_kad_stream_state(
136        &self,
137        peer_id: &PeerId,
138        stream_id: &StreamId,
139    ) -> Option<&P2pNetworkKadStreamState> {
140        self.streams.get(peer_id)?.get(stream_id)
141    }
142
143    pub fn create_kad_stream_state(
144        &mut self,
145        incoming: bool,
146        peer_id: &PeerId,
147        stream_id: &StreamId,
148    ) -> Result<&mut P2pNetworkKadStreamState, &P2pNetworkKadStreamState> {
149        match self.streams.entry(*peer_id).or_default().entry(*stream_id) {
150            std::collections::btree_map::Entry::Vacant(e) => {
151                Ok(e.insert(P2pNetworkKadStreamState::new(incoming)))
152            }
153            std::collections::btree_map::Entry::Occupied(e) => Err(e.into_mut()),
154        }
155    }
156
157    pub fn find_kad_stream_state_mut(
158        &mut self,
159        peer_id: &PeerId,
160        stream_id: &StreamId,
161    ) -> Option<&mut P2pNetworkKadStreamState> {
162        self.streams.get_mut(peer_id)?.get_mut(stream_id)
163    }
164
165    pub fn remove_kad_stream_state(&mut self, peer_id: &PeerId, stream_id: &StreamId) -> bool {
166        self.streams
167            .get_mut(peer_id)
168            .is_some_and(|m| m.remove(stream_id).is_some())
169    }
170}
171
172#[derive(
173    Clone,
174    Debug,
175    Default,
176    Serialize,
177    Deserialize,
178    derive_more::Deref,
179    derive_more::From,
180    MallocSizeOf,
181)]
182pub struct P2pNetworkKadLatestRequestPeers(Vec<(PeerId, P2pNetworkKadLatestRequestPeerKind)>);
183
184impl P2pNetworkKadLatestRequestPeers {
185    pub fn get_new_peers(&self) -> impl Iterator<Item = &'_ PeerId> {
186        self.get_peers_of_kind(P2pNetworkKadLatestRequestPeerKind::New)
187    }
188
189    pub fn get_existing_peers(&self) -> impl Iterator<Item = &'_ PeerId> {
190        self.get_peers_of_kind(P2pNetworkKadLatestRequestPeerKind::Existing)
191    }
192
193    pub fn get_discarded_peers(&self) -> impl Iterator<Item = &'_ PeerId> {
194        self.get_peers_of_kind(P2pNetworkKadLatestRequestPeerKind::Discarded)
195    }
196
197    fn get_peers_of_kind(
198        &self,
199        kind: P2pNetworkKadLatestRequestPeerKind,
200    ) -> impl Iterator<Item = &'_ PeerId> {
201        self.iter()
202            .filter_map(move |(peer_id, k)| (kind == *k).then_some(peer_id))
203    }
204}
205
206#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq, MallocSizeOf)]
207pub enum P2pNetworkKadLatestRequestPeerKind {
208    New,
209    Existing,
210    Discarded,
211}
212mod measurement {
213    use std::{collections::BTreeMap, mem};
214
215    use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
216
217    use super::P2pNetworkKadRequestState;
218    use crate::PeerId;
219
220    pub fn requests_map(
221        val: &BTreeMap<PeerId, P2pNetworkKadRequestState>,
222        ops: &mut MallocSizeOfOps,
223    ) -> usize {
224        val.iter()
225            .map(|(k, v)| mem::size_of_val(k) + mem::size_of_val(v) + v.size_of(ops))
226            .sum()
227    }
228}