p2p/network/
mod.rs

1mod p2p_network_actions;
2use malloc_size_of_derive::MallocSizeOf;
3use serde::{Deserialize, Serialize};
4
5use crate::Limit;
6
7use self::identify::stream::P2pNetworkIdentifyStreamError;
8pub use self::p2p_network_actions::*;
9
10mod p2p_network_service;
11pub use self::p2p_network_service::*;
12
13mod p2p_network_state;
14pub use self::p2p_network_state::P2pNetworkState;
15
16#[cfg(feature = "p2p-libp2p")]
17mod p2p_network_reducer;
18
19#[cfg(feature = "p2p-libp2p")]
20mod p2p_network_effects;
21
22pub mod scheduler;
23pub use self::scheduler::*;
24
25pub mod scheduler_effectful;
26pub use self::scheduler_effectful::*;
27
28pub mod pnet;
29pub use self::pnet::*;
30
31pub mod pnet_effectful;
32pub use self::pnet_effectful::*;
33
34pub mod select;
35pub use self::select::*;
36
37pub mod noise;
38pub use self::noise::*;
39
40pub mod yamux;
41use self::stream::{P2pNetworkKadIncomingStreamError, P2pNetworkKadOutgoingStreamError};
42pub use self::yamux::*;
43
44pub mod identify;
45
46pub mod kad;
47pub use self::kad::*;
48
49pub mod pubsub;
50pub use self::pubsub::*;
51
52pub mod rpc;
53pub use self::rpc::*;
54
55pub use self::data::{Data, DataSized};
56mod data {
57    use std::{fmt, ops};
58
59    use malloc_size_of_derive::MallocSizeOf;
60    use serde::{Deserialize, Serialize};
61
62    #[derive(Clone, MallocSizeOf)]
63    pub struct DataSized<const N: usize>(pub [u8; N]);
64
65    #[derive(Clone, Default, MallocSizeOf)]
66    pub struct Data(pub Box<[u8]>);
67
68    impl Data {
69        pub fn empty() -> Self {
70            Self(Box::new([0; 0]))
71        }
72    }
73
74    impl<const N: usize> Serialize for DataSized<N> {
75        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
76        where
77            S: serde::Serializer,
78        {
79            hex::encode(self.0).serialize(serializer)
80        }
81    }
82
83    impl<'de, const N: usize> Deserialize<'de> for DataSized<N> {
84        fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
85        where
86            D: serde::Deserializer<'de>,
87        {
88            use serde::de::Error;
89
90            let hex_str = <&'de str>::deserialize(deserializer)?;
91            hex::decode(hex_str)
92                .map_err(Error::custom)
93                .and_then(|v| v.try_into().map_err(|_| Error::custom("wrong size")))
94                .map(Self)
95        }
96    }
97
98    impl<const N: usize> fmt::Display for DataSized<N> {
99        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
100            write!(f, "{}", hex::encode(self.0))
101        }
102    }
103
104    impl<const N: usize> fmt::Debug for DataSized<N> {
105        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
106            f.debug_tuple("Data").field(&self.to_string()).finish()
107        }
108    }
109
110    impl<const N: usize> From<[u8; N]> for DataSized<N> {
111        fn from(value: [u8; N]) -> Self {
112            Self(value)
113        }
114    }
115
116    impl Serialize for Data {
117        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
118        where
119            S: serde::Serializer,
120        {
121            hex::encode(&self.0).serialize(serializer)
122        }
123    }
124
125    impl<'de> Deserialize<'de> for Data {
126        fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
127        where
128            D: serde::Deserializer<'de>,
129        {
130            let hex_str = <&'de str>::deserialize(deserializer)?;
131            hex::decode(hex_str)
132                .map_err(serde::de::Error::custom)
133                .map(Vec::into_boxed_slice)
134                .map(Self)
135        }
136    }
137
138    impl fmt::Display for Data {
139        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
140            write!(f, "{} (len {})", hex::encode(&self.0), self.0.len())
141        }
142    }
143
144    impl fmt::Debug for Data {
145        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
146            let s = if self.len() > 32 {
147                let l = self.len();
148                format!(
149                    "{}...omitted...{} (len {})",
150                    hex::encode(&self.0[..12]),
151                    hex::encode(&self.0[(l - 12)..]),
152                    l
153                )
154            } else {
155                self.to_string()
156            };
157            f.debug_tuple("Data").field(&s).finish()
158        }
159    }
160
161    impl From<Vec<u8>> for Data {
162        fn from(value: Vec<u8>) -> Self {
163            Self(value.into_boxed_slice())
164        }
165    }
166
167    impl From<Box<[u8]>> for Data {
168        fn from(value: Box<[u8]>) -> Self {
169            Self(value)
170        }
171    }
172
173    impl ops::Deref for Data {
174        type Target = [u8];
175
176        fn deref(&self) -> &Self::Target {
177            &self.0
178        }
179    }
180
181    impl ops::DerefMut for Data {
182        fn deref_mut(&mut self) -> &mut Self::Target {
183            &mut self.0
184        }
185    }
186}
187
188/// Errors that might happen while handling protobuf messages received via a stream.
///
/// `T` is the error type wrapped by the `Convert` variant. Because that field is
/// marked `#[from]`, a value of type `T` converts into this error, so `?` can be
/// used on results whose error type is `T`.
189#[derive(Debug, Clone, PartialEq, thiserror::Error, Serialize, Deserialize, MallocSizeOf)]
190pub enum P2pNetworkStreamProtobufError<T> {
    /// The length of the message could not be read.
191    #[error("error reading message length")]
192    MessageLength,
    /// The message is too long. Going by the error text, the first field is the
    /// offending length and the second is the limit it exceeds.
193    #[error("message is too long: {0} exceeds {1}")]
194    Limit(
195        usize,
        // Excluded from heap-size accounting: per the attribute text, the limit
        // value does not allocate.
196        #[ignore_malloc_size_of = "doesn't allocate"] Limit<usize>,
197    ),
    /// The message itself could not be read; the string is included in the
    /// rendered error text.
198    #[error("error reading message: {0}")]
199    Message(String),
    /// The protobuf message was read but converting it failed with a `T`.
200    #[error("error converting protobuf message: {0}")]
201    Convert(#[from] T),
202}