node/transaction_pool/
transaction_pool_reducer.rs

1use ledger::{
2    scan_state::transaction_logic::{valid, GenericCommand, UserCommand},
3    transaction_pool::{
4        diff::{self, DiffVerified},
5        transaction_hash, ApplyDecision, TransactionPoolErrors,
6    },
7    Account, AccountId,
8};
9use openmina_core::{
10    bug_condition,
11    constants::constraint_constants,
12    transaction::{Transaction, TransactionPoolMessageSource, TransactionWithHash},
13};
14use p2p::{
15    channels::transaction::P2pChannelsTransactionAction, BroadcastMessageId, P2pNetworkPubsubAction,
16};
17use redux::callback;
18use snark::user_command_verify::{SnarkUserCommandVerifyAction, SnarkUserCommandVerifyId};
19use std::collections::{BTreeMap, BTreeSet};
20
21use crate::{BlockProducerAction, RpcAction};
22
23use super::{
24    PendingId, TransactionPoolAction, TransactionPoolActionWithMetaRef,
25    TransactionPoolEffectfulAction, TransactionPoolState, TransactionState,
26};
27
28impl TransactionPoolState {
29    pub fn reducer(mut state: crate::Substate<Self>, action: TransactionPoolActionWithMetaRef<'_>) {
30        // Uncoment following line to save actions to `/tmp/pool.bin`
31        // Self::save_actions(&mut state);
32
33        let substate = state.get_substate_mut().unwrap();
34        if let Some(file) = substate.file.as_mut() {
35            postcard::to_io(&action, file).unwrap();
36        };
37
38        Self::handle_action(state, action)
39    }
40
41    pub(super) fn handle_action(
42        mut state: crate::Substate<Self>,
43        action: TransactionPoolActionWithMetaRef<'_>,
44    ) {
45        let (action, meta) = action.split();
46        let Some((global_slot, global_slot_from_genesis)) =
47            // TODO: remove usage of `unsafe_get_state`
48            Self::global_slots(state.unsafe_get_state())
49        else {
50            return;
51        };
52        let substate = state.get_substate_mut().unwrap();
53
54        match action {
55            TransactionPoolAction::Candidate(a) => {
56                super::candidate::TransactionPoolCandidatesState::reducer(
57                    openmina_core::Substate::from_compatible_substate(state),
58                    meta.with_action(a),
59                );
60            }
61            TransactionPoolAction::StartVerify {
62                commands,
63                from_source,
64            } => {
65                let Ok(commands) = commands
66                    .iter()
67                    .map(TransactionWithHash::body)
68                    .map(UserCommand::try_from)
69                    .collect::<Result<Vec<_>, _>>()
70                else {
71                    // ignore all commands if one is invalid
72                    return;
73                };
74
75                let account_ids = commands
76                    .iter()
77                    .flat_map(UserCommand::accounts_referenced)
78                    .collect::<BTreeSet<_>>();
79                let best_tip_hash = substate.best_tip_hash.clone().unwrap();
80                let pending_id = substate.make_action_pending(action);
81
82                let dispatcher = state.into_dispatcher();
83                dispatcher.push(TransactionPoolEffectfulAction::FetchAccounts {
84                    account_ids,
85                    ledger_hash: best_tip_hash.clone(),
86                    on_result: callback!(fetch_to_verify((accounts: BTreeMap<AccountId, Account>, id: Option<PendingId>, from_source: TransactionPoolMessageSource)) -> crate::Action {
87                        TransactionPoolAction::StartVerifyWithAccounts { accounts, pending_id: id.unwrap(), from_source }
88                    }),
89                    pending_id: Some(pending_id),
90                    from_source: *from_source,
91                });
92            }
93            TransactionPoolAction::StartVerifyWithAccounts {
94                accounts,
95                pending_id,
96                from_source,
97            } => {
98                let TransactionPoolAction::StartVerify { commands, .. } =
99                    substate.pending_actions.remove(pending_id).unwrap()
100                else {
101                    panic!()
102                };
103
104                // TODO: Convert those commands only once
105                let Ok(commands) = commands
106                    .iter()
107                    .map(TransactionWithHash::body)
108                    .map(UserCommand::try_from)
109                    .collect::<Result<Vec<_>, _>>()
110                else {
111                    return;
112                };
113                let diff = diff::Diff { list: commands };
114
115                match substate
116                    .pool
117                    .prevalidate(diff)
118                    .and_then(|diff| substate.pool.convert_diff_to_verifiable(diff, accounts))
119                {
120                    Ok(verifiable) => {
121                        let (dispatcher, global_state) = state.into_dispatcher_and_state();
122                        let req_id = global_state.snark.user_command_verify.next_req_id();
123
124                        dispatcher.push(SnarkUserCommandVerifyAction::Init {
125                            req_id,
126                            commands: verifiable,
127                            from_source: *from_source,
128                            on_success: callback!(
129                                on_snark_user_command_verify_success(
130                                    (req_id: SnarkUserCommandVerifyId, valids: Vec<valid::UserCommand>, from_source: TransactionPoolMessageSource)
131                                ) -> crate::Action {
132                                    TransactionPoolAction::VerifySuccess {
133                                        valids,
134                                        from_source,
135                                    }
136                                }
137                            ),
138                            on_error: callback!(
139                                on_snark_user_command_verify_error(
140                                    (req_id: SnarkUserCommandVerifyId, errors: Vec<String>)
141                                ) -> crate::Action {
142                                    TransactionPoolAction::VerifyError { errors }
143                                }
144                            )
145                        });
146                    }
147                    Err(e) => {
148                        let dispatch_errors = |errors: Vec<String>| {
149                            let dispatcher = state.into_dispatcher();
150                            dispatcher.push(TransactionPoolAction::VerifyError {
151                                errors: errors.clone(),
152                            });
153
154                            match from_source {
155                                TransactionPoolMessageSource::Rpc { id } => {
156                                    dispatcher.push(RpcAction::TransactionInjectFailure {
157                                        rpc_id: *id,
158                                        errors,
159                                    });
160                                }
161                                TransactionPoolMessageSource::Pubsub { id } => {
162                                    dispatcher.push(P2pNetworkPubsubAction::RejectMessage {
163                                        message_id: Some(BroadcastMessageId::MessageId {
164                                            message_id: *id,
165                                        }),
166                                        peer_id: None,
167                                        reason: "Transaction diff rejected".to_owned(),
168                                    });
169                                }
170                                TransactionPoolMessageSource::None => {}
171                            }
172                        };
173                        match e {
174                            TransactionPoolErrors::BatchedErrors(errors) => {
175                                let errors: Vec<_> =
176                                    errors.into_iter().map(|e| e.to_string()).collect();
177                                dispatch_errors(errors);
178                            }
179                            TransactionPoolErrors::LoadingVK(error) => dispatch_errors(vec![error]),
180                            TransactionPoolErrors::Unexpected(es) => {
181                                panic!("{es}")
182                            }
183                        }
184                    }
185                }
186            }
187            TransactionPoolAction::VerifySuccess {
188                valids,
189                from_source,
190            } => {
191                let valids = valids
192                    .iter()
193                    .cloned()
194                    .map(transaction_hash::hash_command)
195                    .collect::<Vec<_>>();
196                let best_tip_hash = substate.best_tip_hash.clone().unwrap();
197                let diff = DiffVerified { list: valids };
198
199                let dispatcher = state.into_dispatcher();
200                dispatcher.push(TransactionPoolAction::ApplyVerifiedDiff {
201                    best_tip_hash,
202                    diff,
203                    from_source: *from_source,
204                });
205            }
206            TransactionPoolAction::VerifyError { .. } => {
207                // just logging the errors
208            }
209            TransactionPoolAction::BestTipChanged { best_tip_hash } => {
210                let account_ids = substate.pool.get_accounts_to_revalidate_on_new_best_tip();
211                substate.best_tip_hash = Some(best_tip_hash.clone());
212
213                let dispatcher = state.into_dispatcher();
214                dispatcher.push(TransactionPoolEffectfulAction::FetchAccounts {
215                    account_ids,
216                    ledger_hash: best_tip_hash.clone(),
217                    on_result: callback!(fetch_for_best_tip((accounts: BTreeMap<AccountId, Account>, id: Option<PendingId>, from_source: TransactionPoolMessageSource)) -> crate::Action {
218                        TransactionPoolAction::BestTipChangedWithAccounts { accounts }
219                    }),
220                    pending_id: None,
221                    from_source: TransactionPoolMessageSource::None,
222                });
223            }
224            TransactionPoolAction::BestTipChangedWithAccounts { accounts } => {
225                match substate
226                    .pool
227                    .on_new_best_tip(global_slot_from_genesis, accounts)
228                {
229                    Err(e) => bug_condition!("transaction pool::on_new_best_tip failed: {:?}", e),
230                    Ok(dropped) => {
231                        for tx in dropped {
232                            substate.dpool.remove(&tx.hash);
233                        }
234                    }
235                }
236            }
237            TransactionPoolAction::ApplyVerifiedDiff {
238                best_tip_hash,
239                diff,
240                from_source,
241            } => {
242                let account_ids = substate.pool.get_accounts_to_apply_diff(diff);
243                let pending_id = substate.make_action_pending(action);
244
245                let dispatcher = state.into_dispatcher();
246                dispatcher.push(TransactionPoolEffectfulAction::FetchAccounts {
247                    account_ids,
248                    ledger_hash: best_tip_hash.clone(),
249                    on_result: callback!(fetch_for_apply((accounts: BTreeMap<AccountId, Account>, id: Option<PendingId>, from_rpc: TransactionPoolMessageSource)) -> crate::Action {
250                        TransactionPoolAction::ApplyVerifiedDiffWithAccounts {
251                            accounts,
252                            pending_id: id.unwrap(),
253                        }
254                    }),
255                    pending_id: Some(pending_id),
256                    from_source: *from_source,
257                });
258            }
259            TransactionPoolAction::ApplyVerifiedDiffWithAccounts {
260                accounts,
261                pending_id,
262            } => {
263                let TransactionPoolAction::ApplyVerifiedDiff {
264                    best_tip_hash: _,
265                    diff,
266                    from_source,
267                } = substate.pending_actions.remove(pending_id).unwrap()
268                else {
269                    panic!()
270                };
271                let is_sender_local = from_source.is_sender_local();
272
273                // Note(adonagy): Action for rebroadcast, in his action we can use forget_check
274                let (was_accepted, accepted, rejected) = match substate.pool.unsafe_apply(
275                    meta.time(),
276                    global_slot_from_genesis,
277                    global_slot,
278                    &diff,
279                    accounts,
280                    is_sender_local,
281                ) {
282                    Ok((ApplyDecision::Accept, accepted, rejected, dropped)) => {
283                        for hash in dropped {
284                            substate.dpool.remove(&hash);
285                        }
286                        for tx in &accepted {
287                            substate.dpool.insert(TransactionState {
288                                time: meta.time(),
289                                hash: tx.hash.clone(),
290                            });
291                        }
292
293                        (true, accepted, rejected)
294                    }
295                    Ok((ApplyDecision::Reject, accepted, rejected, _)) => {
296                        (false, accepted, rejected)
297                    }
298                    Err(e) => {
299                        crate::core::warn!(meta.time(); kind = "TransactionPoolUnsafeApplyError", summary = e);
300                        return;
301                    }
302                };
303
304                let dispatcher = state.into_dispatcher();
305
306                // TODO: use callbacks
307                match (was_accepted, from_source) {
308                    (true, TransactionPoolMessageSource::Rpc { id }) => {
309                        // Note: even though the diff was labeled as accepted the specific tx could be rejected
310                        //       (if it is not grounds for diff rejection)
311                        if !rejected.is_empty() {
312                            dispatcher.push(RpcAction::TransactionInjectRejected {
313                                rpc_id: id,
314                                response: rejected.clone(),
315                            });
316                        } else if !accepted.is_empty() {
317                            dispatcher.push(RpcAction::TransactionInjectSuccess {
318                                rpc_id: id,
319                                response: accepted.clone(),
320                            });
321                        }
322                    }
323                    (true, TransactionPoolMessageSource::Pubsub { id }) => {
324                        dispatcher.push(P2pNetworkPubsubAction::BroadcastValidatedMessage {
325                            message_id: BroadcastMessageId::MessageId { message_id: id },
326                        });
327                    }
328                    (false, TransactionPoolMessageSource::Rpc { id }) => {
329                        dispatcher.push(RpcAction::TransactionInjectRejected {
330                            rpc_id: id,
331                            response: rejected.clone(),
332                        });
333                    }
334                    (false, TransactionPoolMessageSource::Pubsub { id }) => {
335                        dispatcher.push(P2pNetworkPubsubAction::RejectMessage {
336                            message_id: Some(BroadcastMessageId::MessageId { message_id: id }),
337                            peer_id: None,
338                            reason: "Rejected transaction diff".to_owned(),
339                        });
340                    }
341                    (_, TransactionPoolMessageSource::None) => {}
342                }
343
344                if was_accepted && !from_source.is_libp2p() {
345                    dispatcher.push(TransactionPoolAction::Rebroadcast {
346                        accepted,
347                        rejected,
348                        is_local: is_sender_local,
349                    });
350                }
351            }
352            TransactionPoolAction::ApplyTransitionFrontierDiff {
353                best_tip_hash,
354                diff,
355            } => {
356                assert_eq!(substate.best_tip_hash.as_ref().unwrap(), best_tip_hash);
357
358                let (account_ids, uncommitted) =
359                    substate.pool.get_accounts_to_handle_transition_diff(diff);
360                let pending_id = substate.make_action_pending(action);
361
362                let dispatcher = state.into_dispatcher();
363                dispatcher.push(TransactionPoolEffectfulAction::FetchAccounts {
364                    account_ids: account_ids.union(&uncommitted).cloned().collect(),
365                    ledger_hash: best_tip_hash.clone(),
366                    on_result: callback!(fetch_for_diff((accounts: BTreeMap<AccountId, Account>, id: Option<PendingId>, from_source: TransactionPoolMessageSource)) -> crate::Action {
367                        TransactionPoolAction::ApplyTransitionFrontierDiffWithAccounts {
368                            accounts,
369                            pending_id: id.unwrap(),
370                        }
371                    }),
372                    pending_id: Some(pending_id),
373                    from_source: TransactionPoolMessageSource::None,
374                });
375            }
376            TransactionPoolAction::ApplyTransitionFrontierDiffWithAccounts {
377                accounts,
378                pending_id,
379            } => {
380                let TransactionPoolAction::ApplyTransitionFrontierDiff {
381                    best_tip_hash: _,
382                    diff,
383                } = substate.pending_actions.remove(pending_id).unwrap()
384                else {
385                    panic!()
386                };
387
388                let collect = |set: &BTreeSet<AccountId>| {
389                    set.iter()
390                        .filter_map(|id| {
391                            let account = accounts.get(id).cloned()?;
392                            Some((id.clone(), account))
393                        })
394                        .collect::<BTreeMap<_, _>>()
395                };
396
397                let (account_ids, uncommitted) =
398                    substate.pool.get_accounts_to_handle_transition_diff(&diff);
399
400                let in_cmds = collect(&account_ids);
401                let uncommitted = collect(&uncommitted);
402
403                if let Err(e) = substate.pool.handle_transition_frontier_diff(
404                    global_slot_from_genesis,
405                    global_slot,
406                    &diff,
407                    &account_ids,
408                    &in_cmds,
409                    &uncommitted,
410                ) {
411                    bug_condition!(
412                        "transaction pool::handle_transition_frontier_diff failed: {:?}",
413                        e
414                    );
415                }
416            }
417            TransactionPoolAction::Rebroadcast {
418                accepted,
419                rejected,
420                is_local,
421            } => {
422                let rejected = rejected.iter().map(|(cmd, _)| cmd.data.forget_check());
423
424                let all_commands = accepted
425                    .iter()
426                    .map(|cmd| cmd.data.forget_check())
427                    .chain(rejected)
428                    .collect::<Vec<_>>();
429
430                let dispatcher = state.into_dispatcher();
431
432                for cmd in all_commands {
433                    dispatcher.push(P2pChannelsTransactionAction::Libp2pBroadcast {
434                        transaction: Box::new((&cmd).into()),
435                        nonce: 0,
436                        is_local: *is_local,
437                    });
438                }
439            }
440            TransactionPoolAction::CollectTransactionsByFee => {
441                let transaction_capacity =
442                    2u64.pow(constraint_constants().transaction_capacity_log_2 as u32);
443                let transactions_by_fee = substate
444                    .pool
445                    .list_includable_transactions(transaction_capacity as usize)
446                    .into_iter()
447                    .map(|cmd| cmd.data)
448                    .collect::<Vec<_>>();
449
450                let dispatcher = state.into_dispatcher();
451
452                dispatcher.push(BlockProducerAction::WonSlotTransactionsSuccess {
453                    transactions_by_fee,
454                });
455            }
456            TransactionPoolAction::P2pSendAll => {
457                let (dispatcher, global_state) = state.into_dispatcher_and_state();
458                for peer_id in global_state.p2p.ready_peers() {
459                    dispatcher.push(TransactionPoolAction::P2pSend { peer_id });
460                }
461            }
462            TransactionPoolAction::P2pSend { peer_id } => {
463                let peer_id = *peer_id;
464                let (dispatcher, global_state) = state.into_dispatcher_and_state();
465                let Some(peer) = global_state.p2p.get_ready_peer(&peer_id) else {
466                    return;
467                };
468
469                // Send commitments.
470                let index_and_limit = peer.channels.transaction.next_send_index_and_limit();
471                let (transactions, first_index, last_index) = global_state
472                    .transaction_pool
473                    .dpool
474                    .next_messages_to_send(index_and_limit, |state| {
475                        let tx = global_state.transaction_pool.get(&state.hash)?;
476                        let tx = tx.clone().forget();
477                        // TODO(binier): avoid conversion
478                        Some((&Transaction::from(&tx)).into())
479                    });
480
481                dispatcher.push(P2pChannelsTransactionAction::ResponseSend {
482                    peer_id,
483                    transactions,
484                    first_index,
485                    last_index,
486                });
487            }
488        }
489    }
490}