1use ledger::{
2 scan_state::transaction_logic::{valid, GenericCommand, UserCommand},
3 transaction_pool::{
4 diff::{self, DiffVerified},
5 transaction_hash, ApplyDecision, TransactionPoolErrors,
6 },
7 Account, AccountId,
8};
9use openmina_core::{
10 bug_condition,
11 constants::constraint_constants,
12 transaction::{Transaction, TransactionPoolMessageSource, TransactionWithHash},
13};
14use p2p::{
15 channels::transaction::P2pChannelsTransactionAction, BroadcastMessageId, P2pNetworkPubsubAction,
16};
17use redux::callback;
18use snark::user_command_verify::{SnarkUserCommandVerifyAction, SnarkUserCommandVerifyId};
19use std::collections::{BTreeMap, BTreeSet};
20
21use crate::{BlockProducerAction, RpcAction};
22
23use super::{
24 PendingId, TransactionPoolAction, TransactionPoolActionWithMetaRef,
25 TransactionPoolEffectfulAction, TransactionPoolState, TransactionState,
26};
27
28impl TransactionPoolState {
29 pub fn reducer(mut state: crate::Substate<Self>, action: TransactionPoolActionWithMetaRef<'_>) {
30 let substate = state.get_substate_mut().unwrap();
34 if let Some(file) = substate.file.as_mut() {
35 postcard::to_io(&action, file).unwrap();
36 };
37
38 Self::handle_action(state, action)
39 }
40
41 pub(super) fn handle_action(
42 mut state: crate::Substate<Self>,
43 action: TransactionPoolActionWithMetaRef<'_>,
44 ) {
45 let (action, meta) = action.split();
46 let Some((global_slot, global_slot_from_genesis)) =
47 Self::global_slots(state.unsafe_get_state())
49 else {
50 return;
51 };
52 let substate = state.get_substate_mut().unwrap();
53
54 match action {
55 TransactionPoolAction::Candidate(a) => {
56 super::candidate::TransactionPoolCandidatesState::reducer(
57 openmina_core::Substate::from_compatible_substate(state),
58 meta.with_action(a),
59 );
60 }
61 TransactionPoolAction::StartVerify {
62 commands,
63 from_source,
64 } => {
65 let Ok(commands) = commands
66 .iter()
67 .map(TransactionWithHash::body)
68 .map(UserCommand::try_from)
69 .collect::<Result<Vec<_>, _>>()
70 else {
71 return;
73 };
74
75 let account_ids = commands
76 .iter()
77 .flat_map(UserCommand::accounts_referenced)
78 .collect::<BTreeSet<_>>();
79 let best_tip_hash = substate.best_tip_hash.clone().unwrap();
80 let pending_id = substate.make_action_pending(action);
81
82 let dispatcher = state.into_dispatcher();
83 dispatcher.push(TransactionPoolEffectfulAction::FetchAccounts {
84 account_ids,
85 ledger_hash: best_tip_hash.clone(),
86 on_result: callback!(fetch_to_verify((accounts: BTreeMap<AccountId, Account>, id: Option<PendingId>, from_source: TransactionPoolMessageSource)) -> crate::Action {
87 TransactionPoolAction::StartVerifyWithAccounts { accounts, pending_id: id.unwrap(), from_source }
88 }),
89 pending_id: Some(pending_id),
90 from_source: *from_source,
91 });
92 }
93 TransactionPoolAction::StartVerifyWithAccounts {
94 accounts,
95 pending_id,
96 from_source,
97 } => {
98 let TransactionPoolAction::StartVerify { commands, .. } =
99 substate.pending_actions.remove(pending_id).unwrap()
100 else {
101 panic!()
102 };
103
104 let Ok(commands) = commands
106 .iter()
107 .map(TransactionWithHash::body)
108 .map(UserCommand::try_from)
109 .collect::<Result<Vec<_>, _>>()
110 else {
111 return;
112 };
113 let diff = diff::Diff { list: commands };
114
115 match substate
116 .pool
117 .prevalidate(diff)
118 .and_then(|diff| substate.pool.convert_diff_to_verifiable(diff, accounts))
119 {
120 Ok(verifiable) => {
121 let (dispatcher, global_state) = state.into_dispatcher_and_state();
122 let req_id = global_state.snark.user_command_verify.next_req_id();
123
124 dispatcher.push(SnarkUserCommandVerifyAction::Init {
125 req_id,
126 commands: verifiable,
127 from_source: *from_source,
128 on_success: callback!(
129 on_snark_user_command_verify_success(
130 (req_id: SnarkUserCommandVerifyId, valids: Vec<valid::UserCommand>, from_source: TransactionPoolMessageSource)
131 ) -> crate::Action {
132 TransactionPoolAction::VerifySuccess {
133 valids,
134 from_source,
135 }
136 }
137 ),
138 on_error: callback!(
139 on_snark_user_command_verify_error(
140 (req_id: SnarkUserCommandVerifyId, errors: Vec<String>)
141 ) -> crate::Action {
142 TransactionPoolAction::VerifyError { errors }
143 }
144 )
145 });
146 }
147 Err(e) => {
148 let dispatch_errors = |errors: Vec<String>| {
149 let dispatcher = state.into_dispatcher();
150 dispatcher.push(TransactionPoolAction::VerifyError {
151 errors: errors.clone(),
152 });
153
154 match from_source {
155 TransactionPoolMessageSource::Rpc { id } => {
156 dispatcher.push(RpcAction::TransactionInjectFailure {
157 rpc_id: *id,
158 errors,
159 });
160 }
161 TransactionPoolMessageSource::Pubsub { id } => {
162 dispatcher.push(P2pNetworkPubsubAction::RejectMessage {
163 message_id: Some(BroadcastMessageId::MessageId {
164 message_id: *id,
165 }),
166 peer_id: None,
167 reason: "Transaction diff rejected".to_owned(),
168 });
169 }
170 TransactionPoolMessageSource::None => {}
171 }
172 };
173 match e {
174 TransactionPoolErrors::BatchedErrors(errors) => {
175 let errors: Vec<_> =
176 errors.into_iter().map(|e| e.to_string()).collect();
177 dispatch_errors(errors);
178 }
179 TransactionPoolErrors::LoadingVK(error) => dispatch_errors(vec![error]),
180 TransactionPoolErrors::Unexpected(es) => {
181 panic!("{es}")
182 }
183 }
184 }
185 }
186 }
187 TransactionPoolAction::VerifySuccess {
188 valids,
189 from_source,
190 } => {
191 let valids = valids
192 .iter()
193 .cloned()
194 .map(transaction_hash::hash_command)
195 .collect::<Vec<_>>();
196 let best_tip_hash = substate.best_tip_hash.clone().unwrap();
197 let diff = DiffVerified { list: valids };
198
199 let dispatcher = state.into_dispatcher();
200 dispatcher.push(TransactionPoolAction::ApplyVerifiedDiff {
201 best_tip_hash,
202 diff,
203 from_source: *from_source,
204 });
205 }
206 TransactionPoolAction::VerifyError { .. } => {
207 }
209 TransactionPoolAction::BestTipChanged { best_tip_hash } => {
210 let account_ids = substate.pool.get_accounts_to_revalidate_on_new_best_tip();
211 substate.best_tip_hash = Some(best_tip_hash.clone());
212
213 let dispatcher = state.into_dispatcher();
214 dispatcher.push(TransactionPoolEffectfulAction::FetchAccounts {
215 account_ids,
216 ledger_hash: best_tip_hash.clone(),
217 on_result: callback!(fetch_for_best_tip((accounts: BTreeMap<AccountId, Account>, id: Option<PendingId>, from_source: TransactionPoolMessageSource)) -> crate::Action {
218 TransactionPoolAction::BestTipChangedWithAccounts { accounts }
219 }),
220 pending_id: None,
221 from_source: TransactionPoolMessageSource::None,
222 });
223 }
224 TransactionPoolAction::BestTipChangedWithAccounts { accounts } => {
225 match substate
226 .pool
227 .on_new_best_tip(global_slot_from_genesis, accounts)
228 {
229 Err(e) => bug_condition!("transaction pool::on_new_best_tip failed: {:?}", e),
230 Ok(dropped) => {
231 for tx in dropped {
232 substate.dpool.remove(&tx.hash);
233 }
234 }
235 }
236 }
237 TransactionPoolAction::ApplyVerifiedDiff {
238 best_tip_hash,
239 diff,
240 from_source,
241 } => {
242 let account_ids = substate.pool.get_accounts_to_apply_diff(diff);
243 let pending_id = substate.make_action_pending(action);
244
245 let dispatcher = state.into_dispatcher();
246 dispatcher.push(TransactionPoolEffectfulAction::FetchAccounts {
247 account_ids,
248 ledger_hash: best_tip_hash.clone(),
249 on_result: callback!(fetch_for_apply((accounts: BTreeMap<AccountId, Account>, id: Option<PendingId>, from_rpc: TransactionPoolMessageSource)) -> crate::Action {
250 TransactionPoolAction::ApplyVerifiedDiffWithAccounts {
251 accounts,
252 pending_id: id.unwrap(),
253 }
254 }),
255 pending_id: Some(pending_id),
256 from_source: *from_source,
257 });
258 }
259 TransactionPoolAction::ApplyVerifiedDiffWithAccounts {
260 accounts,
261 pending_id,
262 } => {
263 let TransactionPoolAction::ApplyVerifiedDiff {
264 best_tip_hash: _,
265 diff,
266 from_source,
267 } = substate.pending_actions.remove(pending_id).unwrap()
268 else {
269 panic!()
270 };
271 let is_sender_local = from_source.is_sender_local();
272
273 let (was_accepted, accepted, rejected) = match substate.pool.unsafe_apply(
275 meta.time(),
276 global_slot_from_genesis,
277 global_slot,
278 &diff,
279 accounts,
280 is_sender_local,
281 ) {
282 Ok((ApplyDecision::Accept, accepted, rejected, dropped)) => {
283 for hash in dropped {
284 substate.dpool.remove(&hash);
285 }
286 for tx in &accepted {
287 substate.dpool.insert(TransactionState {
288 time: meta.time(),
289 hash: tx.hash.clone(),
290 });
291 }
292
293 (true, accepted, rejected)
294 }
295 Ok((ApplyDecision::Reject, accepted, rejected, _)) => {
296 (false, accepted, rejected)
297 }
298 Err(e) => {
299 crate::core::warn!(meta.time(); kind = "TransactionPoolUnsafeApplyError", summary = e);
300 return;
301 }
302 };
303
304 let dispatcher = state.into_dispatcher();
305
306 match (was_accepted, from_source) {
308 (true, TransactionPoolMessageSource::Rpc { id }) => {
309 if !rejected.is_empty() {
312 dispatcher.push(RpcAction::TransactionInjectRejected {
313 rpc_id: id,
314 response: rejected.clone(),
315 });
316 } else if !accepted.is_empty() {
317 dispatcher.push(RpcAction::TransactionInjectSuccess {
318 rpc_id: id,
319 response: accepted.clone(),
320 });
321 }
322 }
323 (true, TransactionPoolMessageSource::Pubsub { id }) => {
324 dispatcher.push(P2pNetworkPubsubAction::BroadcastValidatedMessage {
325 message_id: BroadcastMessageId::MessageId { message_id: id },
326 });
327 }
328 (false, TransactionPoolMessageSource::Rpc { id }) => {
329 dispatcher.push(RpcAction::TransactionInjectRejected {
330 rpc_id: id,
331 response: rejected.clone(),
332 });
333 }
334 (false, TransactionPoolMessageSource::Pubsub { id }) => {
335 dispatcher.push(P2pNetworkPubsubAction::RejectMessage {
336 message_id: Some(BroadcastMessageId::MessageId { message_id: id }),
337 peer_id: None,
338 reason: "Rejected transaction diff".to_owned(),
339 });
340 }
341 (_, TransactionPoolMessageSource::None) => {}
342 }
343
344 if was_accepted && !from_source.is_libp2p() {
345 dispatcher.push(TransactionPoolAction::Rebroadcast {
346 accepted,
347 rejected,
348 is_local: is_sender_local,
349 });
350 }
351 }
352 TransactionPoolAction::ApplyTransitionFrontierDiff {
353 best_tip_hash,
354 diff,
355 } => {
356 assert_eq!(substate.best_tip_hash.as_ref().unwrap(), best_tip_hash);
357
358 let (account_ids, uncommitted) =
359 substate.pool.get_accounts_to_handle_transition_diff(diff);
360 let pending_id = substate.make_action_pending(action);
361
362 let dispatcher = state.into_dispatcher();
363 dispatcher.push(TransactionPoolEffectfulAction::FetchAccounts {
364 account_ids: account_ids.union(&uncommitted).cloned().collect(),
365 ledger_hash: best_tip_hash.clone(),
366 on_result: callback!(fetch_for_diff((accounts: BTreeMap<AccountId, Account>, id: Option<PendingId>, from_source: TransactionPoolMessageSource)) -> crate::Action {
367 TransactionPoolAction::ApplyTransitionFrontierDiffWithAccounts {
368 accounts,
369 pending_id: id.unwrap(),
370 }
371 }),
372 pending_id: Some(pending_id),
373 from_source: TransactionPoolMessageSource::None,
374 });
375 }
376 TransactionPoolAction::ApplyTransitionFrontierDiffWithAccounts {
377 accounts,
378 pending_id,
379 } => {
380 let TransactionPoolAction::ApplyTransitionFrontierDiff {
381 best_tip_hash: _,
382 diff,
383 } = substate.pending_actions.remove(pending_id).unwrap()
384 else {
385 panic!()
386 };
387
388 let collect = |set: &BTreeSet<AccountId>| {
389 set.iter()
390 .filter_map(|id| {
391 let account = accounts.get(id).cloned()?;
392 Some((id.clone(), account))
393 })
394 .collect::<BTreeMap<_, _>>()
395 };
396
397 let (account_ids, uncommitted) =
398 substate.pool.get_accounts_to_handle_transition_diff(&diff);
399
400 let in_cmds = collect(&account_ids);
401 let uncommitted = collect(&uncommitted);
402
403 if let Err(e) = substate.pool.handle_transition_frontier_diff(
404 global_slot_from_genesis,
405 global_slot,
406 &diff,
407 &account_ids,
408 &in_cmds,
409 &uncommitted,
410 ) {
411 bug_condition!(
412 "transaction pool::handle_transition_frontier_diff failed: {:?}",
413 e
414 );
415 }
416 }
417 TransactionPoolAction::Rebroadcast {
418 accepted,
419 rejected,
420 is_local,
421 } => {
422 let rejected = rejected.iter().map(|(cmd, _)| cmd.data.forget_check());
423
424 let all_commands = accepted
425 .iter()
426 .map(|cmd| cmd.data.forget_check())
427 .chain(rejected)
428 .collect::<Vec<_>>();
429
430 let dispatcher = state.into_dispatcher();
431
432 for cmd in all_commands {
433 dispatcher.push(P2pChannelsTransactionAction::Libp2pBroadcast {
434 transaction: Box::new((&cmd).into()),
435 nonce: 0,
436 is_local: *is_local,
437 });
438 }
439 }
440 TransactionPoolAction::CollectTransactionsByFee => {
441 let transaction_capacity =
442 2u64.pow(constraint_constants().transaction_capacity_log_2 as u32);
443 let transactions_by_fee = substate
444 .pool
445 .list_includable_transactions(transaction_capacity as usize)
446 .into_iter()
447 .map(|cmd| cmd.data)
448 .collect::<Vec<_>>();
449
450 let dispatcher = state.into_dispatcher();
451
452 dispatcher.push(BlockProducerAction::WonSlotTransactionsSuccess {
453 transactions_by_fee,
454 });
455 }
456 TransactionPoolAction::P2pSendAll => {
457 let (dispatcher, global_state) = state.into_dispatcher_and_state();
458 for peer_id in global_state.p2p.ready_peers() {
459 dispatcher.push(TransactionPoolAction::P2pSend { peer_id });
460 }
461 }
462 TransactionPoolAction::P2pSend { peer_id } => {
463 let peer_id = *peer_id;
464 let (dispatcher, global_state) = state.into_dispatcher_and_state();
465 let Some(peer) = global_state.p2p.get_ready_peer(&peer_id) else {
466 return;
467 };
468
469 let index_and_limit = peer.channels.transaction.next_send_index_and_limit();
471 let (transactions, first_index, last_index) = global_state
472 .transaction_pool
473 .dpool
474 .next_messages_to_send(index_and_limit, |state| {
475 let tx = global_state.transaction_pool.get(&state.hash)?;
476 let tx = tx.clone().forget();
477 Some((&Transaction::from(&tx)).into())
479 });
480
481 dispatcher.push(P2pChannelsTransactionAction::ResponseSend {
482 peer_id,
483 transactions,
484 first_index,
485 last_index,
486 });
487 }
488 }
489 }
490}