p2p/channels/
p2p_channels_reducer.rs

1use super::{
2    best_tip::{BestTipPropagationChannelMsg, P2pChannelsBestTipAction, P2pChannelsBestTipState},
3    rpc::{P2pChannelsRpcAction, P2pChannelsRpcState, RpcChannelMsg},
4    signaling::{
5        discovery::{
6            P2pChannelsSignalingDiscoveryAction, P2pChannelsSignalingDiscoveryState,
7            SignalingDiscoveryChannelMsg,
8        },
9        exchange::{
10            P2pChannelsSignalingExchangeAction, P2pChannelsSignalingExchangeState,
11            SignalingExchangeChannelMsg,
12        },
13    },
14    snark::{P2pChannelsSnarkAction, P2pChannelsSnarkState, SnarkPropagationChannelMsg},
15    snark_job_commitment::{
16        P2pChannelsSnarkJobCommitmentAction, P2pChannelsSnarkJobCommitmentState,
17        SnarkJobCommitmentPropagationChannelMsg,
18    },
19    streaming_rpc::{
20        P2pChannelsStreamingRpcAction, P2pChannelsStreamingRpcState, StreamingRpcChannelMsg,
21    },
22    transaction::{
23        P2pChannelsTransactionAction, P2pChannelsTransactionState, TransactionPropagationChannelMsg,
24    },
25    ChannelMsg, P2pChannelsAction, P2pChannelsMessageReceivedAction, P2pChannelsState,
26};
27use crate::{
28    disconnection::{P2pDisconnectionAction, P2pDisconnectionReason},
29    P2pState,
30};
31use openmina_core::{block::BlockWithHash, error, Substate};
32use redux::{ActionWithMeta, Dispatcher};
33
34impl P2pChannelsState {
35    pub fn reducer<Action, State>(
36        state_context: Substate<Action, State, P2pState>,
37        action: ActionWithMeta<P2pChannelsAction>,
38    ) -> Result<(), String>
39    where
40        State: crate::P2pStateTrait,
41        Action: crate::P2pActionTrait<State>,
42    {
43        let (action, meta) = action.split();
44
45        match action {
46            P2pChannelsAction::MessageReceived(action) => {
47                let (dispatcher, state) = state_context.into_dispatcher_and_state();
48                Self::dispatch_message(meta.with_action(action), dispatcher, state)
49            }
50            P2pChannelsAction::SignalingDiscovery(action) => {
51                P2pChannelsSignalingDiscoveryState::reducer(state_context, meta.with_action(action))
52            }
53            P2pChannelsAction::SignalingExchange(action) => {
54                P2pChannelsSignalingExchangeState::reducer(state_context, meta.with_action(action))
55            }
56            P2pChannelsAction::BestTip(action) => {
57                P2pChannelsBestTipState::reducer(state_context, meta.with_action(action))
58            }
59            P2pChannelsAction::Transaction(action) => {
60                P2pChannelsTransactionState::reducer(state_context, meta.with_action(action))
61            }
62            P2pChannelsAction::Snark(action) => {
63                P2pChannelsSnarkState::reducer(state_context, meta.with_action(action))
64            }
65            P2pChannelsAction::SnarkJobCommitment(action) => {
66                P2pChannelsSnarkJobCommitmentState::reducer(state_context, meta.with_action(action))
67            }
68            P2pChannelsAction::Rpc(action) => {
69                P2pChannelsRpcState::reducer(state_context, meta.with_action(action))
70            }
71            P2pChannelsAction::StreamingRpc(action) => {
72                P2pChannelsStreamingRpcState::reducer(state_context, meta.with_action(action))
73            }
74        }
75    }
76
77    fn dispatch_message<Action, State>(
78        action: ActionWithMeta<P2pChannelsMessageReceivedAction>,
79        dispatcher: &mut Dispatcher<Action, State>,
80        state: &State,
81    ) -> Result<(), String>
82    where
83        State: crate::P2pStateTrait,
84        Action: crate::P2pActionTrait<State>,
85    {
86        let (action, meta) = action.split();
87        let time = meta.time();
88
89        let peer_id = action.peer_id;
90        let chain_id = action.message.channel_id();
91
92        let mut is_enabled = |action: Action| dispatcher.push_if_enabled(action, state, time);
93
94        let was_expected = match *action.message {
95            ChannelMsg::SignalingDiscovery(msg) => match msg {
96                SignalingDiscoveryChannelMsg::GetNext => is_enabled(
97                    P2pChannelsSignalingDiscoveryAction::RequestReceived { peer_id }.into(),
98                ),
99                SignalingDiscoveryChannelMsg::Discover => is_enabled(
100                    P2pChannelsSignalingDiscoveryAction::DiscoveryRequestReceived { peer_id }
101                        .into(),
102                ),
103                SignalingDiscoveryChannelMsg::Discovered { target_public_key } => is_enabled(
104                    P2pChannelsSignalingDiscoveryAction::DiscoveredReceived {
105                        peer_id,
106                        target_public_key,
107                    }
108                    .into(),
109                ),
110                SignalingDiscoveryChannelMsg::DiscoveredReject => is_enabled(
111                    P2pChannelsSignalingDiscoveryAction::DiscoveredRejectReceived { peer_id }
112                        .into(),
113                ),
114                SignalingDiscoveryChannelMsg::DiscoveredAccept(offer) => is_enabled(
115                    P2pChannelsSignalingDiscoveryAction::DiscoveredAcceptReceived {
116                        peer_id,
117                        offer,
118                    }
119                    .into(),
120                ),
121                SignalingDiscoveryChannelMsg::Answer(answer) => is_enabled(
122                    P2pChannelsSignalingDiscoveryAction::AnswerReceived { peer_id, answer }.into(),
123                ),
124            },
125            ChannelMsg::SignalingExchange(msg) => match msg {
126                SignalingExchangeChannelMsg::GetNext => is_enabled(
127                    P2pChannelsSignalingExchangeAction::RequestReceived { peer_id }.into(),
128                ),
129                SignalingExchangeChannelMsg::OfferToYou {
130                    offerer_pub_key,
131                    offer,
132                } => is_enabled(
133                    P2pChannelsSignalingExchangeAction::OfferReceived {
134                        peer_id,
135                        offerer_pub_key,
136                        offer,
137                    }
138                    .into(),
139                ),
140                SignalingExchangeChannelMsg::Answer(answer) => is_enabled(
141                    P2pChannelsSignalingExchangeAction::AnswerReceived { peer_id, answer }.into(),
142                ),
143            },
144            ChannelMsg::BestTipPropagation(msg) => match msg {
145                BestTipPropagationChannelMsg::GetNext => {
146                    is_enabled(P2pChannelsBestTipAction::RequestReceived { peer_id }.into())
147                }
148                BestTipPropagationChannelMsg::BestTip(best_tip) => {
149                    match BlockWithHash::try_new(best_tip) {
150                        Ok(best_tip) => is_enabled(
151                            P2pChannelsBestTipAction::Received { peer_id, best_tip }.into(),
152                        ),
153                        Err(_) => {
154                            error!(meta.time(); "BestTipPropagationChannelMsg::BestTip: Invalid bigint in block");
155                            false
156                        }
157                    }
158                }
159            },
160            ChannelMsg::TransactionPropagation(msg) => match msg {
161                TransactionPropagationChannelMsg::GetNext { limit } => is_enabled(
162                    P2pChannelsTransactionAction::RequestReceived { peer_id, limit }.into(),
163                ),
164                TransactionPropagationChannelMsg::WillSend { count } => is_enabled(
165                    P2pChannelsTransactionAction::PromiseReceived {
166                        peer_id,
167                        promised_count: count,
168                    }
169                    .into(),
170                ),
171                TransactionPropagationChannelMsg::Transaction(transaction) => is_enabled(
172                    P2pChannelsTransactionAction::Received {
173                        peer_id,
174                        transaction: Box::new(transaction),
175                    }
176                    .into(),
177                ),
178            },
179            ChannelMsg::SnarkPropagation(msg) => match msg {
180                SnarkPropagationChannelMsg::GetNext { limit } => {
181                    is_enabled(P2pChannelsSnarkAction::RequestReceived { peer_id, limit }.into())
182                }
183                SnarkPropagationChannelMsg::WillSend { count } => is_enabled(
184                    P2pChannelsSnarkAction::PromiseReceived {
185                        peer_id,
186                        promised_count: count,
187                    }
188                    .into(),
189                ),
190                SnarkPropagationChannelMsg::Snark(snark) => is_enabled(
191                    P2pChannelsSnarkAction::Received {
192                        peer_id,
193                        snark: Box::new(snark),
194                    }
195                    .into(),
196                ),
197            },
198            ChannelMsg::SnarkJobCommitmentPropagation(msg) => match msg {
199                SnarkJobCommitmentPropagationChannelMsg::GetNext { limit } => is_enabled(
200                    P2pChannelsSnarkJobCommitmentAction::RequestReceived { peer_id, limit }.into(),
201                ),
202                SnarkJobCommitmentPropagationChannelMsg::WillSend { count } => is_enabled(
203                    P2pChannelsSnarkJobCommitmentAction::PromiseReceived {
204                        peer_id,
205                        promised_count: count,
206                    }
207                    .into(),
208                ),
209                SnarkJobCommitmentPropagationChannelMsg::Commitment(commitment) => is_enabled(
210                    P2pChannelsSnarkJobCommitmentAction::Received {
211                        peer_id,
212                        commitment: Box::new(commitment),
213                    }
214                    .into(),
215                ),
216            },
217            ChannelMsg::Rpc(msg) => match msg {
218                RpcChannelMsg::Request(id, request) => is_enabled(
219                    P2pChannelsRpcAction::RequestReceived {
220                        peer_id,
221                        id,
222                        request: Box::new(request),
223                    }
224                    .into(),
225                ),
226                RpcChannelMsg::Response(id, response) => is_enabled(
227                    P2pChannelsRpcAction::ResponseReceived {
228                        peer_id,
229                        id,
230                        response: response.map(Box::new),
231                    }
232                    .into(),
233                ),
234            },
235            ChannelMsg::StreamingRpc(msg) => match msg {
236                StreamingRpcChannelMsg::Next(id) => is_enabled(
237                    P2pChannelsStreamingRpcAction::ResponsePartNextSend { peer_id, id }.into(),
238                ),
239                StreamingRpcChannelMsg::Request(id, request) => is_enabled(
240                    P2pChannelsStreamingRpcAction::RequestReceived {
241                        peer_id,
242                        id,
243                        request: Box::new(request),
244                    }
245                    .into(),
246                ),
247                StreamingRpcChannelMsg::Response(id, response) => match response {
248                    None => is_enabled(
249                        P2pChannelsStreamingRpcAction::ResponseReceived {
250                            peer_id,
251                            id,
252                            response: None,
253                        }
254                        .into(),
255                    ),
256                    Some(response) => is_enabled(
257                        P2pChannelsStreamingRpcAction::ResponsePartReceived {
258                            peer_id,
259                            id,
260                            response,
261                        }
262                        .into(),
263                    ),
264                },
265            },
266        };
267
268        if !was_expected {
269            // dbg!(&action.message);
270            let reason = P2pDisconnectionReason::P2pChannelMsgUnexpected(chain_id);
271            dispatcher.push(P2pDisconnectionAction::Init { peer_id, reason });
272        }
273
274        Ok(())
275    }
276}