1use super::{
2 best_tip::{BestTipPropagationChannelMsg, P2pChannelsBestTipAction, P2pChannelsBestTipState},
3 rpc::{P2pChannelsRpcAction, P2pChannelsRpcState, RpcChannelMsg},
4 signaling::{
5 discovery::{
6 P2pChannelsSignalingDiscoveryAction, P2pChannelsSignalingDiscoveryState,
7 SignalingDiscoveryChannelMsg,
8 },
9 exchange::{
10 P2pChannelsSignalingExchangeAction, P2pChannelsSignalingExchangeState,
11 SignalingExchangeChannelMsg,
12 },
13 },
14 snark::{P2pChannelsSnarkAction, P2pChannelsSnarkState, SnarkPropagationChannelMsg},
15 snark_job_commitment::{
16 P2pChannelsSnarkJobCommitmentAction, P2pChannelsSnarkJobCommitmentState,
17 SnarkJobCommitmentPropagationChannelMsg,
18 },
19 streaming_rpc::{
20 P2pChannelsStreamingRpcAction, P2pChannelsStreamingRpcState, StreamingRpcChannelMsg,
21 },
22 transaction::{
23 P2pChannelsTransactionAction, P2pChannelsTransactionState, TransactionPropagationChannelMsg,
24 },
25 ChannelMsg, P2pChannelsAction, P2pChannelsMessageReceivedAction, P2pChannelsState,
26};
27use crate::{
28 disconnection::{P2pDisconnectionAction, P2pDisconnectionReason},
29 P2pState,
30};
31use openmina_core::{block::BlockWithHash, error, Substate};
32use redux::{ActionWithMeta, Dispatcher};
33
34impl P2pChannelsState {
35 pub fn reducer<Action, State>(
36 state_context: Substate<Action, State, P2pState>,
37 action: ActionWithMeta<P2pChannelsAction>,
38 ) -> Result<(), String>
39 where
40 State: crate::P2pStateTrait,
41 Action: crate::P2pActionTrait<State>,
42 {
43 let (action, meta) = action.split();
44
45 match action {
46 P2pChannelsAction::MessageReceived(action) => {
47 let (dispatcher, state) = state_context.into_dispatcher_and_state();
48 Self::dispatch_message(meta.with_action(action), dispatcher, state)
49 }
50 P2pChannelsAction::SignalingDiscovery(action) => {
51 P2pChannelsSignalingDiscoveryState::reducer(state_context, meta.with_action(action))
52 }
53 P2pChannelsAction::SignalingExchange(action) => {
54 P2pChannelsSignalingExchangeState::reducer(state_context, meta.with_action(action))
55 }
56 P2pChannelsAction::BestTip(action) => {
57 P2pChannelsBestTipState::reducer(state_context, meta.with_action(action))
58 }
59 P2pChannelsAction::Transaction(action) => {
60 P2pChannelsTransactionState::reducer(state_context, meta.with_action(action))
61 }
62 P2pChannelsAction::Snark(action) => {
63 P2pChannelsSnarkState::reducer(state_context, meta.with_action(action))
64 }
65 P2pChannelsAction::SnarkJobCommitment(action) => {
66 P2pChannelsSnarkJobCommitmentState::reducer(state_context, meta.with_action(action))
67 }
68 P2pChannelsAction::Rpc(action) => {
69 P2pChannelsRpcState::reducer(state_context, meta.with_action(action))
70 }
71 P2pChannelsAction::StreamingRpc(action) => {
72 P2pChannelsStreamingRpcState::reducer(state_context, meta.with_action(action))
73 }
74 }
75 }
76
77 fn dispatch_message<Action, State>(
78 action: ActionWithMeta<P2pChannelsMessageReceivedAction>,
79 dispatcher: &mut Dispatcher<Action, State>,
80 state: &State,
81 ) -> Result<(), String>
82 where
83 State: crate::P2pStateTrait,
84 Action: crate::P2pActionTrait<State>,
85 {
86 let (action, meta) = action.split();
87 let time = meta.time();
88
89 let peer_id = action.peer_id;
90 let chain_id = action.message.channel_id();
91
92 let mut is_enabled = |action: Action| dispatcher.push_if_enabled(action, state, time);
93
94 let was_expected = match *action.message {
95 ChannelMsg::SignalingDiscovery(msg) => match msg {
96 SignalingDiscoveryChannelMsg::GetNext => is_enabled(
97 P2pChannelsSignalingDiscoveryAction::RequestReceived { peer_id }.into(),
98 ),
99 SignalingDiscoveryChannelMsg::Discover => is_enabled(
100 P2pChannelsSignalingDiscoveryAction::DiscoveryRequestReceived { peer_id }
101 .into(),
102 ),
103 SignalingDiscoveryChannelMsg::Discovered { target_public_key } => is_enabled(
104 P2pChannelsSignalingDiscoveryAction::DiscoveredReceived {
105 peer_id,
106 target_public_key,
107 }
108 .into(),
109 ),
110 SignalingDiscoveryChannelMsg::DiscoveredReject => is_enabled(
111 P2pChannelsSignalingDiscoveryAction::DiscoveredRejectReceived { peer_id }
112 .into(),
113 ),
114 SignalingDiscoveryChannelMsg::DiscoveredAccept(offer) => is_enabled(
115 P2pChannelsSignalingDiscoveryAction::DiscoveredAcceptReceived {
116 peer_id,
117 offer,
118 }
119 .into(),
120 ),
121 SignalingDiscoveryChannelMsg::Answer(answer) => is_enabled(
122 P2pChannelsSignalingDiscoveryAction::AnswerReceived { peer_id, answer }.into(),
123 ),
124 },
125 ChannelMsg::SignalingExchange(msg) => match msg {
126 SignalingExchangeChannelMsg::GetNext => is_enabled(
127 P2pChannelsSignalingExchangeAction::RequestReceived { peer_id }.into(),
128 ),
129 SignalingExchangeChannelMsg::OfferToYou {
130 offerer_pub_key,
131 offer,
132 } => is_enabled(
133 P2pChannelsSignalingExchangeAction::OfferReceived {
134 peer_id,
135 offerer_pub_key,
136 offer,
137 }
138 .into(),
139 ),
140 SignalingExchangeChannelMsg::Answer(answer) => is_enabled(
141 P2pChannelsSignalingExchangeAction::AnswerReceived { peer_id, answer }.into(),
142 ),
143 },
144 ChannelMsg::BestTipPropagation(msg) => match msg {
145 BestTipPropagationChannelMsg::GetNext => {
146 is_enabled(P2pChannelsBestTipAction::RequestReceived { peer_id }.into())
147 }
148 BestTipPropagationChannelMsg::BestTip(best_tip) => {
149 match BlockWithHash::try_new(best_tip) {
150 Ok(best_tip) => is_enabled(
151 P2pChannelsBestTipAction::Received { peer_id, best_tip }.into(),
152 ),
153 Err(_) => {
154 error!(meta.time(); "BestTipPropagationChannelMsg::BestTip: Invalid bigint in block");
155 false
156 }
157 }
158 }
159 },
160 ChannelMsg::TransactionPropagation(msg) => match msg {
161 TransactionPropagationChannelMsg::GetNext { limit } => is_enabled(
162 P2pChannelsTransactionAction::RequestReceived { peer_id, limit }.into(),
163 ),
164 TransactionPropagationChannelMsg::WillSend { count } => is_enabled(
165 P2pChannelsTransactionAction::PromiseReceived {
166 peer_id,
167 promised_count: count,
168 }
169 .into(),
170 ),
171 TransactionPropagationChannelMsg::Transaction(transaction) => is_enabled(
172 P2pChannelsTransactionAction::Received {
173 peer_id,
174 transaction: Box::new(transaction),
175 }
176 .into(),
177 ),
178 },
179 ChannelMsg::SnarkPropagation(msg) => match msg {
180 SnarkPropagationChannelMsg::GetNext { limit } => {
181 is_enabled(P2pChannelsSnarkAction::RequestReceived { peer_id, limit }.into())
182 }
183 SnarkPropagationChannelMsg::WillSend { count } => is_enabled(
184 P2pChannelsSnarkAction::PromiseReceived {
185 peer_id,
186 promised_count: count,
187 }
188 .into(),
189 ),
190 SnarkPropagationChannelMsg::Snark(snark) => is_enabled(
191 P2pChannelsSnarkAction::Received {
192 peer_id,
193 snark: Box::new(snark),
194 }
195 .into(),
196 ),
197 },
198 ChannelMsg::SnarkJobCommitmentPropagation(msg) => match msg {
199 SnarkJobCommitmentPropagationChannelMsg::GetNext { limit } => is_enabled(
200 P2pChannelsSnarkJobCommitmentAction::RequestReceived { peer_id, limit }.into(),
201 ),
202 SnarkJobCommitmentPropagationChannelMsg::WillSend { count } => is_enabled(
203 P2pChannelsSnarkJobCommitmentAction::PromiseReceived {
204 peer_id,
205 promised_count: count,
206 }
207 .into(),
208 ),
209 SnarkJobCommitmentPropagationChannelMsg::Commitment(commitment) => is_enabled(
210 P2pChannelsSnarkJobCommitmentAction::Received {
211 peer_id,
212 commitment: Box::new(commitment),
213 }
214 .into(),
215 ),
216 },
217 ChannelMsg::Rpc(msg) => match msg {
218 RpcChannelMsg::Request(id, request) => is_enabled(
219 P2pChannelsRpcAction::RequestReceived {
220 peer_id,
221 id,
222 request: Box::new(request),
223 }
224 .into(),
225 ),
226 RpcChannelMsg::Response(id, response) => is_enabled(
227 P2pChannelsRpcAction::ResponseReceived {
228 peer_id,
229 id,
230 response: response.map(Box::new),
231 }
232 .into(),
233 ),
234 },
235 ChannelMsg::StreamingRpc(msg) => match msg {
236 StreamingRpcChannelMsg::Next(id) => is_enabled(
237 P2pChannelsStreamingRpcAction::ResponsePartNextSend { peer_id, id }.into(),
238 ),
239 StreamingRpcChannelMsg::Request(id, request) => is_enabled(
240 P2pChannelsStreamingRpcAction::RequestReceived {
241 peer_id,
242 id,
243 request: Box::new(request),
244 }
245 .into(),
246 ),
247 StreamingRpcChannelMsg::Response(id, response) => match response {
248 None => is_enabled(
249 P2pChannelsStreamingRpcAction::ResponseReceived {
250 peer_id,
251 id,
252 response: None,
253 }
254 .into(),
255 ),
256 Some(response) => is_enabled(
257 P2pChannelsStreamingRpcAction::ResponsePartReceived {
258 peer_id,
259 id,
260 response,
261 }
262 .into(),
263 ),
264 },
265 },
266 };
267
268 if !was_expected {
269 let reason = P2pDisconnectionReason::P2pChannelMsgUnexpected(chain_id);
271 dispatcher.push(P2pDisconnectionAction::Init { peer_id, reason });
272 }
273
274 Ok(())
275 }
276}