p2p/channels/transaction/
p2p_channels_transaction_reducer.rs1use super::{
2 P2pChannelsTransactionAction, P2pChannelsTransactionState, TransactionPropagationChannelMsg,
3 TransactionPropagationState,
4};
5use crate::{
6 channels::{ChannelId, MsgId, P2pChannelsEffectfulAction},
7 P2pNetworkPubsubAction, P2pState,
8};
9use mina_p2p_messages::{gossip::GossipNetMessageV2, v2};
10use openmina_core::{bug_condition, transaction::TransactionWithHash, Substate};
11use redux::ActionWithMeta;
12
13impl P2pChannelsTransactionState {
14 pub fn reducer<Action, State>(
15 mut state_context: Substate<Action, State, P2pState>,
16 action: ActionWithMeta<P2pChannelsTransactionAction>,
17 ) -> Result<(), String>
18 where
19 State: crate::P2pStateTrait,
20 Action: crate::P2pActionTrait<State>,
21 {
22 let (action, meta) = action.split();
23 let p2p_state = state_context.get_substate_mut()?;
24
25 let transaction_state = action
26 .peer_id()
27 .and_then(|peer_id| p2p_state.get_ready_peer_mut(peer_id))
28 .map(|peer_state| &mut peer_state.channels.transaction)
29 .ok_or_else(|| format!("Invalid state for: {action:?}"));
30
31 match action {
32 P2pChannelsTransactionAction::Init { peer_id } => {
33 let state = transaction_state.inspect_err(|error| bug_condition!("{}", error))?;
34 *state = Self::Init { time: meta.time() };
35
36 let dispatcher = state_context.into_dispatcher();
37 dispatcher.push(P2pChannelsEffectfulAction::InitChannel {
38 peer_id,
39 id: ChannelId::TransactionPropagation,
40 on_success: redux::callback!(
41 on_transaction_channel_init(peer_id: crate::PeerId) -> crate::P2pAction {
42 P2pChannelsTransactionAction::Pending { peer_id }
43 }
44 ),
45 });
46 Ok(())
47 }
48 P2pChannelsTransactionAction::Pending { .. } => {
49 let state = transaction_state.inspect_err(|error| bug_condition!("{}", error))?;
50 *state = Self::Pending { time: meta.time() };
51 Ok(())
52 }
53 P2pChannelsTransactionAction::Ready { .. } => {
54 let state = transaction_state.inspect_err(|error| bug_condition!("{}", error))?;
55 *state = Self::Ready {
56 time: meta.time(),
57 local: TransactionPropagationState::WaitingForRequest { time: meta.time() },
58 remote: TransactionPropagationState::WaitingForRequest { time: meta.time() },
59 next_send_index: 0,
60 };
61 Ok(())
62 }
63 P2pChannelsTransactionAction::RequestSend { limit, peer_id, .. } => {
64 let state = transaction_state.inspect_err(|error| bug_condition!("{}", error))?;
65 let Self::Ready { local, .. } = state else {
66 bug_condition!(
67 "Invalid state for `P2pChannelsTransactionAction::RequestSend `, state: {:?}",
68 state
69 );
70 return Ok(());
71 };
72 *local = TransactionPropagationState::Requested {
73 time: meta.time(),
74 requested_limit: limit,
75 };
76
77 let dispatcher = state_context.into_dispatcher();
78 dispatcher.push(P2pChannelsEffectfulAction::MessageSend {
79 peer_id,
80 msg_id: MsgId::first(),
81 msg: TransactionPropagationChannelMsg::GetNext { limit }.into(),
82 });
83 Ok(())
84 }
85 P2pChannelsTransactionAction::PromiseReceived { promised_count, .. } => {
86 let state = transaction_state.inspect_err(|error| bug_condition!("{}", error))?;
87 let Self::Ready { local, .. } = state else {
88 bug_condition!(
89 "Invalid state for `P2pChannelsTransactionAction::PromiseReceived `, state: {:?}",
90 state
91 );
92 return Ok(());
93 };
94 let TransactionPropagationState::Requested {
95 requested_limit, ..
96 } = &local
97 else {
98 bug_condition!(
99 "Invalid state for `P2pChannelsTransactionAction::PromiseReceived `, state: {:?}",
100 state
101 );
102 return Ok(());
103 };
104 *local = TransactionPropagationState::Responding {
105 time: meta.time(),
106 requested_limit: *requested_limit,
107 promised_count,
108 current_count: 0,
109 };
110 Ok(())
111 }
112 P2pChannelsTransactionAction::Received {
113 peer_id,
114 transaction,
115 } => {
116 let state = transaction_state.inspect_err(|error| bug_condition!("{}", error))?;
117 let Self::Ready { local, .. } = state else {
118 bug_condition!(
119 "Invalid state for `P2pChannelsTransactionAction::Received `, state: {:?}",
120 state
121 );
122 return Ok(());
123 };
124 let TransactionPropagationState::Responding {
125 promised_count,
126 current_count,
127 ..
128 } = local
129 else {
130 return Ok(());
131 };
132
133 *current_count += 1;
134
135 if current_count >= promised_count {
136 *local = TransactionPropagationState::Responded {
137 time: meta.time(),
138 count: *current_count,
139 };
140 }
141
142 let (dispatcher, state) = state_context.into_dispatcher_and_state();
143 let p2p_state: &P2pState = state.substate()?;
144
145 if let Some(callback) = &p2p_state.callbacks.on_p2p_channels_transaction_received {
146 dispatcher.push_callback(callback.clone(), (peer_id, transaction));
147 }
148
149 Ok(())
150 }
151 P2pChannelsTransactionAction::RequestReceived { limit, .. } => {
152 let state = transaction_state.inspect_err(|error| bug_condition!("{}", error))?;
153 let Self::Ready { remote, .. } = state else {
154 bug_condition!(
155 "Invalid state for `P2pChannelsTransactionAction::RequestReceived `, state: {:?}",
156 state
157 );
158 return Ok(());
159 };
160 *remote = TransactionPropagationState::Requested {
161 time: meta.time(),
162 requested_limit: limit,
163 };
164 Ok(())
165 }
166 P2pChannelsTransactionAction::ResponseSend {
167 transactions,
168 last_index,
169 peer_id,
170 ..
171 } => {
172 let state = transaction_state.inspect_err(|error| bug_condition!("{}", error))?;
173 let Self::Ready {
174 remote,
175 next_send_index,
176 ..
177 } = state
178 else {
179 bug_condition!(
180 "Invalid state for `P2pChannelsTransactionAction::ResponseSend `, state: {:?}",
181 state
182 );
183 return Ok(());
184 };
185 *next_send_index = last_index + 1;
186
187 let count = transactions.len() as u8;
188 if count == 0 {
189 return Ok(());
190 }
191
192 *remote = TransactionPropagationState::Responded {
193 time: meta.time(),
194 count,
195 };
196
197 let dispatcher = state_context.into_dispatcher();
198 let msg = TransactionPropagationChannelMsg::WillSend { count }.into();
199 dispatcher.push(P2pChannelsEffectfulAction::MessageSend {
200 peer_id,
201 msg_id: MsgId::first(),
202 msg,
203 });
204
205 for tx in transactions {
206 let msg = TransactionPropagationChannelMsg::Transaction(tx).into();
207 dispatcher.push(P2pChannelsEffectfulAction::MessageSend {
208 peer_id,
209 msg_id: MsgId::first(),
210 msg,
211 });
212 }
213 Ok(())
214 }
215 P2pChannelsTransactionAction::Libp2pReceived {
216 transactions,
217 message_id,
218 peer_id,
219 ..
220 } => {
221 let (dispatcher, state) = state_context.into_dispatcher_and_state();
222 let p2p_state: &P2pState = state.substate()?;
223
224 if let Some(callback) = &p2p_state
225 .callbacks
226 .on_p2p_channels_transactions_libp2p_received
227 {
228 let transactions = transactions
229 .into_iter()
230 .map(TransactionWithHash::try_new)
231 .filter_map(Result::ok)
232 .collect();
233
234 dispatcher.push_callback(callback.clone(), (peer_id, transactions, message_id));
235 }
236
237 Ok(())
238 }
239 #[cfg(not(feature = "p2p-libp2p"))]
240 P2pChannelsTransactionAction::Libp2pBroadcast { .. } => Ok(()),
241 #[cfg(feature = "p2p-libp2p")]
242 P2pChannelsTransactionAction::Libp2pBroadcast {
243 transaction,
244 nonce,
245 is_local,
246 } => {
247 let dispatcher = state_context.into_dispatcher();
248 let message = v2::NetworkPoolTransactionPoolDiffVersionedStableV2(
249 std::iter::once(*transaction).collect(),
250 );
251 let nonce = nonce.into();
252 let message = GossipNetMessageV2::TransactionPoolDiff { message, nonce };
253 if is_local {
254 dispatcher.push(P2pNetworkPubsubAction::Broadcast { message });
255 } else {
256 dispatcher.push(P2pNetworkPubsubAction::WebRtcRebroadcast { message });
258 }
259 Ok(())
260 }
261 }
262 }
263}