p2p/channels/
mod.rs

1pub mod best_tip;
2pub mod rpc;
3pub mod signaling;
4pub mod snark;
5pub mod snark_job_commitment;
6pub mod streaming_rpc;
7pub mod transaction;
8
9mod p2p_channels_state;
10pub use p2p_channels_state::*;
11
12mod p2p_channels_actions;
13pub use p2p_channels_actions::*;
14
15mod p2p_channels_reducer;
16
17mod p2p_channels_service;
18pub use p2p_channels_service::*;
19
20mod p2p_channels_effectful_effects;
21
22use binprot::{BinProtRead, BinProtWrite};
23use binprot_derive::{BinProtRead, BinProtWrite};
24use derive_more::From;
25use serde::{Deserialize, Serialize};
26use signaling::{discovery::SignalingDiscoveryChannelMsg, exchange::SignalingExchangeChannelMsg};
27use strum_macros::EnumIter;
28
29use self::{
30    best_tip::BestTipPropagationChannelMsg, rpc::RpcChannelMsg, snark::SnarkPropagationChannelMsg,
31    snark_job_commitment::SnarkJobCommitmentPropagationChannelMsg,
32    streaming_rpc::StreamingRpcChannelMsg, transaction::TransactionPropagationChannelMsg,
33};
34
35#[derive(Serialize, Deserialize, EnumIter, Debug, Ord, PartialOrd, Eq, PartialEq, Clone, Copy)]
36#[repr(u8)]
37pub enum ChannelId {
38    SignalingDiscovery = 1,
39    SignalingExchange = 2,
40    BestTipPropagation = 3,
41    TransactionPropagation = 4,
42    SnarkPropagation = 5,
43    SnarkJobCommitmentPropagation = 6,
44    Rpc = 7,
45    StreamingRpc = 8,
46}
47
48impl ChannelId {
49    #[inline(always)]
50    pub fn to_u8(self) -> u8 {
51        self as u8
52    }
53
54    #[inline(always)]
55    pub fn to_u16(self) -> u16 {
56        self as u16
57    }
58
59    pub fn name(self) -> &'static str {
60        match self {
61            Self::SignalingDiscovery => "signaling/discovery",
62            Self::SignalingExchange => "signaling/exchange",
63            Self::BestTipPropagation => "best_tip/propagation",
64            Self::TransactionPropagation => "transaction/propagation",
65            Self::SnarkPropagation => "snark/propagation",
66            Self::SnarkJobCommitmentPropagation => "snark_job_commitment/propagation",
67            Self::Rpc => "rpc",
68            Self::StreamingRpc => "rpc/streaming",
69        }
70    }
71
72    pub fn supported_by_libp2p(self) -> bool {
73        match self {
74            Self::SignalingDiscovery => false,
75            Self::SignalingExchange => false,
76            Self::BestTipPropagation => true,
77            Self::TransactionPropagation => true,
78            Self::SnarkPropagation => true,
79            Self::SnarkJobCommitmentPropagation => false,
80            Self::Rpc => true,
81            Self::StreamingRpc => false,
82        }
83    }
84
85    pub fn max_msg_size(self) -> usize {
86        match self {
87            // TODO(binier): measure signaling message sizes
88            Self::SignalingDiscovery => 16 * 1024, // 16KB
89            Self::SignalingExchange => 16 * 1024,  // 16KB
90            // TODO(binier): reduce this value once we change message for best tip
91            // propagation to just propagating consensus state with block hash.
92            Self::BestTipPropagation => 32 * 1024 * 1024, // 32MB
93            Self::TransactionPropagation => 1024,         // 1KB - just transaction info.
94            Self::SnarkPropagation => 1024,               // 1KB - just snark info.
95            Self::SnarkJobCommitmentPropagation => 2 * 1024, // 2KB,
96            Self::Rpc => 256 * 1024 * 1024,               // 256MB,
97            Self::StreamingRpc => 16 * 1024 * 1024,       // 16MB,
98        }
99    }
100
101    pub fn iter_all() -> impl Iterator<Item = ChannelId> {
102        <Self as strum::IntoEnumIterator>::iter()
103    }
104
105    pub fn for_libp2p() -> impl Iterator<Item = ChannelId> {
106        Self::iter_all().filter(|chan| chan.supported_by_libp2p())
107    }
108}
109
110impl std::fmt::Display for ChannelId {
111    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112        write!(f, "{}", self.to_u8())
113    }
114}
115
116#[derive(Serialize, Deserialize, Debug, Ord, PartialOrd, Eq, PartialEq, Clone, Copy)]
117pub struct MsgId(u64);
118
119impl MsgId {
120    pub fn first() -> Self {
121        Self(1)
122    }
123
124    pub fn next(self) -> Self {
125        Self(self.0 + 1)
126    }
127}
128
129#[derive(BinProtWrite, BinProtRead, Serialize, Deserialize, From, Debug, Clone)]
130pub enum ChannelMsg {
131    SignalingDiscovery(SignalingDiscoveryChannelMsg),
132    SignalingExchange(SignalingExchangeChannelMsg),
133    BestTipPropagation(BestTipPropagationChannelMsg),
134    TransactionPropagation(TransactionPropagationChannelMsg),
135    SnarkPropagation(SnarkPropagationChannelMsg),
136    SnarkJobCommitmentPropagation(SnarkJobCommitmentPropagationChannelMsg),
137    Rpc(RpcChannelMsg),
138    StreamingRpc(StreamingRpcChannelMsg),
139}
140
141impl ChannelMsg {
142    pub fn channel_id(&self) -> ChannelId {
143        match self {
144            Self::SignalingDiscovery(_) => ChannelId::SignalingDiscovery,
145            Self::SignalingExchange(_) => ChannelId::SignalingExchange,
146            Self::BestTipPropagation(_) => ChannelId::BestTipPropagation,
147            Self::TransactionPropagation(_) => ChannelId::TransactionPropagation,
148            Self::SnarkPropagation(_) => ChannelId::SnarkPropagation,
149            Self::SnarkJobCommitmentPropagation(_) => ChannelId::SnarkJobCommitmentPropagation,
150            Self::Rpc(_) => ChannelId::Rpc,
151            Self::StreamingRpc(_) => ChannelId::StreamingRpc,
152        }
153    }
154
155    pub fn encode<W>(&self, w: &mut W) -> std::io::Result<()>
156    where
157        W: std::io::Write,
158    {
159        match self {
160            Self::SignalingDiscovery(v) => v.binprot_write(w),
161            Self::SignalingExchange(v) => v.binprot_write(w),
162            Self::BestTipPropagation(v) => v.binprot_write(w),
163            Self::TransactionPropagation(v) => v.binprot_write(w),
164            Self::SnarkPropagation(v) => v.binprot_write(w),
165            Self::SnarkJobCommitmentPropagation(v) => v.binprot_write(w),
166            Self::Rpc(v) => v.binprot_write(w),
167            Self::StreamingRpc(v) => v.binprot_write(w),
168        }
169    }
170
171    pub fn decode<R>(r: &mut R, id: ChannelId) -> Result<Self, binprot::Error>
172    where
173        Self: Sized,
174        R: std::io::Read + ?Sized,
175    {
176        match id {
177            ChannelId::SignalingDiscovery => {
178                SignalingDiscoveryChannelMsg::binprot_read(r).map(|v| v.into())
179            }
180            ChannelId::SignalingExchange => {
181                SignalingExchangeChannelMsg::binprot_read(r).map(|v| v.into())
182            }
183            ChannelId::BestTipPropagation => {
184                BestTipPropagationChannelMsg::binprot_read(r).map(|v| v.into())
185            }
186            ChannelId::TransactionPropagation => {
187                TransactionPropagationChannelMsg::binprot_read(r).map(|v| v.into())
188            }
189            ChannelId::SnarkPropagation => {
190                SnarkPropagationChannelMsg::binprot_read(r).map(|v| v.into())
191            }
192            ChannelId::SnarkJobCommitmentPropagation => {
193                SnarkJobCommitmentPropagationChannelMsg::binprot_read(r).map(|v| v.into())
194            }
195            ChannelId::Rpc => RpcChannelMsg::binprot_read(r).map(|v| v.into()),
196            ChannelId::StreamingRpc => StreamingRpcChannelMsg::binprot_read(r).map(|v| v.into()),
197        }
198    }
199}
200
201impl crate::P2pState {
202    /// Initializes enabled channels.
203    pub fn channels_init<Action, State>(
204        &self,
205        dispatcher: &mut redux::Dispatcher<Action, State>,
206        peer_id: crate::PeerId,
207    ) where
208        State: crate::P2pStateTrait,
209        Action: crate::P2pActionTrait<State>,
210    {
211        // Dispatches can be done without a loop, but inside we do
212        // exhaustive matching so that we don't miss any channels.
213        for id in self.config.enabled_channels.iter().copied() {
214            match id {
215                ChannelId::SignalingDiscovery => {
216                    dispatcher.push(
217                        signaling::discovery::P2pChannelsSignalingDiscoveryAction::Init { peer_id },
218                    );
219                }
220                ChannelId::SignalingExchange => {
221                    dispatcher.push(
222                        signaling::exchange::P2pChannelsSignalingExchangeAction::Init { peer_id },
223                    );
224                }
225                ChannelId::BestTipPropagation => {
226                    dispatcher.push(best_tip::P2pChannelsBestTipAction::Init { peer_id });
227                }
228                ChannelId::TransactionPropagation => {
229                    dispatcher.push(transaction::P2pChannelsTransactionAction::Init { peer_id });
230                }
231                ChannelId::SnarkPropagation => {
232                    dispatcher.push(snark::P2pChannelsSnarkAction::Init { peer_id });
233                }
234                ChannelId::SnarkJobCommitmentPropagation => {
235                    dispatcher.push(
236                        snark_job_commitment::P2pChannelsSnarkJobCommitmentAction::Init { peer_id },
237                    );
238                }
239                ChannelId::Rpc => {
240                    dispatcher.push(rpc::P2pChannelsRpcAction::Init { peer_id });
241                }
242                ChannelId::StreamingRpc => {
243                    dispatcher.push(streaming_rpc::P2pChannelsStreamingRpcAction::Init { peer_id });
244                }
245            }
246        }
247    }
248}