1pub mod best_tip;
2pub mod rpc;
3pub mod signaling;
4pub mod snark;
5pub mod snark_job_commitment;
6pub mod streaming_rpc;
7pub mod transaction;
8
9mod p2p_channels_state;
10pub use p2p_channels_state::*;
11
12mod p2p_channels_actions;
13pub use p2p_channels_actions::*;
14
15mod p2p_channels_reducer;
16
17mod p2p_channels_service;
18pub use p2p_channels_service::*;
19
20mod p2p_channels_effectful_effects;
21
22use binprot::{BinProtRead, BinProtWrite};
23use binprot_derive::{BinProtRead, BinProtWrite};
24use derive_more::From;
25use serde::{Deserialize, Serialize};
26use signaling::{discovery::SignalingDiscoveryChannelMsg, exchange::SignalingExchangeChannelMsg};
27use strum_macros::EnumIter;
28
29use self::{
30 best_tip::BestTipPropagationChannelMsg, rpc::RpcChannelMsg, snark::SnarkPropagationChannelMsg,
31 snark_job_commitment::SnarkJobCommitmentPropagationChannelMsg,
32 streaming_rpc::StreamingRpcChannelMsg, transaction::TransactionPropagationChannelMsg,
33};
34
35#[derive(Serialize, Deserialize, EnumIter, Debug, Ord, PartialOrd, Eq, PartialEq, Clone, Copy)]
36#[repr(u8)]
37pub enum ChannelId {
38 SignalingDiscovery = 1,
39 SignalingExchange = 2,
40 BestTipPropagation = 3,
41 TransactionPropagation = 4,
42 SnarkPropagation = 5,
43 SnarkJobCommitmentPropagation = 6,
44 Rpc = 7,
45 StreamingRpc = 8,
46}
47
48impl ChannelId {
49 #[inline(always)]
50 pub fn to_u8(self) -> u8 {
51 self as u8
52 }
53
54 #[inline(always)]
55 pub fn to_u16(self) -> u16 {
56 self as u16
57 }
58
59 pub fn name(self) -> &'static str {
60 match self {
61 Self::SignalingDiscovery => "signaling/discovery",
62 Self::SignalingExchange => "signaling/exchange",
63 Self::BestTipPropagation => "best_tip/propagation",
64 Self::TransactionPropagation => "transaction/propagation",
65 Self::SnarkPropagation => "snark/propagation",
66 Self::SnarkJobCommitmentPropagation => "snark_job_commitment/propagation",
67 Self::Rpc => "rpc",
68 Self::StreamingRpc => "rpc/streaming",
69 }
70 }
71
72 pub fn supported_by_libp2p(self) -> bool {
73 match self {
74 Self::SignalingDiscovery => false,
75 Self::SignalingExchange => false,
76 Self::BestTipPropagation => true,
77 Self::TransactionPropagation => true,
78 Self::SnarkPropagation => true,
79 Self::SnarkJobCommitmentPropagation => false,
80 Self::Rpc => true,
81 Self::StreamingRpc => false,
82 }
83 }
84
85 pub fn max_msg_size(self) -> usize {
86 match self {
87 Self::SignalingDiscovery => 16 * 1024, Self::SignalingExchange => 16 * 1024, Self::BestTipPropagation => 32 * 1024 * 1024, Self::TransactionPropagation => 1024, Self::SnarkPropagation => 1024, Self::SnarkJobCommitmentPropagation => 2 * 1024, Self::Rpc => 256 * 1024 * 1024, Self::StreamingRpc => 16 * 1024 * 1024, }
99 }
100
101 pub fn iter_all() -> impl Iterator<Item = ChannelId> {
102 <Self as strum::IntoEnumIterator>::iter()
103 }
104
105 pub fn for_libp2p() -> impl Iterator<Item = ChannelId> {
106 Self::iter_all().filter(|chan| chan.supported_by_libp2p())
107 }
108}
109
110impl std::fmt::Display for ChannelId {
111 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112 write!(f, "{}", self.to_u8())
113 }
114}
115
116#[derive(Serialize, Deserialize, Debug, Ord, PartialOrd, Eq, PartialEq, Clone, Copy)]
117pub struct MsgId(u64);
118
119impl MsgId {
120 pub fn first() -> Self {
121 Self(1)
122 }
123
124 pub fn next(self) -> Self {
125 Self(self.0 + 1)
126 }
127}
128
129#[derive(BinProtWrite, BinProtRead, Serialize, Deserialize, From, Debug, Clone)]
130pub enum ChannelMsg {
131 SignalingDiscovery(SignalingDiscoveryChannelMsg),
132 SignalingExchange(SignalingExchangeChannelMsg),
133 BestTipPropagation(BestTipPropagationChannelMsg),
134 TransactionPropagation(TransactionPropagationChannelMsg),
135 SnarkPropagation(SnarkPropagationChannelMsg),
136 SnarkJobCommitmentPropagation(SnarkJobCommitmentPropagationChannelMsg),
137 Rpc(RpcChannelMsg),
138 StreamingRpc(StreamingRpcChannelMsg),
139}
140
141impl ChannelMsg {
142 pub fn channel_id(&self) -> ChannelId {
143 match self {
144 Self::SignalingDiscovery(_) => ChannelId::SignalingDiscovery,
145 Self::SignalingExchange(_) => ChannelId::SignalingExchange,
146 Self::BestTipPropagation(_) => ChannelId::BestTipPropagation,
147 Self::TransactionPropagation(_) => ChannelId::TransactionPropagation,
148 Self::SnarkPropagation(_) => ChannelId::SnarkPropagation,
149 Self::SnarkJobCommitmentPropagation(_) => ChannelId::SnarkJobCommitmentPropagation,
150 Self::Rpc(_) => ChannelId::Rpc,
151 Self::StreamingRpc(_) => ChannelId::StreamingRpc,
152 }
153 }
154
155 pub fn encode<W>(&self, w: &mut W) -> std::io::Result<()>
156 where
157 W: std::io::Write,
158 {
159 match self {
160 Self::SignalingDiscovery(v) => v.binprot_write(w),
161 Self::SignalingExchange(v) => v.binprot_write(w),
162 Self::BestTipPropagation(v) => v.binprot_write(w),
163 Self::TransactionPropagation(v) => v.binprot_write(w),
164 Self::SnarkPropagation(v) => v.binprot_write(w),
165 Self::SnarkJobCommitmentPropagation(v) => v.binprot_write(w),
166 Self::Rpc(v) => v.binprot_write(w),
167 Self::StreamingRpc(v) => v.binprot_write(w),
168 }
169 }
170
171 pub fn decode<R>(r: &mut R, id: ChannelId) -> Result<Self, binprot::Error>
172 where
173 Self: Sized,
174 R: std::io::Read + ?Sized,
175 {
176 match id {
177 ChannelId::SignalingDiscovery => {
178 SignalingDiscoveryChannelMsg::binprot_read(r).map(|v| v.into())
179 }
180 ChannelId::SignalingExchange => {
181 SignalingExchangeChannelMsg::binprot_read(r).map(|v| v.into())
182 }
183 ChannelId::BestTipPropagation => {
184 BestTipPropagationChannelMsg::binprot_read(r).map(|v| v.into())
185 }
186 ChannelId::TransactionPropagation => {
187 TransactionPropagationChannelMsg::binprot_read(r).map(|v| v.into())
188 }
189 ChannelId::SnarkPropagation => {
190 SnarkPropagationChannelMsg::binprot_read(r).map(|v| v.into())
191 }
192 ChannelId::SnarkJobCommitmentPropagation => {
193 SnarkJobCommitmentPropagationChannelMsg::binprot_read(r).map(|v| v.into())
194 }
195 ChannelId::Rpc => RpcChannelMsg::binprot_read(r).map(|v| v.into()),
196 ChannelId::StreamingRpc => StreamingRpcChannelMsg::binprot_read(r).map(|v| v.into()),
197 }
198 }
199}
200
201impl crate::P2pState {
202 pub fn channels_init<Action, State>(
204 &self,
205 dispatcher: &mut redux::Dispatcher<Action, State>,
206 peer_id: crate::PeerId,
207 ) where
208 State: crate::P2pStateTrait,
209 Action: crate::P2pActionTrait<State>,
210 {
211 for id in self.config.enabled_channels.iter().copied() {
214 match id {
215 ChannelId::SignalingDiscovery => {
216 dispatcher.push(
217 signaling::discovery::P2pChannelsSignalingDiscoveryAction::Init { peer_id },
218 );
219 }
220 ChannelId::SignalingExchange => {
221 dispatcher.push(
222 signaling::exchange::P2pChannelsSignalingExchangeAction::Init { peer_id },
223 );
224 }
225 ChannelId::BestTipPropagation => {
226 dispatcher.push(best_tip::P2pChannelsBestTipAction::Init { peer_id });
227 }
228 ChannelId::TransactionPropagation => {
229 dispatcher.push(transaction::P2pChannelsTransactionAction::Init { peer_id });
230 }
231 ChannelId::SnarkPropagation => {
232 dispatcher.push(snark::P2pChannelsSnarkAction::Init { peer_id });
233 }
234 ChannelId::SnarkJobCommitmentPropagation => {
235 dispatcher.push(
236 snark_job_commitment::P2pChannelsSnarkJobCommitmentAction::Init { peer_id },
237 );
238 }
239 ChannelId::Rpc => {
240 dispatcher.push(rpc::P2pChannelsRpcAction::Init { peer_id });
241 }
242 ChannelId::StreamingRpc => {
243 dispatcher.push(streaming_rpc::P2pChannelsStreamingRpcAction::Init { peer_id });
244 }
245 }
246 }
247 }
248}