p2p/channels/transaction/
p2p_channels_transaction_reducer.rs

1use super::{
2    P2pChannelsTransactionAction, P2pChannelsTransactionState, TransactionPropagationChannelMsg,
3    TransactionPropagationState,
4};
5use crate::{
6    channels::{ChannelId, MsgId, P2pChannelsEffectfulAction},
7    P2pNetworkPubsubAction, P2pState,
8};
9use mina_p2p_messages::{gossip::GossipNetMessageV2, v2};
10use openmina_core::{bug_condition, transaction::TransactionWithHash, Substate};
11use redux::ActionWithMeta;
12
13impl P2pChannelsTransactionState {
14    pub fn reducer<Action, State>(
15        mut state_context: Substate<Action, State, P2pState>,
16        action: ActionWithMeta<P2pChannelsTransactionAction>,
17    ) -> Result<(), String>
18    where
19        State: crate::P2pStateTrait,
20        Action: crate::P2pActionTrait<State>,
21    {
22        let (action, meta) = action.split();
23        let p2p_state = state_context.get_substate_mut()?;
24
25        let transaction_state = action
26            .peer_id()
27            .and_then(|peer_id| p2p_state.get_ready_peer_mut(peer_id))
28            .map(|peer_state| &mut peer_state.channels.transaction)
29            .ok_or_else(|| format!("Invalid state for: {action:?}"));
30
31        match action {
32            P2pChannelsTransactionAction::Init { peer_id } => {
33                let state = transaction_state.inspect_err(|error| bug_condition!("{}", error))?;
34                *state = Self::Init { time: meta.time() };
35
36                let dispatcher = state_context.into_dispatcher();
37                dispatcher.push(P2pChannelsEffectfulAction::InitChannel {
38                    peer_id,
39                    id: ChannelId::TransactionPropagation,
40                    on_success: redux::callback!(
41                        on_transaction_channel_init(peer_id: crate::PeerId) -> crate::P2pAction {
42                            P2pChannelsTransactionAction::Pending { peer_id }
43                        }
44                    ),
45                });
46                Ok(())
47            }
48            P2pChannelsTransactionAction::Pending { .. } => {
49                let state = transaction_state.inspect_err(|error| bug_condition!("{}", error))?;
50                *state = Self::Pending { time: meta.time() };
51                Ok(())
52            }
53            P2pChannelsTransactionAction::Ready { .. } => {
54                let state = transaction_state.inspect_err(|error| bug_condition!("{}", error))?;
55                *state = Self::Ready {
56                    time: meta.time(),
57                    local: TransactionPropagationState::WaitingForRequest { time: meta.time() },
58                    remote: TransactionPropagationState::WaitingForRequest { time: meta.time() },
59                    next_send_index: 0,
60                };
61                Ok(())
62            }
63            P2pChannelsTransactionAction::RequestSend { limit, peer_id, .. } => {
64                let state = transaction_state.inspect_err(|error| bug_condition!("{}", error))?;
65                let Self::Ready { local, .. } = state else {
66                    bug_condition!(
67                    "Invalid state for `P2pChannelsTransactionAction::RequestSend `, state: {:?}",
68                    state
69                );
70                    return Ok(());
71                };
72                *local = TransactionPropagationState::Requested {
73                    time: meta.time(),
74                    requested_limit: limit,
75                };
76
77                let dispatcher = state_context.into_dispatcher();
78                dispatcher.push(P2pChannelsEffectfulAction::MessageSend {
79                    peer_id,
80                    msg_id: MsgId::first(),
81                    msg: TransactionPropagationChannelMsg::GetNext { limit }.into(),
82                });
83                Ok(())
84            }
85            P2pChannelsTransactionAction::PromiseReceived { promised_count, .. } => {
86                let state = transaction_state.inspect_err(|error| bug_condition!("{}", error))?;
87                let Self::Ready { local, .. } = state else {
88                    bug_condition!(
89                    "Invalid state for `P2pChannelsTransactionAction::PromiseReceived `, state: {:?}",
90                    state
91                );
92                    return Ok(());
93                };
94                let TransactionPropagationState::Requested {
95                    requested_limit, ..
96                } = &local
97                else {
98                    bug_condition!(
99                    "Invalid state for `P2pChannelsTransactionAction::PromiseReceived `, state: {:?}",
100                    state
101                );
102                    return Ok(());
103                };
104                *local = TransactionPropagationState::Responding {
105                    time: meta.time(),
106                    requested_limit: *requested_limit,
107                    promised_count,
108                    current_count: 0,
109                };
110                Ok(())
111            }
112            P2pChannelsTransactionAction::Received {
113                peer_id,
114                transaction,
115            } => {
116                let state = transaction_state.inspect_err(|error| bug_condition!("{}", error))?;
117                let Self::Ready { local, .. } = state else {
118                    bug_condition!(
119                        "Invalid state for `P2pChannelsTransactionAction::Received `, state: {:?}",
120                        state
121                    );
122                    return Ok(());
123                };
124                let TransactionPropagationState::Responding {
125                    promised_count,
126                    current_count,
127                    ..
128                } = local
129                else {
130                    return Ok(());
131                };
132
133                *current_count += 1;
134
135                if current_count >= promised_count {
136                    *local = TransactionPropagationState::Responded {
137                        time: meta.time(),
138                        count: *current_count,
139                    };
140                }
141
142                let (dispatcher, state) = state_context.into_dispatcher_and_state();
143                let p2p_state: &P2pState = state.substate()?;
144
145                if let Some(callback) = &p2p_state.callbacks.on_p2p_channels_transaction_received {
146                    dispatcher.push_callback(callback.clone(), (peer_id, transaction));
147                }
148
149                Ok(())
150            }
151            P2pChannelsTransactionAction::RequestReceived { limit, .. } => {
152                let state = transaction_state.inspect_err(|error| bug_condition!("{}", error))?;
153                let Self::Ready { remote, .. } = state else {
154                    bug_condition!(
155                    "Invalid state for `P2pChannelsTransactionAction::RequestReceived `, state: {:?}",
156                    state
157                );
158                    return Ok(());
159                };
160                *remote = TransactionPropagationState::Requested {
161                    time: meta.time(),
162                    requested_limit: limit,
163                };
164                Ok(())
165            }
166            P2pChannelsTransactionAction::ResponseSend {
167                transactions,
168                last_index,
169                peer_id,
170                ..
171            } => {
172                let state = transaction_state.inspect_err(|error| bug_condition!("{}", error))?;
173                let Self::Ready {
174                    remote,
175                    next_send_index,
176                    ..
177                } = state
178                else {
179                    bug_condition!(
180                    "Invalid state for `P2pChannelsTransactionAction::ResponseSend `, state: {:?}",
181                    state
182                );
183                    return Ok(());
184                };
185                *next_send_index = last_index + 1;
186
187                let count = transactions.len() as u8;
188                if count == 0 {
189                    return Ok(());
190                }
191
192                *remote = TransactionPropagationState::Responded {
193                    time: meta.time(),
194                    count,
195                };
196
197                let dispatcher = state_context.into_dispatcher();
198                let msg = TransactionPropagationChannelMsg::WillSend { count }.into();
199                dispatcher.push(P2pChannelsEffectfulAction::MessageSend {
200                    peer_id,
201                    msg_id: MsgId::first(),
202                    msg,
203                });
204
205                for tx in transactions {
206                    let msg = TransactionPropagationChannelMsg::Transaction(tx).into();
207                    dispatcher.push(P2pChannelsEffectfulAction::MessageSend {
208                        peer_id,
209                        msg_id: MsgId::first(),
210                        msg,
211                    });
212                }
213                Ok(())
214            }
215            P2pChannelsTransactionAction::Libp2pReceived {
216                transactions,
217                message_id,
218                peer_id,
219                ..
220            } => {
221                let (dispatcher, state) = state_context.into_dispatcher_and_state();
222                let p2p_state: &P2pState = state.substate()?;
223
224                if let Some(callback) = &p2p_state
225                    .callbacks
226                    .on_p2p_channels_transactions_libp2p_received
227                {
228                    let transactions = transactions
229                        .into_iter()
230                        .map(TransactionWithHash::try_new)
231                        .filter_map(Result::ok)
232                        .collect();
233
234                    dispatcher.push_callback(callback.clone(), (peer_id, transactions, message_id));
235                }
236
237                Ok(())
238            }
239            #[cfg(not(feature = "p2p-libp2p"))]
240            P2pChannelsTransactionAction::Libp2pBroadcast { .. } => Ok(()),
241            #[cfg(feature = "p2p-libp2p")]
242            P2pChannelsTransactionAction::Libp2pBroadcast {
243                transaction,
244                nonce,
245                is_local,
246            } => {
247                let dispatcher = state_context.into_dispatcher();
248                let message = v2::NetworkPoolTransactionPoolDiffVersionedStableV2(
249                    std::iter::once(*transaction).collect(),
250                );
251                let nonce = nonce.into();
252                let message = GossipNetMessageV2::TransactionPoolDiff { message, nonce };
253                if is_local {
254                    dispatcher.push(P2pNetworkPubsubAction::Broadcast { message });
255                } else {
256                    // rebroadcast block if received from webrtc network, otherwise noop.
257                    dispatcher.push(P2pNetworkPubsubAction::WebRtcRebroadcast { message });
258                }
259                Ok(())
260            }
261        }
262    }
263}