1use backtrace::Backtrace;
2use serde::{Deserialize, Serialize};
3use std::{
4 borrow::{Borrow, Cow},
5 collections::{hash_map::Entry, BTreeMap, BTreeSet, HashMap, HashSet, VecDeque},
6};
7
8use itertools::Itertools;
9use mina_curves::pasta::Fp;
10use mina_p2p_messages::{bigint::BigInt, v2};
11use openmina_core::{bug_condition, consensus::ConsensusConstants};
12
13use crate::{
14 scan_state::{
15 currency::{Amount, Balance, BlockTime, Fee, Magnitude, Nonce, Slot},
16 fee_rate::FeeRate,
17 transaction_logic::{
18 valid, verifiable,
19 zkapp_command::{
20 from_unapplied_sequence::{self, FromUnappliedSequence},
21 MaybeWithStatus, WithHash,
22 },
23 TransactionStatus::Applied,
24 UserCommand, WellFormednessError, WithStatus,
25 },
26 },
27 verifier::{Verifier, VerifierError},
28 Account, AccountId, BaseLedger, Mask, TokenId, VerificationKey, VerificationKeyWire,
29};
30
/// Errors reported by the transaction pool as a whole.
#[derive(Debug, thiserror::Error)]
pub enum TransactionPoolErrors {
    /// A collection of per-transaction errors, displayed through their `Debug` output.
    #[error("Transaction pool errors: {0:?}")]
    BatchedErrors(Vec<TransactionError>),
    /// A failure described by a message; displayed as the `Debug` form of that string.
    /// NOTE(review): by its name this covers verification-key loading — confirm at the call sites.
    #[error("{0:?}")]
    LoadingVK(String),
    /// Any other failure carrying a free-form message. This is the variant
    /// produced by the `From<String>` conversion.
    #[error("Unexpected error: {0}")]
    Unexpected(String),
}
42
/// Error attached to a single transaction; each variant transparently forwards
/// the display and source of the wrapped error.
#[derive(Debug, thiserror::Error)]
pub enum TransactionError {
    /// Error produced by the verifier (convertible with `?` via `From`).
    #[error(transparent)]
    Verifier(#[from] VerifierError),
    /// The command failed a well-formedness check (convertible with `?` via `From`).
    #[error(transparent)]
    WellFormedness(#[from] WellFormednessError),
}
50
51impl From<String> for TransactionPoolErrors {
52 fn from(value: String) -> Self {
53 Self::Unexpected(value)
54 }
55}
56
57#[inline(never)]
58fn my_assert(v: bool) -> Result<(), CommandError> {
59 if !v {
60 let backtrace = Backtrace::new();
61 let s = format!("assert failed {:?}", backtrace);
62 bug_condition!("{:?}", s);
63 return Err(CommandError::Custom(Cow::Owned(s)));
64 }
65 Ok(())
66}
67
68mod consensus {
69 use crate::scan_state::currency::{BlockTimeSpan, Epoch, Length};
70
71 use super::*;
72
73 #[derive(Clone, Debug, Serialize, Deserialize)]
74 pub struct Constants {
75 k: Length,
76 delta: Length,
77 slots_per_sub_window: Length,
78 slots_per_window: Length,
79 sub_windows_per_window: Length,
80 slots_per_epoch: Length,
81 grace_period_slots: Length,
82 grace_period_end: Slot,
83 checkpoint_window_slots_per_year: Length,
84 checkpoint_window_size_in_slots: Length,
85 block_window_duration_ms: BlockTimeSpan,
86 slot_duration_ms: BlockTimeSpan,
87 epoch_duration: BlockTimeSpan,
88 delta_duration: BlockTimeSpan,
89 genesis_state_timestamp: BlockTime,
90 }
91
92 impl Constants {
93 pub fn create(constants: &ConsensusConstants) -> Self {
96 Constants {
97 k: Length::from_u32(constants.k),
98 delta: Length::from_u32(constants.delta),
99 slots_per_sub_window: Length::from_u32(constants.slots_per_sub_window),
100 slots_per_window: Length::from_u32(constants.slots_per_window),
101 sub_windows_per_window: Length::from_u32(constants.sub_windows_per_window),
102 slots_per_epoch: Length::from_u32(constants.slots_per_epoch),
103 grace_period_slots: Length::from_u32(constants.grace_period_slots),
104 grace_period_end: Slot::from_u32(constants.grace_period_end),
105 checkpoint_window_slots_per_year: Length::from_u32(
106 constants.checkpoint_window_slots_per_year,
107 ),
108 checkpoint_window_size_in_slots: Length::from_u32(
109 constants.checkpoint_window_size_in_slots,
110 ),
111 block_window_duration_ms: BlockTimeSpan::from_u64(
112 constants.block_window_duration_ms,
113 ),
114 slot_duration_ms: BlockTimeSpan::from_u64(constants.slot_duration_ms),
115 epoch_duration: BlockTimeSpan::from_u64(constants.epoch_duration),
116 delta_duration: BlockTimeSpan::from_u64(constants.delta_duration),
117 genesis_state_timestamp: constants.genesis_state_timestamp.into(),
118 }
119 }
120 }
121
122 impl Epoch {
124 fn of_time_exn(constants: &Constants, time: BlockTime) -> Result<Self, String> {
125 if time < constants.genesis_state_timestamp {
126 return Err(
127 "Epoch.of_time: time is earlier than genesis block timestamp".to_string(),
128 );
129 }
130
131 let time_since_genesis = time.diff(constants.genesis_state_timestamp);
132 let epoch = time_since_genesis.to_ms() / constants.epoch_duration.to_ms();
133 let epoch: u32 = epoch.try_into().unwrap();
134
135 Ok(Self::from_u32(epoch))
136 }
137
138 fn start_time(constants: &Constants, epoch: Self) -> BlockTime {
139 let ms = constants
140 .genesis_state_timestamp
141 .to_span_since_epoch()
142 .to_ms()
143 + ((epoch.as_u32() as u64) * constants.epoch_duration.to_ms());
144 BlockTime::of_span_since_epoch(BlockTimeSpan::of_ms(ms))
145 }
146
147 pub fn epoch_and_slot_of_time_exn(
148 constants: &Constants,
149 time: BlockTime,
150 ) -> Result<(Self, Slot), String> {
151 let epoch = Self::of_time_exn(constants, time)?;
152 let time_since_epoch = time.diff(Self::start_time(constants, epoch));
153
154 let slot: u64 = time_since_epoch.to_ms() / constants.slot_duration_ms.to_ms();
155 let slot = Slot::from_u32(slot.try_into().unwrap());
156
157 Ok((epoch, slot))
158 }
159 }
160
161 pub struct GlobalSlot {
163 slot_number: Slot,
164 slots_per_epoch: Length,
165 }
166
167 impl GlobalSlot {
168 fn create(constants: &Constants, epoch: Epoch, slot: Slot) -> Self {
169 let slot_number = slot.as_u32() + (constants.slots_per_epoch.as_u32() * epoch.as_u32());
170 Self {
171 slot_number: Slot::from_u32(slot_number),
172 slots_per_epoch: constants.slots_per_epoch,
173 }
174 }
175
176 fn of_epoch_and_slot(constants: &Constants, (epoch, slot): (Epoch, Slot)) -> Self {
177 Self::create(constants, epoch, slot)
178 }
179
180 pub fn of_time_exn(constants: &Constants, time: BlockTime) -> Result<Self, String> {
181 Ok(Self::of_epoch_and_slot(
182 constants,
183 Epoch::epoch_and_slot_of_time_exn(constants, time)?,
184 ))
185 }
186
187 pub fn to_global_slot(&self) -> Slot {
188 let Self {
189 slot_number,
190 slots_per_epoch: _,
191 } = self;
192 *slot_number
193 }
194 }
195}
196
/// Threshold (built with `Fee::of_nanomina_int_exn(1)`) that the fee increment left
/// over after a replacement must reach in `add_from_gossip_exn_impl`; a smaller
/// remainder is rejected with `CommandError::InsufficientReplaceFee`.
const REPLACE_FEE: Fee = Fee::of_nanomina_int_exn(1);
199
/// A validated user command paired with its transaction hash; this is the
/// element type stored in every index of the pool.
pub type ValidCommandWithHash = WithHash<valid::UserCommand, v2::TransactionHash>;
201
202pub mod diff {
203 use super::*;
204
205 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, strum_macros::Display)]
206 pub enum Error {
207 InsufficientReplaceFee,
208 Duplicate,
209 InvalidNonce,
210 InsufficientFunds,
211 Overflow,
212 BadToken,
213 UnwantedFeeToken,
214 Expired,
215 Overloaded,
216 FeePayerAccountNotFound,
217 FeePayerNotPermittedToSend,
218 AfterSlotTxEnd,
219 BacktrackNonceMismatch,
220 InvalidCurrencyConsumed,
221 Custom,
222 }
223
224 impl Error {
225 pub fn grounds_for_diff_rejection(&self) -> bool {
226 match self {
227 Error::InsufficientReplaceFee
228 | Error::Duplicate
229 | Error::InvalidNonce
230 | Error::InsufficientFunds
231 | Error::Expired
232 | Error::Overloaded
233 | Error::FeePayerAccountNotFound
234 | Error::FeePayerNotPermittedToSend
235 | Error::AfterSlotTxEnd
236 | Error::InvalidCurrencyConsumed
237 | Error::Custom
238 | Error::BacktrackNonceMismatch => false,
239 Error::Overflow | Error::BadToken | Error::UnwantedFeeToken => true,
240 }
241 }
242 }
243
244 #[derive(Debug)]
245 pub struct Diff {
246 pub list: Vec<UserCommand>,
247 }
248
249 #[derive(Serialize, Deserialize, Debug, Clone)]
250 pub struct DiffVerified {
251 pub list: Vec<ValidCommandWithHash>,
252 }
253
254 struct Rejected {
255 list: Vec<(UserCommand, Error)>,
256 }
257
258 #[derive(Serialize, Deserialize, Debug, Clone)]
259 pub struct BestTipDiff {
260 pub new_commands: Vec<WithStatus<valid::UserCommand>>,
261 pub removed_commands: Vec<WithStatus<valid::UserCommand>>,
262 pub reorg_best_tip: bool,
263 }
264}
265
266fn preload_accounts(
267 ledger: &Mask,
268 account_ids: &BTreeSet<AccountId>,
269) -> HashMap<AccountId, Account> {
270 account_ids
271 .iter()
272 .filter_map(|id| {
273 let addr = ledger.location_of_account(id)?;
274 let account = ledger.get(addr)?;
275 Some((id.clone(), *account))
276 })
277 .collect()
278}
279
/// Transaction pool configuration.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Config {
    /// Unit placeholder; carries no data.
    pub trust_system: (),
    /// NOTE(review): presumably the maximum number of commands kept in the pool — confirm where it is enforced.
    pub pool_max_size: usize,
    /// Optional slot bound for accepting transactions.
    /// NOTE(review): presumably copied into `IndexedPoolConfig::slot_tx_end` — confirm.
    pub slot_tx_end: Option<Slot>,
}
286
/// Serialization form of `VkRefcountTable`: the same three tables flattened into
/// vectors of pairs, with `BigInt` hashes instead of `Fp` field elements. The
/// two `From` impls below convert in each direction.
#[derive(Serialize, Deserialize)]
struct VkRefcountTableBigInts {
    /// `(vk hash, (reference count, verification key with its hash))`.
    verification_keys: Vec<(BigInt, (usize, WithHash<VerificationKey, BigInt>))>,
    /// `(account id, [(vk hash, count)])`.
    account_id_to_vks: Vec<(AccountId, Vec<(BigInt, usize)>)>,
    /// `(vk hash, [(account id, count)])`.
    vk_to_account_ids: Vec<(BigInt, Vec<(AccountId, usize)>)>,
}
294impl From<VkRefcountTable> for VkRefcountTableBigInts {
295 fn from(value: VkRefcountTable) -> Self {
296 let VkRefcountTable {
297 verification_keys,
298 account_id_to_vks,
299 vk_to_account_ids,
300 } = value;
301 Self {
302 verification_keys: verification_keys
303 .into_iter()
304 .map(|(hash, (count, vk))| {
305 assert_eq!(hash, vk.hash());
306 let hash: BigInt = hash.into();
307 (
308 hash.clone(),
309 (
310 count,
311 WithHash {
312 data: vk.vk().clone(),
313 hash,
314 },
315 ),
316 )
317 })
318 .collect(),
319 account_id_to_vks: account_id_to_vks
320 .into_iter()
321 .map(|(id, map)| {
322 (
323 id,
324 map.into_iter()
325 .map(|(hash, count)| (hash.into(), count))
326 .collect(),
327 )
328 })
329 .collect(),
330 vk_to_account_ids: vk_to_account_ids
331 .into_iter()
332 .map(|(hash, map)| (hash.into(), map.into_iter().collect()))
333 .collect(),
334 }
335 }
336}
337impl From<VkRefcountTableBigInts> for VkRefcountTable {
338 fn from(value: VkRefcountTableBigInts) -> Self {
339 let VkRefcountTableBigInts {
340 verification_keys,
341 account_id_to_vks,
342 vk_to_account_ids,
343 } = value;
344 Self {
345 verification_keys: verification_keys
346 .into_iter()
347 .map(|(hash, (count, vk))| {
348 assert_eq!(hash, vk.hash);
349 let hash: Fp = hash.to_field().unwrap(); (hash, (count, VerificationKeyWire::with_hash(vk.data, hash)))
351 })
352 .collect(),
353 account_id_to_vks: account_id_to_vks
354 .into_iter()
355 .map(|(id, map)| {
356 let map = map
357 .into_iter()
358 .map(|(bigint, count)| (bigint.to_field::<Fp>().unwrap(), count)) .collect();
360 (id, map)
361 })
362 .collect(),
363 vk_to_account_ids: vk_to_account_ids
364 .into_iter()
365 .map(|(hash, map)| (hash.to_field().unwrap(), map.into_iter().collect())) .collect(),
367 }
368 }
369}
370
/// Reference-counted table of verification keys, indexed three ways.
///
/// Serialized and deserialized through `VkRefcountTableBigInts` (see the
/// `serde(into/from)` attributes).
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
#[serde(into = "VkRefcountTableBigInts")]
#[serde(from = "VkRefcountTableBigInts")]
struct VkRefcountTable {
    /// vk hash -> (total reference count, verification key).
    verification_keys: HashMap<Fp, (usize, VerificationKeyWire)>,
    /// account id -> (vk hash -> reference count for that pair).
    account_id_to_vks: HashMap<AccountId, HashMap<Fp, usize>>,
    /// vk hash -> (account id -> reference count for that pair).
    vk_to_account_ids: HashMap<Fp, HashMap<AccountId, usize>>,
}
379
380impl VkRefcountTable {
381 fn find_vk(&self, f: &Fp) -> Option<&(usize, VerificationKeyWire)> {
382 self.verification_keys.get(f)
383 }
384
385 fn find_vks_by_account_id(&self, account_id: &AccountId) -> Vec<&VerificationKeyWire> {
386 let Some(vks) = self.account_id_to_vks.get(account_id) else {
387 return Vec::new();
388 };
389
390 vks.iter()
391 .map(|(f, _)| self.find_vk(f).expect("malformed Vk_refcount_table.t"))
392 .map(|(_, vk)| vk)
393 .collect()
394 }
395
396 fn inc(&mut self, account_id: AccountId, vk: VerificationKeyWire) {
397 use std::collections::hash_map::Entry::{Occupied, Vacant};
398
399 match self.verification_keys.entry(vk.hash()) {
400 Vacant(e) => {
401 e.insert((1, vk.clone()));
402 }
403 Occupied(mut e) => {
404 let (count, _vk) = e.get_mut();
405 *count += 1;
406 }
407 }
408
409 let map = self
410 .account_id_to_vks
411 .entry(account_id.clone())
412 .or_default(); let count = map.entry(vk.hash()).or_default(); *count += 1;
415
416 let map = self.vk_to_account_ids.entry(vk.hash()).or_default(); let count = map.entry(account_id).or_default(); *count += 1;
419 }
420
421 fn dec(&mut self, account_id: AccountId, vk_hash: Fp) {
422 use std::collections::hash_map::Entry::{Occupied, Vacant};
423
424 match self.verification_keys.entry(vk_hash) {
425 Vacant(_e) => {
426 bug_condition!("vk_map: Unexpected error on self.verification_keys: vacant vk_hash")
427 }
428 Occupied(mut e) => {
429 let (count, _vk) = e.get_mut();
430 if *count == 1 {
431 e.remove();
432 } else {
433 *count = (*count).checked_sub(1).unwrap();
434 }
435 }
436 }
437
438 fn remove<K1, K2>(
439 key1: K1,
440 key2: K2,
441 table: &mut HashMap<K1, HashMap<K2, usize>>,
442 ) -> Result<(), &'static str>
443 where
444 K1: std::hash::Hash + Eq,
445 K2: std::hash::Hash + Eq,
446 {
447 match table.entry(key1) {
448 Vacant(_e) => return Err("vacant on key1"),
449 Occupied(mut e) => {
450 let map = e.get_mut();
451 match map.entry(key2) {
452 Vacant(_e) => return Err("vacant on key2"),
453 Occupied(mut e2) => {
454 let count: &mut usize = e2.get_mut();
455 if *count == 1 {
456 e2.remove();
457 e.remove();
458 } else {
459 *count = count.checked_sub(1).ok_or("invalid count state")?
460 }
461 }
462 }
463 }
464 }
465 Ok(())
466 }
467
468 if let Err(e) = remove(account_id.clone(), vk_hash, &mut self.account_id_to_vks) {
469 bug_condition!(
470 "vk_map: Unexpected error on self.account_id_to_vks: {:?}",
471 e
472 );
473 }
474 if let Err(e) = remove(vk_hash, account_id.clone(), &mut self.vk_to_account_ids) {
475 bug_condition!(
476 "vk_map: Unexpected error on self.vk_to_account_ids: {:?}",
477 e
478 );
479 }
480 }
481
482 fn decrement_list(&mut self, list: &[ValidCommandWithHash]) {
483 list.iter().for_each(|c| {
484 for (id, vk) in c.data.forget_check().extract_vks() {
485 self.dec(id, vk.hash());
486 }
487 });
488 }
489
490 fn decrement_hashed<I, V>(&mut self, list: I)
491 where
492 I: IntoIterator<Item = V>,
493 V: Borrow<ValidCommandWithHash>,
494 {
495 list.into_iter().for_each(|c| {
496 for (id, vk) in c.borrow().data.forget_check().extract_vks() {
497 self.dec(id, vk.hash());
498 }
499 });
500 }
501
502 fn increment_hashed<I, V>(&mut self, list: I)
503 where
504 I: IntoIterator<Item = V>,
505 V: Borrow<ValidCommandWithHash>,
506 {
507 list.into_iter().for_each(|c| {
508 for (id, vk) in c.borrow().data.forget_check().extract_vks() {
509 self.inc(id, vk);
510 }
511 });
512 }
513
514 fn increment_list(&mut self, list: &[ValidCommandWithHash]) {
515 list.iter().for_each(|c| {
516 for (id, vk) in c.data.forget_check().extract_vks() {
517 self.inc(id, vk);
518 }
519 });
520 }
521}
522
/// Single-variant wrapper around a `usize`.
/// NOTE(review): presumably a batch index or size — its use is outside this part of the file; confirm.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
enum Batch {
    Of(usize),
}
527
/// Detailed reason why the indexed pool refused or failed to process a command.
/// Converts to the payload-free `diff::Error` and to `String` (its `Debug` form).
#[derive(Debug)]
pub enum CommandError {
    /// The command's nonce does not fit the account nonce or the sender's queue.
    InvalidNonce {
        account_nonce: Nonce,
        expected: Nonce,
    },
    /// The currency consumed (`consumed`) exceeds the available `balance`.
    InsufficientFunds {
        balance: Balance,
        consumed: Amount,
    },
    /// The offered `fee` does not cover `replace_fee`, the fee required to
    /// replace already queued commands.
    InsufficientReplaceFee {
        replace_fee: Fee,
        fee: Fee,
    },
    /// A currency computation overflowed.
    Overflow,
    BadToken,
    /// `valid_until` is earlier than `global_slot_since_genesis`.
    Expired {
        valid_until: Slot,
        global_slot_since_genesis: Slot,
    },
    /// The fee is paid in `token_id`, which is not the default token.
    UnwantedFeeToken {
        token_id: TokenId,
    },
    /// The current global slot is not below the configured `slot_tx_end`.
    AfterSlotTxEnd,
    /// While re-adding a command from a backtrack, the nonce it leads to
    /// (`expected_nonce`) differs from the nonce of the first queued command
    /// (`first_nonce`).
    BacktrackNonceMismatch {
        expected_nonce: Nonce,
        first_nonce: Nonce,
    },
    InvalidCurrencyConsumed,
    /// Free-form message; produced by `my_assert` when an invariant check fails.
    Custom(Cow<'static, str>),
}
561
562impl From<CommandError> for String {
563 fn from(value: CommandError) -> Self {
564 format!("{:?}", value)
565 }
566}
567
568impl From<CommandError> for diff::Error {
569 fn from(value: CommandError) -> Self {
570 match value {
571 CommandError::InvalidNonce { .. } => diff::Error::InvalidNonce,
572 CommandError::InsufficientFunds { .. } => diff::Error::InsufficientFunds,
573 CommandError::InsufficientReplaceFee { .. } => diff::Error::InsufficientReplaceFee,
574 CommandError::Overflow => diff::Error::Overflow,
575 CommandError::BadToken => diff::Error::BadToken,
576 CommandError::Expired { .. } => diff::Error::Expired,
577 CommandError::UnwantedFeeToken { .. } => diff::Error::UnwantedFeeToken,
578 CommandError::AfterSlotTxEnd => diff::Error::AfterSlotTxEnd,
579 CommandError::BacktrackNonceMismatch { .. } => diff::Error::BacktrackNonceMismatch,
580 CommandError::InvalidCurrencyConsumed => diff::Error::InvalidCurrencyConsumed,
581 CommandError::Custom(_) => diff::Error::Custom,
582 }
583 }
584}
585
/// Settings consulted by `IndexedPool` when commands are added.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct IndexedPoolConfig {
    /// Typed consensus constants, built from the protocol constants in `IndexedPool::new`.
    pub consensus_constants: consensus::Constants,
    /// When set, commands are refused with `CommandError::AfterSlotTxEnd` unless
    /// the current global slot is strictly below this slot.
    slot_tx_end: Option<Slot>,
}
591
/// The pool's commands, indexed several ways at once. The indexes are kept in
/// sync by the methods of `IndexedPool`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct IndexedPool {
    /// Commands grouped by fee rate. In the code of this file a command is
    /// inserted here only when it becomes the first command of its sender's queue.
    applicable_by_fee: HashMap<FeeRate, HashSet<ValidCommandWithHash>>,
    /// Per fee payer: the queue of its commands and the total currency they reserve.
    all_by_sender: HashMap<AccountId, (VecDeque<ValidCommandWithHash>, Amount)>,
    /// Every command of the pool, grouped by fee rate.
    all_by_fee: HashMap<FeeRate, HashSet<ValidCommandWithHash>>,
    /// Every command of the pool, keyed by transaction hash.
    all_by_hash: HashMap<v2::TransactionHash, ValidCommandWithHash>,
    /// Commands grouped by their `valid_until` slot; commands whose `valid_until`
    /// equals `Slot::max()` are not tracked.
    transactions_with_expiration: HashMap<Slot, HashSet<ValidCommandWithHash>>,
    /// Number of commands in the pool.
    size: usize,
    pub config: IndexedPoolConfig,
}
620
/// A deferred change to the pool's indexes. Updates are collected while a
/// command is processed and executed afterwards by `IndexedPool::apply_updates`.
enum Update {
    /// Index `command` by fee, hash and expiration; also in `applicable_by_fee`
    /// when `add_to_applicable_by_fee` is set.
    Add {
        command: ValidCommandWithHash,
        fee_per_wu: FeeRate,
        add_to_applicable_by_fee: bool,
    },
    /// Remove `commands` from the by-fee, by-hash and expiration indexes.
    RemoveAllByFeeAndHashAndExpiration {
        commands: VecDeque<ValidCommandWithHash>,
    },
    /// Remove `command` from `applicable_by_fee` under `fee_per_wu`.
    RemoveFromApplicableByFee {
        fee_per_wu: FeeRate,
        command: ValidCommandWithHash,
    },
}
635
/// Working copy of one sender's entry of `IndexedPool::all_by_sender`; it is
/// written back to the pool by `IndexedPool::set_sender`.
#[derive(Clone)]
struct SenderState {
    /// The fee payer this state belongs to.
    sender: AccountId,
    /// The sender's queue and reserved currency, or `None` when the sender has
    /// no queued commands.
    state: Option<(VecDeque<ValidCommandWithHash>, Amount)>,
}
641
/// Selects which part of the pool an operation covers: everything, or only the
/// given set of accounts.
/// NOTE(review): the consumer of this enum is outside this part of the file — confirm the exact semantics there.
pub enum RevalidateKind<'a> {
    EntirePool,
    Subset(&'a BTreeSet<AccountId>),
}
646
647impl IndexedPool {
648 fn new(constants: &ConsensusConstants) -> Self {
649 Self {
650 applicable_by_fee: HashMap::new(),
651 all_by_sender: HashMap::new(),
652 all_by_fee: HashMap::new(),
653 all_by_hash: HashMap::new(),
654 transactions_with_expiration: HashMap::new(),
655 size: 0,
656 config: IndexedPoolConfig {
657 consensus_constants: consensus::Constants::create(constants),
658 slot_tx_end: None,
659 },
660 }
661 }
662
    /// Returns the number of commands currently in the pool.
    fn size(&self) -> usize {
        self.size
    }
666
667 fn min_fee(&self) -> Option<FeeRate> {
668 self.all_by_fee.keys().min().cloned()
669 }
670
    /// Tells whether a command with the same hash as `cmd` is in the pool.
    fn member(&self, cmd: &ValidCommandWithHash) -> bool {
        self.all_by_hash.contains_key(&cmd.hash)
    }
674
    /// Looks up a command of the pool by its transaction hash.
    pub fn get(&self, hash: &v2::TransactionHash) -> Option<&ValidCommandWithHash> {
        self.all_by_hash.get(hash)
    }
678
679 fn check_expiry(
680 &self,
681 global_slot_since_genesis: Slot,
682 cmd: &UserCommand,
683 ) -> Result<(), CommandError> {
684 let valid_until = cmd.valid_until();
685
686 if valid_until < global_slot_since_genesis {
687 return Err(CommandError::Expired {
688 valid_until,
689 global_slot_since_genesis,
690 });
691 }
692
693 Ok(())
694 }
695
696 fn map_set_insert<K, V>(map: &mut HashMap<K, HashSet<V>>, key: K, value: V)
698 where
699 K: std::hash::Hash + PartialEq + Eq,
700 V: std::hash::Hash + PartialEq + Eq,
701 {
702 let entry = map.entry(key).or_default();
703 entry.insert(value);
704 }
705
706 fn map_set_remove<K, V>(map: &mut HashMap<K, HashSet<V>>, key: K, value: &V)
708 where
709 K: std::hash::Hash + PartialEq + Eq,
710 V: std::hash::Hash + PartialEq + Eq,
711 {
712 let Entry::Occupied(mut entry) = map.entry(key) else {
713 return;
714 };
715 let set = entry.get_mut();
716 set.remove(value);
717 if set.is_empty() {
718 entry.remove();
719 }
720 }
721
722 fn update_expiration_map(&mut self, cmd: ValidCommandWithHash, is_add: bool) {
723 let user_cmd = cmd.data.forget_check();
724 let expiry = user_cmd.valid_until();
725 if expiry == Slot::max() {
726 return; }
728 if is_add {
729 Self::map_set_insert(&mut self.transactions_with_expiration, expiry, cmd);
730 } else {
731 Self::map_set_remove(&mut self.transactions_with_expiration, expiry, &cmd);
732 }
733 }
734
    /// Removes `cmd` from the expiration index (see `update_expiration_map`).
    fn remove_from_expiration_exn(&mut self, cmd: ValidCommandWithHash) {
        self.update_expiration_map(cmd, false);
    }
738
    /// Adds `cmd` to the expiration index (see `update_expiration_map`).
    fn add_to_expiration(&mut self, cmd: ValidCommandWithHash) {
        self.update_expiration_map(cmd, true);
    }
742
743 fn remove_applicable_exn(&mut self, cmd: &ValidCommandWithHash) {
746 let fee_per_wu = cmd.data.forget_check().fee_per_wu();
747 Self::map_set_remove(&mut self.applicable_by_fee, fee_per_wu, cmd);
748 }
749
750 fn make_queue<T>() -> VecDeque<T> {
751 VecDeque::with_capacity(256)
752 }
753
754 fn add_from_backtrack(
755 &mut self,
756 global_slot_since_genesis: Slot,
757 current_global_slot: Slot,
758 cmd: ValidCommandWithHash,
759 ) -> Result<(), CommandError> {
760 let IndexedPoolConfig { slot_tx_end, .. } = &self.config;
761
762 if !slot_tx_end
763 .as_ref()
764 .map(|slot_tx_end| current_global_slot < *slot_tx_end)
765 .unwrap_or(true)
766 {
767 return Err(CommandError::AfterSlotTxEnd);
768 }
769
770 let ValidCommandWithHash {
771 data: unchecked,
772 hash: cmd_hash,
773 } = &cmd;
774 let unchecked = unchecked.forget_check();
775
776 self.check_expiry(global_slot_since_genesis, &unchecked)?;
777
778 let fee_payer = unchecked.fee_payer();
779 let fee_per_wu = unchecked.fee_per_wu();
780
781 let consumed = currency_consumed(&unchecked)?;
782
783 match self.all_by_sender.get_mut(&fee_payer) {
784 None => {
785 {
786 let mut queue = Self::make_queue();
787 queue.push_back(cmd.clone());
788 self.all_by_sender.insert(fee_payer, (queue, consumed));
789 }
790 Self::map_set_insert(&mut self.all_by_fee, fee_per_wu.clone(), cmd.clone());
791 self.all_by_hash.insert(cmd_hash.clone(), cmd.clone());
792 Self::map_set_insert(&mut self.applicable_by_fee, fee_per_wu.clone(), cmd.clone());
793 self.add_to_expiration(cmd);
794 self.size += 1;
795 }
796 Some((queue, currency_reserved)) => {
797 let first_queued = queue.front().cloned().unwrap();
798 let expected_nonce = unchecked.expected_target_nonce();
799 let first_nonce = first_queued.data.forget_check().applicable_at_nonce();
800
801 if expected_nonce != first_nonce {
802 return Err(CommandError::BacktrackNonceMismatch {
805 expected_nonce,
806 first_nonce,
807 });
808 }
809
810 {
812 queue.push_front(cmd.clone());
813 *currency_reserved = currency_reserved.checked_add(&consumed).unwrap();
814 }
815
816 self.remove_applicable_exn(&first_queued);
817
818 Self::map_set_insert(&mut self.applicable_by_fee, fee_per_wu.clone(), cmd.clone());
819 Self::map_set_insert(&mut self.all_by_fee, fee_per_wu.clone(), cmd.clone());
820 self.all_by_hash.insert(cmd_hash.clone(), cmd.clone());
821 self.add_to_expiration(cmd);
822 self.size += 1;
823 }
824 }
825 Ok(())
826 }
827
828 fn update_add(
829 &mut self,
830 cmd: ValidCommandWithHash,
831 fee_per_wu: FeeRate,
832 add_to_applicable_by_fee: bool,
833 ) {
834 if add_to_applicable_by_fee {
835 Self::map_set_insert(&mut self.applicable_by_fee, fee_per_wu.clone(), cmd.clone());
836 }
837
838 let cmd_hash = cmd.hash.clone();
839
840 Self::map_set_insert(&mut self.all_by_fee, fee_per_wu, cmd.clone());
841 self.all_by_hash.insert(cmd_hash, cmd.clone());
842 self.add_to_expiration(cmd);
843 self.size += 1;
844 }
845
846 fn update_remove_all_by_fee_and_hash_and_expiration<I>(&mut self, cmds: I)
849 where
850 I: IntoIterator<Item = ValidCommandWithHash>,
851 {
852 for cmd in cmds {
853 let fee_per_wu = cmd.data.forget_check().fee_per_wu();
854 let cmd_hash = cmd.hash.clone();
855 Self::map_set_remove(&mut self.all_by_fee, fee_per_wu, &cmd);
856 self.all_by_hash.remove(&cmd_hash);
857 self.remove_from_expiration_exn(cmd);
858 self.size = self.size.checked_sub(1).unwrap();
859 }
860 }
861
    /// Executes an `Update::RemoveFromApplicableByFee`: removes `command` from
    /// the `applicable_by_fee` set stored under `fee_per_wu`.
    fn update_remove_from_applicable_by_fee(
        &mut self,
        fee_per_wu: FeeRate,
        command: &ValidCommandWithHash,
    ) {
        Self::map_set_remove(&mut self.applicable_by_fee, fee_per_wu, command)
    }
869
870 fn remove_with_dependents_exn(
871 &mut self,
872 cmd: &ValidCommandWithHash,
873 ) -> Result<VecDeque<ValidCommandWithHash>, CommandError> {
874 let sender = cmd.data.fee_payer();
875 let mut by_sender = SenderState {
876 state: self.all_by_sender.get(&sender).cloned(),
877 sender,
878 };
879
880 let mut updates = Vec::<Update>::with_capacity(128);
881 let result = self.remove_with_dependents_exn_impl(cmd, &mut by_sender, &mut updates);
882
883 self.set_sender(by_sender);
884 self.apply_updates(updates);
885
886 result
887 }
888
889 fn remove_with_dependents_exn_impl(
890 &self,
891 cmd: &ValidCommandWithHash,
892 by_sender: &mut SenderState,
893 updates: &mut Vec<Update>,
894 ) -> Result<VecDeque<ValidCommandWithHash>, CommandError> {
895 let (sender_queue, reserved_currency_ref) = by_sender.state.as_mut().unwrap();
896 let unchecked = cmd.data.forget_check();
897
898 my_assert(!sender_queue.is_empty())?;
899
900 let cmd_nonce = unchecked.applicable_at_nonce();
901
902 let cmd_index = sender_queue
903 .iter()
904 .position(|cmd| {
905 let nonce = cmd.data.forget_check().applicable_at_nonce();
906 nonce == cmd_nonce
908 })
909 .unwrap();
910
911 let drop_queue = sender_queue.split_off(cmd_index);
912 let keep_queue = sender_queue;
913 my_assert(!drop_queue.is_empty())?;
914
915 let currency_to_remove = drop_queue.iter().try_fold(Amount::zero(), |acc, cmd| {
916 let consumed = currency_consumed(&cmd.data.forget_check())?;
917 Ok(consumed.checked_add(&acc).unwrap())
918 })?;
919
920 let reserved_currency = reserved_currency_ref
923 .checked_sub(¤cy_to_remove)
924 .unwrap();
925
926 updates.push(Update::RemoveAllByFeeAndHashAndExpiration {
927 commands: drop_queue.clone(),
928 });
929
930 if cmd_index == 0 {
931 updates.push(Update::RemoveFromApplicableByFee {
932 fee_per_wu: unchecked.fee_per_wu(),
933 command: cmd.clone(),
934 });
935 }
936
937 if !keep_queue.is_empty() {
940 *reserved_currency_ref = reserved_currency;
941 } else {
942 my_assert(reserved_currency.is_zero())?;
943 by_sender.state = None;
944 }
945
946 Ok(drop_queue)
947 }
948
949 fn apply_updates(&mut self, updates: Vec<Update>) {
950 for update in updates {
951 match update {
952 Update::Add {
953 command,
954 fee_per_wu,
955 add_to_applicable_by_fee,
956 } => self.update_add(command, fee_per_wu, add_to_applicable_by_fee),
957 Update::RemoveAllByFeeAndHashAndExpiration { commands } => {
958 self.update_remove_all_by_fee_and_hash_and_expiration(commands)
959 }
960 Update::RemoveFromApplicableByFee {
961 fee_per_wu,
962 command,
963 } => self.update_remove_from_applicable_by_fee(fee_per_wu, &command),
964 }
965 }
966 }
967
968 fn set_sender(&mut self, by_sender: SenderState) {
969 let SenderState { sender, state } = by_sender;
970
971 match state {
972 Some(state) => {
973 self.all_by_sender.insert(sender, state);
974 }
975 None => {
976 self.all_by_sender.remove(&sender);
977 }
978 }
979 }
980
981 fn add_from_gossip_exn(
982 &mut self,
983 global_slot_since_genesis: Slot,
984 current_global_slot: Slot,
985 cmd: &ValidCommandWithHash,
986 current_nonce: Nonce,
987 balance: Balance,
988 ) -> Result<(ValidCommandWithHash, VecDeque<ValidCommandWithHash>), CommandError> {
989 let sender = cmd.data.fee_payer();
990 let mut by_sender = SenderState {
991 state: self.all_by_sender.get(&sender).cloned(),
992 sender,
993 };
994
995 let mut updates = Vec::<Update>::with_capacity(128);
996 let result = self.add_from_gossip_exn_impl(
997 global_slot_since_genesis,
998 current_global_slot,
999 cmd,
1000 current_nonce,
1001 balance,
1002 &mut by_sender,
1003 &mut updates,
1004 )?;
1005
1006 self.set_sender(by_sender);
1007 self.apply_updates(updates);
1008
1009 Ok(result)
1010 }
1011
    /// Computes the insertion of `cmd` without touching the pool itself: the
    /// sender's working state is updated through `by_sender` and the index
    /// changes are appended to `updates`.
    ///
    /// Three situations are handled:
    /// * the sender has no queue: `cmd` must apply at `current_nonce` and be
    ///   covered by `balance`; it starts a new queue;
    /// * `cmd` applies right after the last queued command: it is appended,
    ///   provided the total reserved currency stays within `balance`;
    /// * the queue starts at `current_nonce` and `cmd` targets a nonce inside
    ///   it: `cmd` replaces the command at that position. The replaced command
    ///   and all later ones are removed, `cmd` is added, and the later ones are
    ///   re-added one by one until one fails; that one and everything after it
    ///   stay dropped and their fees are charged against the fee increase.
    ///
    /// Returns the added command and the queue of commands that ended up dropped.
    ///
    /// # Errors
    /// `AfterSlotTxEnd`, `Expired`, `Overflow`, `UnwantedFeeToken`,
    /// `InvalidNonce`, `InsufficientFunds`, `InsufficientReplaceFee`, plus the
    /// errors of `remove_with_dependents_exn_impl` and of failed internal
    /// assertions (`Custom`).
    fn add_from_gossip_exn_impl(
        &self,
        global_slot_since_genesis: Slot,
        current_global_slot: Slot,
        cmd: &ValidCommandWithHash,
        current_nonce: Nonce,
        balance: Balance,
        by_sender: &mut SenderState,
        updates: &mut Vec<Update>,
    ) -> Result<(ValidCommandWithHash, VecDeque<ValidCommandWithHash>), CommandError> {
        let IndexedPoolConfig { slot_tx_end, .. } = &self.config;

        // Refuse the command unless no `slot_tx_end` is configured or the
        // current slot is strictly below it.
        if !slot_tx_end
            .as_ref()
            .map(|slot_tx_end| current_global_slot < *slot_tx_end)
            .unwrap_or(true)
        {
            return Err(CommandError::AfterSlotTxEnd);
        }

        let unchecked = cmd.data.forget_check();
        let fee = unchecked.fee();
        let fee_per_wu = unchecked.fee_per_wu();
        let cmd_applicable_at_nonce = unchecked.applicable_at_nonce();

        // Checks that do not depend on the sender's queue; any failure of
        // `currency_consumed` is reported as `Overflow`.
        let consumed = {
            self.check_expiry(global_slot_since_genesis, &unchecked)?;
            let consumed = currency_consumed(&unchecked).map_err(|_| CommandError::Overflow)?;
            if !unchecked.fee_token().is_default() {
                return Err(CommandError::UnwantedFeeToken {
                    token_id: unchecked.fee_token(),
                });
            }
            consumed
        };

        // Work on a copy of the state; `by_sender` is only overwritten once a
        // branch has succeeded (or through the helpers called below).
        match by_sender.state.clone() {
            None => {
                // First command of this sender: it must apply at the account nonce.
                if current_nonce != cmd_applicable_at_nonce {
                    return Err(CommandError::InvalidNonce {
                        account_nonce: current_nonce,
                        expected: cmd_applicable_at_nonce,
                    });
                }
                if consumed > balance.to_amount() {
                    return Err(CommandError::InsufficientFunds { balance, consumed });
                }

                let mut queue = Self::make_queue();
                queue.push_back(cmd.clone());
                by_sender.state = Some((queue, consumed));

                // The head of a queue is also indexed in `applicable_by_fee`.
                updates.push(Update::Add {
                    command: cmd.clone(),
                    fee_per_wu,
                    add_to_applicable_by_fee: true,
                });

                Ok((cmd.clone(), Self::make_queue()))
            }
            Some((mut queued_cmds, reserved_currency)) => {
                my_assert(!queued_cmds.is_empty())?;
                // Nonce at which the first queued command applies.
                let queue_applicable_at_nonce = {
                    let first = queued_cmds.front().unwrap();
                    first.data.forget_check().applicable_at_nonce()
                };
                // Nonce expected after the last queued command.
                let queue_target_nonce = {
                    let last = queued_cmds.back().unwrap();
                    last.data.forget_check().expected_target_nonce()
                };
                if queue_target_nonce == cmd_applicable_at_nonce {
                    // `cmd` extends the queue at its end.
                    let reserved_currency = consumed
                        .checked_add(&reserved_currency)
                        .ok_or(CommandError::Overflow)?;

                    if reserved_currency > balance.to_amount() {
                        return Err(CommandError::InsufficientFunds {
                            balance,
                            consumed: reserved_currency,
                        });
                    }

                    queued_cmds.push_back(cmd.clone());

                    updates.push(Update::Add {
                        command: cmd.clone(),
                        fee_per_wu,
                        add_to_applicable_by_fee: false,
                    });

                    by_sender.state = Some((queued_cmds, reserved_currency));

                    Ok((cmd.clone(), Self::make_queue()))
                } else if queue_applicable_at_nonce == current_nonce {
                    // Replacement: `cmd` must target a nonce covered by the queue.
                    if !cmd_applicable_at_nonce
                        .between(&queue_applicable_at_nonce, &queue_target_nonce)
                    {
                        return Err(CommandError::InvalidNonce {
                            account_nonce: cmd_applicable_at_nonce,
                            expected: queue_applicable_at_nonce,
                        });
                    }

                    // First queued command whose nonce is not below `cmd`'s.
                    let replacement_index = queued_cmds
                        .iter()
                        .position(|cmd| {
                            let cmd_applicable_at_nonce_prime =
                                cmd.data.forget_check().applicable_at_nonce();
                            cmd_applicable_at_nonce <= cmd_applicable_at_nonce_prime
                        })
                        .unwrap();

                    let drop_queue = queued_cmds.split_off(replacement_index);

                    let to_drop = drop_queue.front().unwrap().data.forget_check();
                    my_assert(cmd_applicable_at_nonce <= to_drop.applicable_at_nonce())?;

                    // Checked up front so that `fee - to_drop.fee()` below cannot fail.
                    {
                        let replace_fee = to_drop.fee();
                        if fee < replace_fee {
                            return Err(CommandError::InsufficientReplaceFee { replace_fee, fee });
                        }
                    }

                    // Remove the replaced command and everything after it from
                    // the sender's working state.
                    let dropped = self.remove_with_dependents_exn_impl(
                        drop_queue.front().unwrap(),
                        by_sender,
                        updates,
                    )?;
                    my_assert(drop_queue == dropped)?;

                    // Add `cmd` in place of the removed commands; nothing is
                    // expected to be dropped by this call.
                    let (cmd, _) = {
                        let (v, dropped) = self.add_from_gossip_exn_impl(
                            global_slot_since_genesis,
                            current_global_slot,
                            cmd,
                            current_nonce,
                            balance,
                            by_sender,
                            updates,
                        )?;
                        my_assert(dropped.is_empty())?;
                        (v, dropped)
                    };

                    // `drop_head` is the replaced command; `drop_tail` holds the
                    // commands that followed it.
                    let drop_head = dropped.front().cloned().unwrap();
                    let mut drop_tail = dropped.into_iter().skip(1).peekable();

                    // Fee increase offered by the replacement; the fees of
                    // commands that stay dropped are subtracted from it below.
                    let mut increment = fee.checked_sub(&to_drop.fee()).unwrap();
                    // Becomes `Some` once a re-add fails: the commands that stay dropped.
                    let mut dropped = None::<VecDeque<_>>;
                    let mut current_nonce = current_nonce;
                    let mut this_updates = Vec::with_capacity(128);

                    while let Some(cmd) = drop_tail.peek() {
                        if dropped.is_some() {
                            // A previous re-add failed: this command stays
                            // dropped and its fee is charged to the increment.
                            let cmd_unchecked = cmd.data.forget_check();
                            let replace_fee = cmd_unchecked.fee();

                            increment = increment.checked_sub(&replace_fee).ok_or({
                                CommandError::InsufficientReplaceFee {
                                    replace_fee,
                                    fee: increment,
                                }
                            })?;
                        } else {
                            // Try to put the command back, on a snapshot that
                            // is restored if the attempt fails.
                            current_nonce = current_nonce.succ();
                            let by_sender_pre = by_sender.clone();
                            this_updates.clear();

                            match self.add_from_gossip_exn_impl(
                                global_slot_since_genesis,
                                current_global_slot,
                                cmd,
                                current_nonce,
                                balance,
                                by_sender,
                                &mut this_updates,
                            ) {
                                Ok((_cmd, dropped)) => {
                                    my_assert(dropped.is_empty())?;
                                    updates.append(&mut this_updates);
                                }
                                Err(_) => {
                                    *by_sender = by_sender_pre;
                                    dropped = Some(drop_tail.clone().skip(1).collect());
                                    // Do not advance: the same command is
                                    // revisited by the `dropped.is_some()` branch.
                                    continue;
                                }
                            }
                        }
                        let _ = drop_tail.next();
                    }

                    // What remains of the fee increase must reach `REPLACE_FEE`.
                    if increment < REPLACE_FEE {
                        return Err(CommandError::InsufficientReplaceFee {
                            replace_fee: REPLACE_FEE,
                            fee: increment,
                        });
                    }

                    let mut dropped = dropped.unwrap_or_else(Self::make_queue);
                    dropped.push_front(drop_head);

                    Ok((cmd, dropped))
                } else {
                    // `cmd` neither extends the queue nor can replace part of it.
                    Err(CommandError::InvalidNonce {
                        account_nonce: cmd_applicable_at_nonce,
                        expected: queue_target_nonce,
                    })
                }
            }
        }
    }
1228
1229 fn expired_by_global_slot(&self, global_slot_since_genesis: Slot) -> Vec<ValidCommandWithHash> {
1230 self.transactions_with_expiration
1231 .iter()
1232 .filter(|(slot, _cmd)| **slot < global_slot_since_genesis)
1233 .flat_map(|(_slot, cmd)| cmd.iter().cloned())
1234 .collect()
1235 }
1236
1237 fn expired(&self, global_slot_since_genesis: Slot) -> Vec<ValidCommandWithHash> {
1238 self.expired_by_global_slot(global_slot_since_genesis)
1239 }
1240
1241 fn remove_expired(
1242 &mut self,
1243 global_slot_since_genesis: Slot,
1244 ) -> Result<Vec<ValidCommandWithHash>, CommandError> {
1245 let mut dropped = Vec::with_capacity(128);
1246 for cmd in self.expired(global_slot_since_genesis) {
1247 if self.member(&cmd) {
1248 let removed = self.remove_with_dependents_exn(&cmd)?;
1249 dropped.extend(removed);
1250 }
1251 }
1252 Ok(dropped)
1253 }
1254
1255 fn remove_lowest_fee(&mut self) -> Result<VecDeque<ValidCommandWithHash>, CommandError> {
1256 let Some(set) = self.min_fee().and_then(|fee| self.all_by_fee.get(&fee)) else {
1257 return Ok(VecDeque::new());
1258 };
1259
1260 #[allow(clippy::mutable_key_type)]
1262 let bset: BTreeSet<_> = set.iter().collect();
1263 let min = bset.first().map(|min| (*min).clone()).unwrap();
1265
1266 self.remove_with_dependents_exn(&min)
1267 }
1268
1269 fn drop_until_sufficient_balance(
1272 mut queue: VecDeque<ValidCommandWithHash>,
1273 mut currency_reserved: Amount,
1274 current_balance: Amount,
1275 ) -> Result<
1276 (
1277 VecDeque<ValidCommandWithHash>,
1278 Amount,
1279 VecDeque<ValidCommandWithHash>,
1280 ),
1281 CommandError,
1282 > {
1283 let mut dropped_so_far = VecDeque::with_capacity(queue.len());
1284
1285 while currency_reserved > current_balance {
1286 let last = queue.pop_back().unwrap();
1287 let consumed = currency_consumed(&last.data.forget_check())?;
1288 dropped_so_far.push_back(last);
1289 currency_reserved = currency_reserved.checked_sub(&consumed).unwrap();
1290 }
1291
1292 Ok((queue, currency_reserved, dropped_so_far))
1293 }
1294
1295 fn revalidate<F>(
1296 &mut self,
1297 global_slot_since_genesis: Slot,
1298 kind: RevalidateKind,
1299 get_account: F,
1300 ) -> Result<Vec<ValidCommandWithHash>, CommandError>
1301 where
1302 F: Fn(&AccountId) -> Option<Account>,
1303 {
1304 let requires_revalidation = |account_id: &AccountId| match kind {
1305 RevalidateKind::EntirePool => true,
1306 RevalidateKind::Subset(set) => set.contains(account_id),
1307 };
1308
1309 let mut dropped = Vec::new();
1310
1311 for (sender, (mut queue, mut currency_reserved)) in self.all_by_sender.clone() {
1312 if !requires_revalidation(&sender) {
1313 continue;
1314 }
1315 let account: Account = get_account(&sender)
1316 .ok_or(CommandError::Custom(Cow::Borrowed("Account not find")))?;
1317 let current_balance = account
1318 .liquid_balance_at_slot(global_slot_since_genesis)
1319 .to_amount();
1320 let first_cmd = queue.front().cloned().unwrap();
1321 let first_nonce = first_cmd.data.forget_check().applicable_at_nonce();
1322
1323 if !(account.has_permission_to_send() && account.has_permission_to_increment_nonce())
1324 || account.nonce < first_nonce
1325 {
1326 let this_dropped = self.remove_with_dependents_exn(&first_cmd)?;
1327 dropped.extend(this_dropped);
1328 } else {
1329 let first_applicable_nonce_index = queue.iter().position(|cmd| {
1331 let nonce = cmd.data.forget_check().applicable_at_nonce();
1332 nonce == account.nonce
1333 });
1334
1335 let retained_for_nonce = match first_applicable_nonce_index {
1336 Some(index) => queue.split_off(index),
1337 None => Default::default(),
1338 };
1339 let dropped_for_nonce = queue;
1340
1341 for cmd in &dropped_for_nonce {
1342 currency_reserved = currency_reserved
1343 .checked_sub(¤cy_consumed(&cmd.data.forget_check())?)
1344 .unwrap();
1345 }
1346
1347 let (keep_queue, currency_reserved, dropped_for_balance) =
1348 Self::drop_until_sufficient_balance(
1349 retained_for_nonce,
1350 currency_reserved,
1351 current_balance,
1352 )?;
1353
1354 let keeping_prefix = dropped_for_nonce.is_empty();
1355 let keeping_suffix = dropped_for_balance.is_empty();
1356 let to_drop: Vec<_> = dropped_for_nonce
1357 .into_iter()
1358 .chain(dropped_for_balance)
1359 .collect();
1360
1361 match keep_queue.front().cloned() {
1362 _ if keeping_prefix && keeping_suffix => {
1363 }
1365 None => {
1366 self.remove_applicable_exn(&first_cmd);
1369 self.all_by_sender.remove(&sender);
1370 }
1371 Some(_) if keeping_prefix => {
1372 self.all_by_sender
1375 .insert(sender, (keep_queue, currency_reserved));
1376 }
1377 Some(first_kept) => {
1378 let first_kept_unchecked = first_kept.data.forget_check();
1381 self.all_by_sender
1382 .insert(sender, (keep_queue, currency_reserved));
1383 self.remove_applicable_exn(&first_cmd);
1384 Self::map_set_insert(
1385 &mut self.applicable_by_fee,
1386 first_kept_unchecked.fee_per_wu(),
1387 first_kept,
1388 );
1389 }
1390 }
1391 self.update_remove_all_by_fee_and_hash_and_expiration(to_drop.clone());
1392 dropped.extend(to_drop);
1393 }
1394 }
1395
1396 Ok(dropped)
1397 }
1398
1399 fn list_includable_transactions(&self, limit: usize) -> Vec<ValidCommandWithHash> {
1402 let mut txns = Vec::with_capacity(self.applicable_by_fee.len());
1403
1404 let mut applicable_by_fee = self.applicable_by_fee.clone();
1406 let mut all_by_sender = self.all_by_sender.clone();
1407
1408 while !applicable_by_fee.is_empty() && txns.len() < limit {
1409 let (fee, mut set) = applicable_by_fee
1410 .iter()
1411 .max_by_key(|(rate, _)| *rate)
1412 .map(|(rate, set)| (rate.clone(), set.clone()))
1413 .unwrap();
1414
1415 let txn = set.iter().min_by_key(|b| &b.hash).cloned().unwrap();
1417
1418 {
1419 set.remove(&txn);
1420 if set.is_empty() {
1421 applicable_by_fee.remove(&fee);
1422 } else {
1423 applicable_by_fee.insert(fee, set);
1424 }
1425 }
1426
1427 let sender = txn.data.forget_check().fee_payer();
1428
1429 let (sender_queue, _amount) = all_by_sender.get_mut(&sender).unwrap();
1430 let head_txn = sender_queue.pop_front().unwrap();
1431
1432 if txn.hash == head_txn.hash {
1433 match sender_queue.front().cloned() {
1434 None => {
1435 all_by_sender.remove(&sender);
1436 }
1437 Some(next_txn) => {
1438 let fee = next_txn.data.forget_check().fee_per_wu();
1439 applicable_by_fee.entry(fee).or_default().insert(next_txn);
1440 }
1441 }
1442 } else {
1443 openmina_core::warn!(
1444 openmina_core::log::system_time();
1445 kind = "transaction pool", message = "Sender queue is malformed");
1446 all_by_sender.remove(&sender);
1447 }
1448
1449 txns.push(txn);
1450 }
1451 txns
1452 }
1453
1454 fn transactions(&mut self, limit: usize) -> Vec<ValidCommandWithHash> {
1458 let mut txns = Vec::with_capacity(self.applicable_by_fee.len());
1459 loop {
1460 if self.applicable_by_fee.is_empty() {
1461 assert!(self.all_by_sender.is_empty());
1462 return txns;
1463 }
1464
1465 if txns.len() >= limit {
1466 return txns;
1467 }
1468
1469 let (fee, mut set) = self
1470 .applicable_by_fee
1471 .iter()
1472 .max_by_key(|(rate, _)| *rate)
1473 .map(|(rate, set)| (rate.clone(), set.clone()))
1474 .unwrap();
1475
1476 let txn = set.iter().min_by_key(|b| &b.hash).cloned().unwrap();
1478
1479 {
1480 set.remove(&txn);
1481 if set.is_empty() {
1482 self.applicable_by_fee.remove(&fee);
1483 } else {
1484 self.applicable_by_fee.insert(fee, set);
1485 }
1486 }
1487
1488 let sender = txn.data.forget_check().fee_payer();
1489
1490 let (sender_queue, _amount) = self.all_by_sender.get_mut(&sender).unwrap();
1491 let head_txn = sender_queue.pop_front().unwrap();
1492
1493 if txn.hash == head_txn.hash {
1494 match sender_queue.front().cloned() {
1495 None => {
1496 self.all_by_sender.remove(&sender);
1497 }
1498 Some(next_txn) => {
1499 let fee = next_txn.data.forget_check().fee_per_wu();
1500 self.applicable_by_fee
1501 .entry(fee)
1502 .or_default()
1503 .insert(next_txn);
1504 }
1505 }
1506 } else {
1507 openmina_core::warn!(
1508 openmina_core::log::system_time();
1509 kind = "transaction pool", message = "Sender queue is malformed");
1510 self.all_by_sender.remove(&sender);
1511 }
1512
1513 txns.push(txn);
1514 }
1515 }
1516
1517 fn get_all_transactions(&self) -> Vec<ValidCommandWithHash> {
1519 self.all_by_sender
1520 .values()
1521 .cloned()
1522 .flat_map(|(cmds, _)| cmds.into_iter())
1523 .collect()
1524 }
1525
1526 fn get_pending_amount_and_nonce(&self) -> HashMap<AccountId, (Option<Nonce>, Amount)> {
1527 self.all_by_sender
1529 .clone()
1530 .into_iter()
1531 .map(|(acc_id, (cmds, amount))| (acc_id, (cmds.back().unwrap().data.nonce(), amount)))
1532 .collect()
1533 }
1534}
1535
1536fn currency_consumed(cmd: &UserCommand) -> Result<Amount, CommandError> {
1537 use crate::scan_state::transaction_logic::signed_command::{Body::*, PaymentPayload};
1538
1539 let fee_amount = Amount::of_fee(&cmd.fee());
1540 let amount = match cmd {
1541 UserCommand::SignedCommand(c) => {
1542 match &c.payload.body {
1543 Payment(PaymentPayload { amount, .. }) => {
1544 *amount
1546 }
1547 StakeDelegation(_) => Amount::zero(),
1548 }
1549 }
1550 UserCommand::ZkAppCommand(_) => Amount::zero(),
1551 };
1552
1553 fee_amount
1554 .checked_add(&amount)
1555 .ok_or(CommandError::InvalidCurrencyConsumed)
1556}
1557
1558pub mod transaction_hash {
1559 use super::*;
1560
1561 pub fn hash_command(cmd: valid::UserCommand) -> ValidCommandWithHash {
1562 let hash = match &cmd {
1563 valid::UserCommand::SignedCommand(cmd) => {
1564 v2::MinaBaseSignedCommandStableV2::from(&**cmd)
1565 .hash()
1566 .unwrap()
1567 }
1568 valid::UserCommand::ZkAppCommand(cmd) => {
1569 let cmd = cmd.clone().forget();
1570 v2::MinaBaseZkappCommandTStableV1WireStableV1::from(&cmd)
1571 .hash()
1572 .unwrap()
1573 }
1574 };
1575
1576 WithHash { data: cmd, hash }
1577 }
1578}
1579
/// Overall verdict on a diff after its commands were applied to the pool.
#[derive(Debug)]
pub enum ApplyDecision {
    /// No rejected command gave grounds for rejecting the diff.
    Accept,
    /// At least one rejected command gave grounds for rejecting the diff.
    Reject,
}
1585
/// Number of locally generated commands assigned to one batch before the
/// batch counter is advanced.
const MAX_PER_15_SECONDS: usize = 10;
1587
1588#[derive(Clone, Debug, Serialize, Deserialize)]
1589pub struct TransactionPool {
1590 pub pool: IndexedPool,
1591 locally_generated_uncommitted: HashMap<ValidCommandWithHash, (redux::Timestamp, Batch)>,
1592 locally_generated_committed: HashMap<ValidCommandWithHash, (redux::Timestamp, Batch)>,
1593 current_batch: usize,
1594 remaining_in_batch: usize,
1595 pub config: Config,
1596 batcher: (),
1597 best_tip_diff_relay: Option<()>,
1598 verification_key_table: VkRefcountTable,
1599}
1600
1601impl TransactionPool {
1602 pub fn new(config: Config, consensus_constants: &ConsensusConstants) -> Self {
1603 Self {
1604 pool: IndexedPool::new(consensus_constants),
1605 locally_generated_uncommitted: Default::default(),
1606 locally_generated_committed: Default::default(),
1607 current_batch: 0,
1608 remaining_in_batch: 0,
1609 config,
1610 batcher: (),
1611 best_tip_diff_relay: None,
1612 verification_key_table: Default::default(),
1613 }
1614 }
1615
1616 pub fn size(&self) -> usize {
1617 self.pool.size()
1618 }
1619
1620 pub fn get_all_transactions(&self) -> Vec<ValidCommandWithHash> {
1621 self.pool.get_all_transactions()
1622 }
1623
1624 pub fn get_pending_amount_and_nonce(&self) -> HashMap<AccountId, (Option<Nonce>, Amount)> {
1625 self.pool.get_pending_amount_and_nonce()
1626 }
1627
1628 pub fn transactions(&mut self, limit: usize) -> Vec<ValidCommandWithHash> {
1629 self.pool.transactions(limit)
1630 }
1631
1632 pub fn list_includable_transactions(&self, limit: usize) -> Vec<ValidCommandWithHash> {
1633 self.pool.list_includable_transactions(limit)
1634 }
1635
1636 pub fn get_accounts_to_revalidate_on_new_best_tip(&self) -> BTreeSet<AccountId> {
1637 self.pool.all_by_sender.keys().cloned().collect()
1638 }
1639
1640 pub fn on_new_best_tip(
1641 &mut self,
1642 global_slot_since_genesis: Slot,
1643 accounts: &BTreeMap<AccountId, Account>,
1644 ) -> Result<Vec<ValidCommandWithHash>, CommandError> {
1645 let dropped = self.pool.revalidate(
1646 global_slot_since_genesis,
1647 RevalidateKind::EntirePool,
1648 |sender_id| {
1649 Some(
1650 accounts
1651 .get(sender_id)
1652 .cloned()
1653 .unwrap_or_else(Account::empty),
1654 )
1655 },
1656 )?;
1657
1658 let dropped_locally_generated = dropped
1659 .iter()
1660 .filter(|cmd| {
1661 let dropped_commited = self.locally_generated_committed.remove(cmd).is_some();
1662 let dropped_uncommited = self.locally_generated_uncommitted.remove(cmd).is_some();
1663 assert!(!(dropped_commited && dropped_uncommited));
1665 dropped_commited || dropped_uncommited
1666 })
1667 .collect::<Vec<_>>();
1668
1669 if !dropped_locally_generated.is_empty() {
1670 openmina_core::info!(
1671 openmina_core::log::system_time();
1672 kind = "transaction pool",
1673 message = "Dropped locally generated commands $cmds from pool when transition frontier was recreated.",
1674 dropped = format!("{dropped_locally_generated:?}")
1675 );
1676 }
1677
1678 Ok(dropped)
1679 }
1680
1681 fn has_sufficient_fee(&self, pool_max_size: usize, cmd: &valid::UserCommand) -> bool {
1682 match self.pool.min_fee() {
1683 None => true,
1684 Some(min_fee) => {
1685 if self.pool.size() >= pool_max_size {
1686 cmd.forget_check().fee_per_wu() > min_fee
1687 } else {
1688 true
1689 }
1690 }
1691 }
1692 }
1693
1694 fn drop_until_below_max_size(
1695 &mut self,
1696 pool_max_size: usize,
1697 ) -> Result<Vec<ValidCommandWithHash>, CommandError> {
1698 let mut list = Vec::new();
1699
1700 while self.pool.size() > pool_max_size {
1701 let dropped = self.pool.remove_lowest_fee()?;
1702 my_assert(!dropped.is_empty())?;
1703 list.extend(dropped)
1704 }
1705
1706 Ok(list)
1707 }
1708
1709 pub fn get_accounts_to_handle_transition_diff(
1710 &self,
1711 diff: &diff::BestTipDiff,
1712 ) -> (BTreeSet<AccountId>, BTreeSet<AccountId>) {
1713 let diff::BestTipDiff {
1714 new_commands,
1715 removed_commands,
1716 reorg_best_tip: _,
1717 } = diff;
1718
1719 let in_cmds = new_commands
1720 .iter()
1721 .chain(removed_commands)
1722 .flat_map(|cmd| cmd.data.forget_check().accounts_referenced())
1723 .collect::<BTreeSet<_>>();
1724
1725 let uncommitted = self
1726 .locally_generated_uncommitted
1727 .keys()
1728 .map(|cmd| cmd.data.fee_payer())
1729 .collect::<BTreeSet<_>>();
1730
1731 (in_cmds, uncommitted)
1732 }
1733
1734 pub fn handle_transition_frontier_diff(
1735 &mut self,
1736 global_slot_since_genesis: Slot,
1737 current_global_slot: Slot,
1738 diff: &diff::BestTipDiff,
1739 account_ids: &BTreeSet<AccountId>,
1740 accounts: &BTreeMap<AccountId, Account>,
1741 uncommited: &BTreeMap<AccountId, Account>,
1742 ) -> Result<(), String> {
1743 let diff::BestTipDiff {
1744 new_commands,
1745 removed_commands,
1746 reorg_best_tip: _,
1747 } = diff;
1748
1749 let (new_commands, removed_commands) = {
1751 let collect_hashed = |cmds: &[WithStatus<valid::UserCommand>]| {
1752 cmds.iter()
1753 .map(|cmd| transaction_hash::hash_command(cmd.data.clone()))
1754 .collect::<Vec<_>>()
1755 };
1756
1757 let mut new_commands = collect_hashed(new_commands);
1758 let mut removed_commands = collect_hashed(removed_commands);
1759
1760 #[allow(clippy::mutable_key_type)]
1761 let new_commands_set = new_commands.iter().collect::<HashSet<_>>();
1762 #[allow(clippy::mutable_key_type)]
1763 let removed_commands_set = removed_commands.iter().collect::<HashSet<_>>();
1764
1765 #[allow(clippy::mutable_key_type)]
1766 let duplicates = new_commands_set
1767 .intersection(&removed_commands_set)
1768 .map(|cmd| (*cmd).clone())
1769 .collect::<HashSet<_>>();
1770
1771 new_commands.retain(|cmd| !duplicates.contains(cmd));
1772 removed_commands.retain(|cmd| !duplicates.contains(cmd));
1773 (new_commands, removed_commands)
1774 };
1775
1776 let pool_max_size = self.config.pool_max_size;
1777
1778 self.verification_key_table.increment_list(&new_commands);
1779 self.verification_key_table
1780 .decrement_list(&removed_commands);
1781
1782 let mut dropped_backtrack = Vec::with_capacity(256);
1783 for cmd in removed_commands {
1784 if let Some(time_added) = self.locally_generated_committed.remove(&cmd) {
1785 self.locally_generated_uncommitted
1786 .insert(cmd.clone(), time_added);
1787 }
1788
1789 let dropped_seq = match self.pool.add_from_backtrack(
1790 global_slot_since_genesis,
1791 current_global_slot,
1792 cmd,
1793 ) {
1794 Ok(_) => self.drop_until_below_max_size(pool_max_size)?,
1795 Err(e) => return Err(format!("{:?}", e)),
1796 };
1797 dropped_backtrack.extend(dropped_seq);
1798 }
1799
1800 self.verification_key_table
1801 .decrement_hashed(&dropped_backtrack);
1802
1803 let locally_generated_dropped = dropped_backtrack
1804 .iter()
1805 .filter(|t| self.locally_generated_uncommitted.contains_key(t))
1806 .collect::<Vec<_>>();
1807
1808 let dropped_commands = {
1809 let accounts_to_check = account_ids;
1810 let existing_account_states_by_id = accounts;
1811
1812 let get_account = |id: &AccountId| {
1813 match existing_account_states_by_id.get(id) {
1814 Some(account) => Some(account.clone()),
1815 None => {
1816 if accounts_to_check.contains(id) {
1817 Some(Account::empty())
1818 } else {
1819 None
1820 }
1826 }
1827 }
1828 };
1829
1830 self.pool.revalidate(
1831 global_slot_since_genesis,
1832 RevalidateKind::Subset(accounts_to_check),
1833 get_account,
1834 )?
1835 };
1836
1837 let (committed_commands, dropped_commit_conflicts): (Vec<_>, Vec<_>) = {
1838 let command_hashes: HashSet<v2::TransactionHash> =
1839 new_commands.iter().map(|cmd| cmd.hash.clone()).collect();
1840
1841 dropped_commands
1842 .iter()
1843 .partition(|cmd| command_hashes.contains(&cmd.hash))
1844 };
1845
1846 for cmd in &committed_commands {
1847 self.verification_key_table.decrement_hashed([&**cmd]);
1848 if let Some(data) = self.locally_generated_uncommitted.remove(cmd) {
1849 let old = self
1850 .locally_generated_committed
1851 .insert((*cmd).clone(), data);
1852 my_assert(old.is_none())?;
1853 };
1854 }
1855
1856 let _commit_conflicts_locally_generated = dropped_commit_conflicts
1857 .iter()
1858 .filter(|cmd| self.locally_generated_uncommitted.remove(cmd).is_some());
1859
1860 for cmd in locally_generated_dropped {
1861 let remove_cmd = |this: &mut Self| {
1865 this.verification_key_table.decrement_hashed([cmd]);
1866 assert!(this.locally_generated_uncommitted.remove(cmd).is_some());
1867 };
1868
1869 if !self.locally_generated_committed.contains_key(cmd) {
1870 if !self.has_sufficient_fee(pool_max_size, &cmd.data) {
1871 remove_cmd(self)
1872 } else {
1873 let unchecked = &cmd.data;
1874 match uncommited.get(&unchecked.fee_payer()) {
1875 Some(account) => {
1876 match self.pool.add_from_gossip_exn(
1877 global_slot_since_genesis,
1878 current_global_slot,
1879 cmd,
1880 account.nonce,
1881 account.liquid_balance_at_slot(global_slot_since_genesis),
1882 ) {
1883 Err(_) => {
1884 remove_cmd(self);
1885 }
1886 Ok(_) => {
1887 self.verification_key_table.increment_hashed([cmd]);
1888 }
1889 }
1890 }
1891 None => {
1892 remove_cmd(self);
1893 }
1894 }
1895 }
1896 }
1897 }
1898
1899 let expired_commands = self.pool.remove_expired(global_slot_since_genesis)?;
1900 for cmd in &expired_commands {
1901 self.verification_key_table.decrement_hashed([cmd]);
1902 self.locally_generated_uncommitted.remove(cmd);
1903 }
1904
1905 Ok(())
1906 }
1907
1908 pub fn get_accounts_to_apply_diff(&self, diff: &diff::DiffVerified) -> BTreeSet<AccountId> {
1909 let fee_payer = |cmd: &ValidCommandWithHash| cmd.data.fee_payer();
1910 diff.list.iter().map(fee_payer).collect()
1911 }
1912
1913 fn apply(
1914 &mut self,
1915 time: redux::Timestamp,
1916 global_slot_since_genesis: Slot,
1917 current_global_slot: Slot,
1918 diff: &diff::DiffVerified,
1919 accounts: &BTreeMap<AccountId, Account>,
1920 is_sender_local: bool,
1921 ) -> Result<
1922 (
1923 ApplyDecision,
1924 Vec<ValidCommandWithHash>,
1925 Vec<(ValidCommandWithHash, diff::Error)>,
1926 HashSet<v2::TransactionHash>,
1927 ),
1928 String,
1929 > {
1930 let fee_payer = |cmd: &ValidCommandWithHash| cmd.data.fee_payer();
1931 let fee_payer_accounts = accounts;
1932
1933 let check_command = |pool: &IndexedPool, cmd: &ValidCommandWithHash| {
1934 if pool.member(cmd) {
1935 Err(diff::Error::Duplicate)
1936 } else {
1937 match fee_payer_accounts.get(&fee_payer(cmd)) {
1938 None => Err(diff::Error::FeePayerAccountNotFound),
1939 Some(account) => {
1940 if account.has_permission_to_send()
1941 && account.has_permission_to_increment_nonce()
1942 {
1943 Ok(())
1944 } else {
1945 Err(diff::Error::FeePayerNotPermittedToSend)
1946 }
1947 }
1948 }
1949 }
1950 };
1951
1952 let add_results = diff
1953 .list
1954 .iter()
1955 .map(|cmd| {
1956 let account = fee_payer_accounts
1957 .get(&fee_payer(cmd))
1958 .ok_or_else(|| "Fee payer not found".to_string())?;
1959
1960 let result: Result<_, diff::Error> = (|| {
1961 check_command(&self.pool, cmd)?;
1962
1963 match self.pool.add_from_gossip_exn(
1964 global_slot_since_genesis,
1965 current_global_slot,
1966 cmd,
1967 account.nonce,
1968 account.liquid_balance_at_slot(global_slot_since_genesis),
1969 ) {
1970 Ok(x) => Ok(x),
1971 Err(e) => Err(e.into()),
1972 }
1973 })();
1974
1975 match result {
1976 Ok((cmd, dropped)) => Ok(Ok((cmd, dropped))),
1977 Err(err) => Ok(Err((cmd, err))),
1978 }
1979 })
1980 .collect::<Result<Vec<Result<_, _>>, String>>()?;
1981
1982 let added_cmds = add_results
1983 .iter()
1984 .filter_map(|cmd| match cmd {
1985 Ok((cmd, _)) => Some(cmd),
1986 Err(_) => None,
1987 })
1988 .collect::<Vec<_>>();
1989
1990 let dropped_for_add = add_results
1991 .iter()
1992 .filter_map(|cmd| match cmd {
1993 Ok((_, dropped)) => Some(dropped),
1994 Err(_) => None,
1995 })
1996 .flatten()
1997 .collect::<Vec<_>>();
1998
1999 let dropped_for_size = self.drop_until_below_max_size(self.config.pool_max_size)?;
2000
2001 let all_dropped_cmds = dropped_for_add
2002 .iter()
2003 .copied()
2004 .chain(dropped_for_size.iter())
2005 .collect::<Vec<_>>();
2006
2007 {
2008 self.verification_key_table.increment_hashed(added_cmds);
2009 self.verification_key_table
2010 .decrement_hashed(all_dropped_cmds.iter().copied());
2011 };
2012
2013 let dropped_for_add_hashes: HashSet<&v2::TransactionHash> =
2014 dropped_for_add.iter().map(|cmd| &cmd.hash).collect();
2015 let dropped_for_size_hashes: HashSet<&v2::TransactionHash> =
2016 dropped_for_size.iter().map(|cmd| &cmd.hash).collect();
2017 let all_dropped_cmd_hashes: HashSet<v2::TransactionHash> = dropped_for_add_hashes
2018 .union(&dropped_for_size_hashes)
2019 .map(|hash| (*hash).clone())
2020 .collect();
2021
2022 if is_sender_local {
2027 for result in add_results.iter() {
2028 let Ok((cmd, _dropped)) = result else {
2029 continue;
2030 };
2031 if !all_dropped_cmd_hashes.contains(&cmd.hash) {
2032 self.register_locally_generated(time, cmd);
2033 }
2034 }
2035 }
2036
2037 let mut accepted = Vec::with_capacity(128);
2038 let mut rejected = Vec::with_capacity(128);
2039
2040 for result in &add_results {
2042 match result {
2043 Ok((cmd, _dropped)) => {
2044 if all_dropped_cmd_hashes.contains(&cmd.hash) {
2045 } else {
2047 accepted.push(cmd.clone());
2048 }
2049 }
2050 Err((cmd, error)) => {
2051 rejected.push(((*cmd).clone(), error.clone()));
2052 }
2053 }
2054 }
2055
2056 let decision = if rejected
2057 .iter()
2058 .any(|(_, error)| error.grounds_for_diff_rejection())
2059 {
2060 ApplyDecision::Reject
2061 } else {
2062 ApplyDecision::Accept
2063 };
2064
2065 Ok((decision, accepted, rejected, all_dropped_cmd_hashes))
2066 }
2067
2068 pub fn unsafe_apply(
2069 &mut self,
2070 time: redux::Timestamp,
2071 global_slot_since_genesis: Slot,
2072 current_global_slot: Slot,
2073 diff: &diff::DiffVerified,
2074 accounts: &BTreeMap<AccountId, Account>,
2075 is_sender_local: bool,
2076 ) -> Result<
2077 (
2078 ApplyDecision,
2079 Vec<ValidCommandWithHash>,
2080 Vec<(ValidCommandWithHash, diff::Error)>,
2081 HashSet<v2::TransactionHash>,
2082 ),
2083 String,
2084 > {
2085 let (decision, accepted, rejected, dropped) = self.apply(
2086 time,
2087 global_slot_since_genesis,
2088 current_global_slot,
2089 diff,
2090 accounts,
2091 is_sender_local,
2092 )?;
2093 Ok((decision, accepted, rejected, dropped))
2094 }
2095
2096 fn register_locally_generated(&mut self, time: redux::Timestamp, cmd: &ValidCommandWithHash) {
2097 match self.locally_generated_uncommitted.entry(cmd.clone()) {
2098 Entry::Occupied(mut entry) => {
2099 let (entry_time, _batch_num) = entry.get_mut();
2100 *entry_time = redux::Timestamp::global_now();
2101 }
2102 Entry::Vacant(entry) => {
2103 let batch_num = if self.remaining_in_batch > 0 {
2104 self.remaining_in_batch -= 1;
2105 self.current_batch
2106 } else {
2107 self.remaining_in_batch = MAX_PER_15_SECONDS - 1;
2108 self.current_batch += 1;
2109 self.current_batch
2110 };
2111 entry.insert((time, Batch::Of(batch_num)));
2112 }
2113 }
2114 }
2115
2116 pub fn prevalidate(&self, diff: diff::Diff) -> Result<diff::Diff, TransactionPoolErrors> {
2117 let well_formedness_errors: HashSet<_> = diff
2118 .list
2119 .iter()
2120 .flat_map(|cmd| match cmd.check_well_formedness() {
2121 Ok(()) => Vec::new(),
2122 Err(errors) => errors,
2123 })
2124 .collect();
2125
2126 if !well_formedness_errors.is_empty() {
2127 return Err(TransactionPoolErrors::BatchedErrors(
2128 well_formedness_errors
2129 .into_iter()
2130 .map(TransactionError::WellFormedness)
2131 .collect_vec(),
2132 ));
2133 }
2134
2135 Ok(diff)
2136 }
2137
2138 pub fn convert_diff_to_verifiable(
2139 &self,
2140 diff: diff::Diff,
2141 accounts: &BTreeMap<AccountId, Account>,
2142 ) -> Result<Vec<WithStatus<verifiable::UserCommand>>, TransactionPoolErrors> {
2143 let cs = diff
2144 .list
2145 .iter()
2146 .cloned()
2147 .map(|cmd| MaybeWithStatus { cmd, status: None })
2148 .collect::<Vec<_>>();
2149
2150 let diff = UserCommand::to_all_verifiable::<FromUnappliedSequence, _>(cs, |account_ids| {
2151 let mempool_vks: HashMap<_, _> = account_ids
2152 .iter()
2153 .map(|id| {
2154 let vks = self.verification_key_table.find_vks_by_account_id(id);
2155 let vks: HashMap<_, _> =
2156 vks.iter().map(|vk| (vk.hash(), (*vk).clone())).collect();
2157 (id.clone(), vks)
2158 })
2159 .collect();
2160
2161 let ledger_vks = UserCommand::load_vks_from_ledger_accounts(accounts);
2162 let ledger_vks: HashMap<_, _> = ledger_vks
2163 .into_iter()
2164 .map(|(id, vk)| {
2165 let mut map = HashMap::new();
2166 map.insert(vk.hash(), vk);
2167 (id, map)
2168 })
2169 .collect();
2170
2171 let new_map: HashMap<AccountId, HashMap<Fp, VerificationKeyWire>> = HashMap::new();
2172 let merged =
2173 mempool_vks
2174 .into_iter()
2175 .chain(ledger_vks)
2176 .fold(new_map, |mut accum, (id, map)| {
2177 let entry = accum.entry(id).or_default();
2178 for (hash, vk) in map {
2179 entry.insert(hash, vk);
2180 }
2181 accum
2182 });
2183
2184 from_unapplied_sequence::Cache::new(merged)
2185 })
2186 .map_err(TransactionPoolErrors::LoadingVK)?;
2187
2188 let diff = diff
2189 .into_iter()
2190 .map(|MaybeWithStatus { cmd, status: _ }| WithStatus {
2191 data: cmd,
2192 status: Applied,
2194 })
2195 .collect::<Vec<_>>();
2196
2197 Ok(diff)
2198 }
2199
2200 pub fn verify_proofs(
2201 &self,
2202 diff: Vec<WithStatus<verifiable::UserCommand>>,
2203 ) -> Result<Vec<valid::UserCommand>, TransactionPoolErrors> {
2204 let (verified, invalid): (Vec<_>, Vec<_>) = Verifier
2205 .verify_commands(diff, None)
2206 .into_iter()
2207 .partition(Result::is_ok);
2208
2209 let verified: Vec<_> = verified.into_iter().map(Result::unwrap).collect();
2210 let invalid: Vec<_> = invalid.into_iter().map(Result::unwrap_err).collect();
2211
2212 if !invalid.is_empty() {
2213 let transaction_pool_errors = invalid
2214 .into_iter()
2215 .map(TransactionError::Verifier)
2216 .collect();
2217 Err(TransactionPoolErrors::BatchedErrors(
2218 transaction_pool_errors,
2219 ))
2220 } else {
2221 Ok(verified)
2222 }
2223 }
2224
2225 fn get_rebroadcastable<F>(&mut self, has_timed_out: F) -> Vec<Vec<UserCommand>>
2226 where
2227 F: Fn(&redux::Timestamp) -> bool,
2228 {
2229 let log = |has_timed_out: bool, s: &str, cmd: &ValidCommandWithHash| -> bool {
2230 if has_timed_out {
2231 eprintln!("{}: {:?}", s, cmd);
2232 false
2233 } else {
2234 true
2235 }
2236 };
2237
2238 self.locally_generated_uncommitted
2239 .retain(|key, (time, _batch)| {
2240 log(
2241 has_timed_out(time),
2242 "No longer rebroadcasting uncommitted expired command",
2243 key,
2244 )
2245 });
2246 self.locally_generated_committed
2247 .retain(|key, (time, _batch)| {
2248 log(
2249 has_timed_out(time),
2250 "Removing committed locally generated expired command",
2251 key,
2252 )
2253 });
2254
2255 let mut rebroadcastable_txs = self
2256 .locally_generated_uncommitted
2257 .iter()
2258 .collect::<Vec<_>>();
2259
2260 rebroadcastable_txs.sort_by(|(txn1, (_, batch1)), (txn2, (_, batch2))| {
2261 use std::cmp::Ordering::Equal;
2262
2263 let get_nonce =
2264 |txn: &ValidCommandWithHash| txn.data.forget_check().applicable_at_nonce();
2265
2266 match batch1.cmp(batch2) {
2267 Equal => (),
2268 x => return x,
2269 }
2270 match get_nonce(txn1).cmp(&get_nonce(txn2)) {
2271 Equal => (),
2272 x => return x,
2273 }
2274 txn1.hash.cmp(&txn2.hash)
2275 });
2276
2277 rebroadcastable_txs
2278 .into_iter()
2279 .group_by(|(_txn, (_time, batch))| batch)
2280 .into_iter()
2281 .map(|(_batch, group_txns)| {
2282 group_txns
2283 .map(|(txn, _)| txn.data.forget_check())
2284 .collect::<Vec<_>>()
2285 })
2286 .collect::<Vec<_>>()
2287 }
2288}
2289
#[cfg(test)]
mod tests {
    use super::*;

    /// Merging two maps of maps keeps the inner entries of both sides when
    /// they share an outer key.
    #[test]
    fn test_map_merge() {
        let a = HashMap::from([(1, HashMap::from([(1, 10), (2, 12)]))]);
        let b = HashMap::from([(1, HashMap::from([(3, 20)]))]);

        let new_map: HashMap<_, HashMap<_, _>> = HashMap::new();
        let merged = a
            .into_iter()
            .chain(b)
            .fold(new_map, |mut accum, (id, map)| {
                let entry = accum.entry(id).or_default();
                for (hash, vk) in map {
                    entry.insert(hash, vk);
                }
                accum
            });

        // Only one outer key exists, holding the inner entries of both inputs.
        assert_eq!(merged.len(), 1);
        let one = merged.get(&1).unwrap();
        assert_eq!(one.len(), 3);
        assert_eq!(one.get(&1), Some(&10));
        assert_eq!(one.get(&2), Some(&12));
        assert_eq!(one.get(&3), Some(&20));
    }
}