1use crate::{
2 scan_state::{
3 currency::{Amount, Balance, BlockTime, Fee, Magnitude, Nonce, Slot},
4 fee_rate::FeeRate,
5 transaction_logic::{
6 valid, verifiable,
7 zkapp_command::{
8 from_unapplied_sequence::{self, FromUnappliedSequence},
9 MaybeWithStatus, WithHash,
10 },
11 TransactionStatus::Applied,
12 UserCommand, WellFormednessError, WithStatus,
13 },
14 },
15 verifier::{Verifier, VerifierError},
16 Account, AccountId, BaseLedger, Mask, TokenId, VerificationKey, VerificationKeyWire,
17};
18use backtrace::Backtrace;
19use itertools::Itertools;
20use mina_core::{bug_condition, consensus::ConsensusConstants};
21use mina_curves::pasta::Fp;
22use mina_p2p_messages::{bigint::BigInt, v2};
23use serde::{Deserialize, Serialize};
24use std::{
25 borrow::{Borrow, Cow},
26 collections::{hash_map::Entry, BTreeMap, BTreeSet, HashMap, HashSet, VecDeque},
27};
28
29#[derive(Debug, thiserror::Error)]
30pub enum TransactionPoolErrors {
31 #[error("Transaction pool errors: {0:?}")]
33 BatchedErrors(Vec<TransactionError>),
34 #[error("{0:?}")]
35 LoadingVK(String),
36 #[error("Unexpected error: {0}")]
38 Unexpected(String),
39}
40
41#[derive(Debug, thiserror::Error)]
42pub enum TransactionError {
43 #[error(transparent)]
44 Verifier(#[from] VerifierError),
45 #[error(transparent)]
46 WellFormedness(#[from] WellFormednessError),
47}
48
49impl From<String> for TransactionPoolErrors {
50 fn from(value: String) -> Self {
51 Self::Unexpected(value)
52 }
53}
54
55#[inline(never)]
56fn my_assert(v: bool) -> Result<(), CommandError> {
57 if !v {
58 let backtrace = Backtrace::new();
59 let s = format!("assert failed {:?}", backtrace);
60 bug_condition!("{:?}", s);
61 return Err(CommandError::Custom(Cow::Owned(s)));
62 }
63 Ok(())
64}
65
66mod consensus {
67 use crate::scan_state::currency::{BlockTimeSpan, Epoch, Length};
68
69 use super::*;
70
71 #[derive(Clone, Debug, Serialize, Deserialize)]
72 pub struct Constants {
73 k: Length,
74 delta: Length,
75 slots_per_sub_window: Length,
76 slots_per_window: Length,
77 sub_windows_per_window: Length,
78 slots_per_epoch: Length,
79 grace_period_slots: Length,
80 grace_period_end: Slot,
81 checkpoint_window_slots_per_year: Length,
82 checkpoint_window_size_in_slots: Length,
83 block_window_duration_ms: BlockTimeSpan,
84 slot_duration_ms: BlockTimeSpan,
85 epoch_duration: BlockTimeSpan,
86 delta_duration: BlockTimeSpan,
87 genesis_state_timestamp: BlockTime,
88 }
89
90 impl Constants {
91 pub fn create(constants: &ConsensusConstants) -> Self {
94 Constants {
95 k: Length::from_u32(constants.k),
96 delta: Length::from_u32(constants.delta),
97 slots_per_sub_window: Length::from_u32(constants.slots_per_sub_window),
98 slots_per_window: Length::from_u32(constants.slots_per_window),
99 sub_windows_per_window: Length::from_u32(constants.sub_windows_per_window),
100 slots_per_epoch: Length::from_u32(constants.slots_per_epoch),
101 grace_period_slots: Length::from_u32(constants.grace_period_slots),
102 grace_period_end: Slot::from_u32(constants.grace_period_end),
103 checkpoint_window_slots_per_year: Length::from_u32(
104 constants.checkpoint_window_slots_per_year,
105 ),
106 checkpoint_window_size_in_slots: Length::from_u32(
107 constants.checkpoint_window_size_in_slots,
108 ),
109 block_window_duration_ms: BlockTimeSpan::from_u64(
110 constants.block_window_duration_ms,
111 ),
112 slot_duration_ms: BlockTimeSpan::from_u64(constants.slot_duration_ms),
113 epoch_duration: BlockTimeSpan::from_u64(constants.epoch_duration),
114 delta_duration: BlockTimeSpan::from_u64(constants.delta_duration),
115 genesis_state_timestamp: constants.genesis_state_timestamp.into(),
116 }
117 }
118 }
119
120 impl Epoch {
122 fn of_time_exn(constants: &Constants, time: BlockTime) -> Result<Self, String> {
123 if time < constants.genesis_state_timestamp {
124 return Err(
125 "Epoch.of_time: time is earlier than genesis block timestamp".to_string(),
126 );
127 }
128
129 let time_since_genesis = time.diff(constants.genesis_state_timestamp);
130 let epoch = time_since_genesis.to_ms() / constants.epoch_duration.to_ms();
131 let epoch: u32 = epoch.try_into().unwrap();
132
133 Ok(Self::from_u32(epoch))
134 }
135
136 fn start_time(constants: &Constants, epoch: Self) -> BlockTime {
137 let ms = constants
138 .genesis_state_timestamp
139 .to_span_since_epoch()
140 .to_ms()
141 + ((epoch.as_u32() as u64) * constants.epoch_duration.to_ms());
142 BlockTime::of_span_since_epoch(BlockTimeSpan::of_ms(ms))
143 }
144
145 pub fn epoch_and_slot_of_time_exn(
146 constants: &Constants,
147 time: BlockTime,
148 ) -> Result<(Self, Slot), String> {
149 let epoch = Self::of_time_exn(constants, time)?;
150 let time_since_epoch = time.diff(Self::start_time(constants, epoch));
151
152 let slot: u64 = time_since_epoch.to_ms() / constants.slot_duration_ms.to_ms();
153 let slot = Slot::from_u32(slot.try_into().unwrap());
154
155 Ok((epoch, slot))
156 }
157 }
158
159 pub struct GlobalSlot {
161 slot_number: Slot,
162 slots_per_epoch: Length,
163 }
164
165 impl GlobalSlot {
166 fn create(constants: &Constants, epoch: Epoch, slot: Slot) -> Self {
167 let slot_number = slot.as_u32() + (constants.slots_per_epoch.as_u32() * epoch.as_u32());
168 Self {
169 slot_number: Slot::from_u32(slot_number),
170 slots_per_epoch: constants.slots_per_epoch,
171 }
172 }
173
174 fn of_epoch_and_slot(constants: &Constants, (epoch, slot): (Epoch, Slot)) -> Self {
175 Self::create(constants, epoch, slot)
176 }
177
178 pub fn of_time_exn(constants: &Constants, time: BlockTime) -> Result<Self, String> {
179 Ok(Self::of_epoch_and_slot(
180 constants,
181 Epoch::epoch_and_slot_of_time_exn(constants, time)?,
182 ))
183 }
184
185 pub fn to_global_slot(&self) -> Slot {
186 let Self {
187 slot_number,
188 slots_per_epoch: _,
189 } = self;
190 *slot_number
191 }
192 }
193}
194
195const REPLACE_FEE: Fee = Fee::of_nanomina_int_exn(1);
197
198pub type ValidCommandWithHash = WithHash<valid::UserCommand, v2::TransactionHash>;
199
200pub mod diff {
201 use super::*;
202
203 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, strum_macros::Display)]
204 pub enum Error {
205 InsufficientReplaceFee,
206 Duplicate,
207 InvalidNonce,
208 InsufficientFunds,
209 Overflow,
210 BadToken,
211 UnwantedFeeToken,
212 Expired,
213 Overloaded,
214 FeePayerAccountNotFound,
215 FeePayerNotPermittedToSend,
216 AfterSlotTxEnd,
217 BacktrackNonceMismatch,
218 InvalidCurrencyConsumed,
219 Custom,
220 }
221
222 impl Error {
223 pub fn grounds_for_diff_rejection(&self) -> bool {
224 match self {
225 Error::InsufficientReplaceFee
226 | Error::Duplicate
227 | Error::InvalidNonce
228 | Error::InsufficientFunds
229 | Error::Expired
230 | Error::Overloaded
231 | Error::FeePayerAccountNotFound
232 | Error::FeePayerNotPermittedToSend
233 | Error::AfterSlotTxEnd
234 | Error::InvalidCurrencyConsumed
235 | Error::Custom
236 | Error::BacktrackNonceMismatch => false,
237 Error::Overflow | Error::BadToken | Error::UnwantedFeeToken => true,
238 }
239 }
240 }
241
242 #[derive(Debug)]
243 pub struct Diff {
244 pub list: Vec<UserCommand>,
245 }
246
247 #[derive(Serialize, Deserialize, Debug, Clone)]
248 pub struct DiffVerified {
249 pub list: Vec<ValidCommandWithHash>,
250 }
251
252 struct Rejected {
253 list: Vec<(UserCommand, Error)>,
254 }
255
256 #[derive(Serialize, Deserialize, Debug, Clone)]
257 pub struct BestTipDiff {
258 pub new_commands: Vec<WithStatus<valid::UserCommand>>,
259 pub removed_commands: Vec<WithStatus<valid::UserCommand>>,
260 pub reorg_best_tip: bool,
261 }
262}
263
264fn preload_accounts(
265 ledger: &Mask,
266 account_ids: &BTreeSet<AccountId>,
267) -> HashMap<AccountId, Account> {
268 account_ids
269 .iter()
270 .filter_map(|id| {
271 let addr = ledger.location_of_account(id)?;
272 let account = ledger.get(addr)?;
273 Some((id.clone(), *account))
274 })
275 .collect()
276}
277
278#[derive(Clone, Debug, Serialize, Deserialize)]
279pub struct Config {
280 pub trust_system: (),
281 pub pool_max_size: usize,
282 pub slot_tx_end: Option<Slot>,
283}
284
285#[derive(Serialize, Deserialize)]
287struct VkRefcountTableBigInts {
288 verification_keys: Vec<(BigInt, (usize, WithHash<VerificationKey, BigInt>))>,
289 account_id_to_vks: Vec<(AccountId, Vec<(BigInt, usize)>)>,
290 vk_to_account_ids: Vec<(BigInt, Vec<(AccountId, usize)>)>,
291}
292impl From<VkRefcountTable> for VkRefcountTableBigInts {
293 fn from(value: VkRefcountTable) -> Self {
294 let VkRefcountTable {
295 verification_keys,
296 account_id_to_vks,
297 vk_to_account_ids,
298 } = value;
299 Self {
300 verification_keys: verification_keys
301 .into_iter()
302 .map(|(hash, (count, vk))| {
303 assert_eq!(hash, vk.hash());
304 let hash: BigInt = hash.into();
305 (
306 hash.clone(),
307 (
308 count,
309 WithHash {
310 data: vk.vk().clone(),
311 hash,
312 },
313 ),
314 )
315 })
316 .collect(),
317 account_id_to_vks: account_id_to_vks
318 .into_iter()
319 .map(|(id, map)| {
320 (
321 id,
322 map.into_iter()
323 .map(|(hash, count)| (hash.into(), count))
324 .collect(),
325 )
326 })
327 .collect(),
328 vk_to_account_ids: vk_to_account_ids
329 .into_iter()
330 .map(|(hash, map)| (hash.into(), map.into_iter().collect()))
331 .collect(),
332 }
333 }
334}
335impl From<VkRefcountTableBigInts> for VkRefcountTable {
336 fn from(value: VkRefcountTableBigInts) -> Self {
337 let VkRefcountTableBigInts {
338 verification_keys,
339 account_id_to_vks,
340 vk_to_account_ids,
341 } = value;
342 Self {
343 verification_keys: verification_keys
344 .into_iter()
345 .map(|(hash, (count, vk))| {
346 assert_eq!(hash, vk.hash);
347 let hash: Fp = hash.to_field().unwrap(); (hash, (count, VerificationKeyWire::with_hash(vk.data, hash)))
349 })
350 .collect(),
351 account_id_to_vks: account_id_to_vks
352 .into_iter()
353 .map(|(id, map)| {
354 let map = map
355 .into_iter()
356 .map(|(bigint, count)| (bigint.to_field::<Fp>().unwrap(), count)) .collect();
358 (id, map)
359 })
360 .collect(),
361 vk_to_account_ids: vk_to_account_ids
362 .into_iter()
363 .map(|(hash, map)| (hash.to_field().unwrap(), map.into_iter().collect())) .collect(),
365 }
366 }
367}
368
369#[derive(Clone, Debug, Default, Serialize, Deserialize)]
370#[serde(into = "VkRefcountTableBigInts")]
371#[serde(from = "VkRefcountTableBigInts")]
372struct VkRefcountTable {
373 verification_keys: HashMap<Fp, (usize, VerificationKeyWire)>,
374 account_id_to_vks: HashMap<AccountId, HashMap<Fp, usize>>,
375 vk_to_account_ids: HashMap<Fp, HashMap<AccountId, usize>>,
376}
377
378impl VkRefcountTable {
379 fn find_vk(&self, f: &Fp) -> Option<&(usize, VerificationKeyWire)> {
380 self.verification_keys.get(f)
381 }
382
383 fn find_vks_by_account_id(&self, account_id: &AccountId) -> Vec<&VerificationKeyWire> {
384 let Some(vks) = self.account_id_to_vks.get(account_id) else {
385 return Vec::new();
386 };
387
388 vks.iter()
389 .map(|(f, _)| self.find_vk(f).expect("malformed Vk_refcount_table.t"))
390 .map(|(_, vk)| vk)
391 .collect()
392 }
393
394 fn inc(&mut self, account_id: AccountId, vk: VerificationKeyWire) {
395 use std::collections::hash_map::Entry::{Occupied, Vacant};
396
397 match self.verification_keys.entry(vk.hash()) {
398 Vacant(e) => {
399 e.insert((1, vk.clone()));
400 }
401 Occupied(mut e) => {
402 let (count, _vk) = e.get_mut();
403 *count += 1;
404 }
405 }
406
407 let map = self
408 .account_id_to_vks
409 .entry(account_id.clone())
410 .or_default(); let count = map.entry(vk.hash()).or_default(); *count += 1;
413
414 let map = self.vk_to_account_ids.entry(vk.hash()).or_default(); let count = map.entry(account_id).or_default(); *count += 1;
417 }
418
419 fn dec(&mut self, account_id: AccountId, vk_hash: Fp) {
420 use std::collections::hash_map::Entry::{Occupied, Vacant};
421
422 match self.verification_keys.entry(vk_hash) {
423 Vacant(_e) => {
424 bug_condition!("vk_map: Unexpected error on self.verification_keys: vacant vk_hash")
425 }
426 Occupied(mut e) => {
427 let (count, _vk) = e.get_mut();
428 if *count == 1 {
429 e.remove();
430 } else {
431 *count = (*count).checked_sub(1).unwrap();
432 }
433 }
434 }
435
436 fn remove<K1, K2>(
437 key1: K1,
438 key2: K2,
439 table: &mut HashMap<K1, HashMap<K2, usize>>,
440 ) -> Result<(), &'static str>
441 where
442 K1: std::hash::Hash + Eq,
443 K2: std::hash::Hash + Eq,
444 {
445 match table.entry(key1) {
446 Vacant(_e) => return Err("vacant on key1"),
447 Occupied(mut e) => {
448 let map = e.get_mut();
449 match map.entry(key2) {
450 Vacant(_e) => return Err("vacant on key2"),
451 Occupied(mut e2) => {
452 let count: &mut usize = e2.get_mut();
453 if *count == 1 {
454 e2.remove();
455 e.remove();
456 } else {
457 *count = count.checked_sub(1).ok_or("invalid count state")?
458 }
459 }
460 }
461 }
462 }
463 Ok(())
464 }
465
466 if let Err(e) = remove(account_id.clone(), vk_hash, &mut self.account_id_to_vks) {
467 bug_condition!(
468 "vk_map: Unexpected error on self.account_id_to_vks: {:?}",
469 e
470 );
471 }
472 if let Err(e) = remove(vk_hash, account_id.clone(), &mut self.vk_to_account_ids) {
473 bug_condition!(
474 "vk_map: Unexpected error on self.vk_to_account_ids: {:?}",
475 e
476 );
477 }
478 }
479
480 fn decrement_list(&mut self, list: &[ValidCommandWithHash]) {
481 list.iter().for_each(|c| {
482 for (id, vk) in c.data.forget_check().extract_vks() {
483 self.dec(id, vk.hash());
484 }
485 });
486 }
487
488 fn decrement_hashed<I, V>(&mut self, list: I)
489 where
490 I: IntoIterator<Item = V>,
491 V: Borrow<ValidCommandWithHash>,
492 {
493 list.into_iter().for_each(|c| {
494 for (id, vk) in c.borrow().data.forget_check().extract_vks() {
495 self.dec(id, vk.hash());
496 }
497 });
498 }
499
500 fn increment_hashed<I, V>(&mut self, list: I)
501 where
502 I: IntoIterator<Item = V>,
503 V: Borrow<ValidCommandWithHash>,
504 {
505 list.into_iter().for_each(|c| {
506 for (id, vk) in c.borrow().data.forget_check().extract_vks() {
507 self.inc(id, vk);
508 }
509 });
510 }
511
512 fn increment_list(&mut self, list: &[ValidCommandWithHash]) {
513 list.iter().for_each(|c| {
514 for (id, vk) in c.data.forget_check().extract_vks() {
515 self.inc(id, vk);
516 }
517 });
518 }
519}
520
521#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
522enum Batch {
523 Of(usize),
524}
525
526#[derive(Debug)]
527pub enum CommandError {
528 InvalidNonce {
529 account_nonce: Nonce,
530 expected: Nonce,
531 },
532 InsufficientFunds {
533 balance: Balance,
534 consumed: Amount,
535 },
536 InsufficientReplaceFee {
539 replace_fee: Fee,
540 fee: Fee,
541 },
542 Overflow,
543 BadToken,
544 Expired {
545 valid_until: Slot,
546 global_slot_since_genesis: Slot,
547 },
548 UnwantedFeeToken {
549 token_id: TokenId,
550 },
551 AfterSlotTxEnd,
552 BacktrackNonceMismatch {
553 expected_nonce: Nonce,
554 first_nonce: Nonce,
555 },
556 InvalidCurrencyConsumed,
557 Custom(Cow<'static, str>),
558}
559
560impl From<CommandError> for String {
561 fn from(value: CommandError) -> Self {
562 format!("{:?}", value)
563 }
564}
565
566impl From<CommandError> for diff::Error {
567 fn from(value: CommandError) -> Self {
568 match value {
569 CommandError::InvalidNonce { .. } => diff::Error::InvalidNonce,
570 CommandError::InsufficientFunds { .. } => diff::Error::InsufficientFunds,
571 CommandError::InsufficientReplaceFee { .. } => diff::Error::InsufficientReplaceFee,
572 CommandError::Overflow => diff::Error::Overflow,
573 CommandError::BadToken => diff::Error::BadToken,
574 CommandError::Expired { .. } => diff::Error::Expired,
575 CommandError::UnwantedFeeToken { .. } => diff::Error::UnwantedFeeToken,
576 CommandError::AfterSlotTxEnd => diff::Error::AfterSlotTxEnd,
577 CommandError::BacktrackNonceMismatch { .. } => diff::Error::BacktrackNonceMismatch,
578 CommandError::InvalidCurrencyConsumed => diff::Error::InvalidCurrencyConsumed,
579 CommandError::Custom(_) => diff::Error::Custom,
580 }
581 }
582}
583
584#[derive(Clone, Debug, Serialize, Deserialize)]
585pub struct IndexedPoolConfig {
586 pub consensus_constants: consensus::Constants,
587 slot_tx_end: Option<Slot>,
588}
589
590#[derive(Clone, Debug, Serialize, Deserialize)]
591pub struct IndexedPool {
592 applicable_by_fee: HashMap<FeeRate, HashSet<ValidCommandWithHash>>,
595 all_by_sender: HashMap<AccountId, (VecDeque<ValidCommandWithHash>, Amount)>,
600 all_by_fee: HashMap<FeeRate, HashSet<ValidCommandWithHash>>,
602 all_by_hash: HashMap<v2::TransactionHash, ValidCommandWithHash>,
603 transactions_with_expiration: HashMap<Slot, HashSet<ValidCommandWithHash>>,
605 size: usize,
606 pub config: IndexedPoolConfig,
607}
608
609enum Update {
610 Add {
611 command: ValidCommandWithHash,
612 fee_per_wu: FeeRate,
613 add_to_applicable_by_fee: bool,
614 },
615 RemoveAllByFeeAndHashAndExpiration {
616 commands: VecDeque<ValidCommandWithHash>,
617 },
618 RemoveFromApplicableByFee {
619 fee_per_wu: FeeRate,
620 command: ValidCommandWithHash,
621 },
622}
623
624#[derive(Clone)]
625struct SenderState {
626 sender: AccountId,
627 state: Option<(VecDeque<ValidCommandWithHash>, Amount)>,
628}
629
630pub enum RevalidateKind<'a> {
631 EntirePool,
632 Subset(&'a BTreeSet<AccountId>),
633}
634
635impl IndexedPool {
636 fn new(constants: &ConsensusConstants) -> Self {
637 Self {
638 applicable_by_fee: HashMap::new(),
639 all_by_sender: HashMap::new(),
640 all_by_fee: HashMap::new(),
641 all_by_hash: HashMap::new(),
642 transactions_with_expiration: HashMap::new(),
643 size: 0,
644 config: IndexedPoolConfig {
645 consensus_constants: consensus::Constants::create(constants),
646 slot_tx_end: None,
647 },
648 }
649 }
650
651 fn size(&self) -> usize {
652 self.size
653 }
654
655 fn min_fee(&self) -> Option<FeeRate> {
656 self.all_by_fee.keys().min().cloned()
657 }
658
659 fn member(&self, cmd: &ValidCommandWithHash) -> bool {
660 self.all_by_hash.contains_key(&cmd.hash)
661 }
662
663 pub fn get(&self, hash: &v2::TransactionHash) -> Option<&ValidCommandWithHash> {
664 self.all_by_hash.get(hash)
665 }
666
667 fn check_expiry(
668 &self,
669 global_slot_since_genesis: Slot,
670 cmd: &UserCommand,
671 ) -> Result<(), CommandError> {
672 let valid_until = cmd.valid_until();
673
674 if valid_until < global_slot_since_genesis {
675 return Err(CommandError::Expired {
676 valid_until,
677 global_slot_since_genesis,
678 });
679 }
680
681 Ok(())
682 }
683
684 fn map_set_insert<K, V>(map: &mut HashMap<K, HashSet<V>>, key: K, value: V)
686 where
687 K: std::hash::Hash + PartialEq + Eq,
688 V: std::hash::Hash + PartialEq + Eq,
689 {
690 let entry = map.entry(key).or_default();
691 entry.insert(value);
692 }
693
694 fn map_set_remove<K, V>(map: &mut HashMap<K, HashSet<V>>, key: K, value: &V)
696 where
697 K: std::hash::Hash + PartialEq + Eq,
698 V: std::hash::Hash + PartialEq + Eq,
699 {
700 let Entry::Occupied(mut entry) = map.entry(key) else {
701 return;
702 };
703 let set = entry.get_mut();
704 set.remove(value);
705 if set.is_empty() {
706 entry.remove();
707 }
708 }
709
710 fn update_expiration_map(&mut self, cmd: ValidCommandWithHash, is_add: bool) {
711 let user_cmd = cmd.data.forget_check();
712 let expiry = user_cmd.valid_until();
713 if expiry == Slot::max() {
714 return; }
716 if is_add {
717 Self::map_set_insert(&mut self.transactions_with_expiration, expiry, cmd);
718 } else {
719 Self::map_set_remove(&mut self.transactions_with_expiration, expiry, &cmd);
720 }
721 }
722
723 fn remove_from_expiration_exn(&mut self, cmd: ValidCommandWithHash) {
724 self.update_expiration_map(cmd, false);
725 }
726
727 fn add_to_expiration(&mut self, cmd: ValidCommandWithHash) {
728 self.update_expiration_map(cmd, true);
729 }
730
731 fn remove_applicable_exn(&mut self, cmd: &ValidCommandWithHash) {
734 let fee_per_wu = cmd.data.forget_check().fee_per_wu();
735 Self::map_set_remove(&mut self.applicable_by_fee, fee_per_wu, cmd);
736 }
737
738 fn make_queue<T>() -> VecDeque<T> {
739 VecDeque::with_capacity(256)
740 }
741
742 fn add_from_backtrack(
743 &mut self,
744 global_slot_since_genesis: Slot,
745 current_global_slot: Slot,
746 cmd: ValidCommandWithHash,
747 ) -> Result<(), CommandError> {
748 let IndexedPoolConfig { slot_tx_end, .. } = &self.config;
749
750 if !slot_tx_end
751 .as_ref()
752 .map(|slot_tx_end| current_global_slot < *slot_tx_end)
753 .unwrap_or(true)
754 {
755 return Err(CommandError::AfterSlotTxEnd);
756 }
757
758 let ValidCommandWithHash {
759 data: unchecked,
760 hash: cmd_hash,
761 } = &cmd;
762 let unchecked = unchecked.forget_check();
763
764 self.check_expiry(global_slot_since_genesis, &unchecked)?;
765
766 let fee_payer = unchecked.fee_payer();
767 let fee_per_wu = unchecked.fee_per_wu();
768
769 let consumed = currency_consumed(&unchecked)?;
770
771 match self.all_by_sender.get_mut(&fee_payer) {
772 None => {
773 {
774 let mut queue = Self::make_queue();
775 queue.push_back(cmd.clone());
776 self.all_by_sender.insert(fee_payer, (queue, consumed));
777 }
778 Self::map_set_insert(&mut self.all_by_fee, fee_per_wu.clone(), cmd.clone());
779 self.all_by_hash.insert(cmd_hash.clone(), cmd.clone());
780 Self::map_set_insert(&mut self.applicable_by_fee, fee_per_wu.clone(), cmd.clone());
781 self.add_to_expiration(cmd);
782 self.size += 1;
783 }
784 Some((queue, currency_reserved)) => {
785 let first_queued = queue.front().cloned().unwrap();
786 let expected_nonce = unchecked.expected_target_nonce();
787 let first_nonce = first_queued.data.forget_check().applicable_at_nonce();
788
789 if expected_nonce != first_nonce {
790 return Err(CommandError::BacktrackNonceMismatch {
793 expected_nonce,
794 first_nonce,
795 });
796 }
797
798 {
800 queue.push_front(cmd.clone());
801 *currency_reserved = currency_reserved.checked_add(&consumed).unwrap();
802 }
803
804 self.remove_applicable_exn(&first_queued);
805
806 Self::map_set_insert(&mut self.applicable_by_fee, fee_per_wu.clone(), cmd.clone());
807 Self::map_set_insert(&mut self.all_by_fee, fee_per_wu.clone(), cmd.clone());
808 self.all_by_hash.insert(cmd_hash.clone(), cmd.clone());
809 self.add_to_expiration(cmd);
810 self.size += 1;
811 }
812 }
813 Ok(())
814 }
815
816 fn update_add(
817 &mut self,
818 cmd: ValidCommandWithHash,
819 fee_per_wu: FeeRate,
820 add_to_applicable_by_fee: bool,
821 ) {
822 if add_to_applicable_by_fee {
823 Self::map_set_insert(&mut self.applicable_by_fee, fee_per_wu.clone(), cmd.clone());
824 }
825
826 let cmd_hash = cmd.hash.clone();
827
828 Self::map_set_insert(&mut self.all_by_fee, fee_per_wu, cmd.clone());
829 self.all_by_hash.insert(cmd_hash, cmd.clone());
830 self.add_to_expiration(cmd);
831 self.size += 1;
832 }
833
834 fn update_remove_all_by_fee_and_hash_and_expiration<I>(&mut self, cmds: I)
837 where
838 I: IntoIterator<Item = ValidCommandWithHash>,
839 {
840 for cmd in cmds {
841 let fee_per_wu = cmd.data.forget_check().fee_per_wu();
842 let cmd_hash = cmd.hash.clone();
843 Self::map_set_remove(&mut self.all_by_fee, fee_per_wu, &cmd);
844 self.all_by_hash.remove(&cmd_hash);
845 self.remove_from_expiration_exn(cmd);
846 self.size = self.size.checked_sub(1).unwrap();
847 }
848 }
849
850 fn update_remove_from_applicable_by_fee(
851 &mut self,
852 fee_per_wu: FeeRate,
853 command: &ValidCommandWithHash,
854 ) {
855 Self::map_set_remove(&mut self.applicable_by_fee, fee_per_wu, command)
856 }
857
858 fn remove_with_dependents_exn(
859 &mut self,
860 cmd: &ValidCommandWithHash,
861 ) -> Result<VecDeque<ValidCommandWithHash>, CommandError> {
862 let sender = cmd.data.fee_payer();
863 let mut by_sender = SenderState {
864 state: self.all_by_sender.get(&sender).cloned(),
865 sender,
866 };
867
868 let mut updates = Vec::<Update>::with_capacity(128);
869 let result = self.remove_with_dependents_exn_impl(cmd, &mut by_sender, &mut updates);
870
871 self.set_sender(by_sender);
872 self.apply_updates(updates);
873
874 result
875 }
876
877 fn remove_with_dependents_exn_impl(
878 &self,
879 cmd: &ValidCommandWithHash,
880 by_sender: &mut SenderState,
881 updates: &mut Vec<Update>,
882 ) -> Result<VecDeque<ValidCommandWithHash>, CommandError> {
883 let (sender_queue, reserved_currency_ref) = by_sender.state.as_mut().unwrap();
884 let unchecked = cmd.data.forget_check();
885
886 my_assert(!sender_queue.is_empty())?;
887
888 let cmd_nonce = unchecked.applicable_at_nonce();
889
890 let cmd_index = sender_queue
891 .iter()
892 .position(|cmd| {
893 let nonce = cmd.data.forget_check().applicable_at_nonce();
894 nonce == cmd_nonce
896 })
897 .unwrap();
898
899 let drop_queue = sender_queue.split_off(cmd_index);
900 let keep_queue = sender_queue;
901 my_assert(!drop_queue.is_empty())?;
902
903 let currency_to_remove = drop_queue.iter().try_fold(Amount::zero(), |acc, cmd| {
904 let consumed = currency_consumed(&cmd.data.forget_check())?;
905 Ok(consumed.checked_add(&acc).unwrap())
906 })?;
907
908 let reserved_currency = reserved_currency_ref
911 .checked_sub(¤cy_to_remove)
912 .unwrap();
913
914 updates.push(Update::RemoveAllByFeeAndHashAndExpiration {
915 commands: drop_queue.clone(),
916 });
917
918 if cmd_index == 0 {
919 updates.push(Update::RemoveFromApplicableByFee {
920 fee_per_wu: unchecked.fee_per_wu(),
921 command: cmd.clone(),
922 });
923 }
924
925 if !keep_queue.is_empty() {
928 *reserved_currency_ref = reserved_currency;
929 } else {
930 my_assert(reserved_currency.is_zero())?;
931 by_sender.state = None;
932 }
933
934 Ok(drop_queue)
935 }
936
937 fn apply_updates(&mut self, updates: Vec<Update>) {
938 for update in updates {
939 match update {
940 Update::Add {
941 command,
942 fee_per_wu,
943 add_to_applicable_by_fee,
944 } => self.update_add(command, fee_per_wu, add_to_applicable_by_fee),
945 Update::RemoveAllByFeeAndHashAndExpiration { commands } => {
946 self.update_remove_all_by_fee_and_hash_and_expiration(commands)
947 }
948 Update::RemoveFromApplicableByFee {
949 fee_per_wu,
950 command,
951 } => self.update_remove_from_applicable_by_fee(fee_per_wu, &command),
952 }
953 }
954 }
955
956 fn set_sender(&mut self, by_sender: SenderState) {
957 let SenderState { sender, state } = by_sender;
958
959 match state {
960 Some(state) => {
961 self.all_by_sender.insert(sender, state);
962 }
963 None => {
964 self.all_by_sender.remove(&sender);
965 }
966 }
967 }
968
969 fn add_from_gossip_exn(
970 &mut self,
971 global_slot_since_genesis: Slot,
972 current_global_slot: Slot,
973 cmd: &ValidCommandWithHash,
974 current_nonce: Nonce,
975 balance: Balance,
976 ) -> Result<(ValidCommandWithHash, VecDeque<ValidCommandWithHash>), CommandError> {
977 let sender = cmd.data.fee_payer();
978 let mut by_sender = SenderState {
979 state: self.all_by_sender.get(&sender).cloned(),
980 sender,
981 };
982
983 let mut updates = Vec::<Update>::with_capacity(128);
984 let result = self.add_from_gossip_exn_impl(
985 global_slot_since_genesis,
986 current_global_slot,
987 cmd,
988 current_nonce,
989 balance,
990 &mut by_sender,
991 &mut updates,
992 )?;
993
994 self.set_sender(by_sender);
995 self.apply_updates(updates);
996
997 Ok(result)
998 }
999
1000 fn add_from_gossip_exn_impl(
1001 &self,
1002 global_slot_since_genesis: Slot,
1003 current_global_slot: Slot,
1004 cmd: &ValidCommandWithHash,
1005 current_nonce: Nonce,
1006 balance: Balance,
1007 by_sender: &mut SenderState,
1008 updates: &mut Vec<Update>,
1009 ) -> Result<(ValidCommandWithHash, VecDeque<ValidCommandWithHash>), CommandError> {
1010 let IndexedPoolConfig { slot_tx_end, .. } = &self.config;
1011
1012 if !slot_tx_end
1013 .as_ref()
1014 .map(|slot_tx_end| current_global_slot < *slot_tx_end)
1015 .unwrap_or(true)
1016 {
1017 return Err(CommandError::AfterSlotTxEnd);
1018 }
1019
1020 let unchecked = cmd.data.forget_check();
1021 let fee = unchecked.fee();
1022 let fee_per_wu = unchecked.fee_per_wu();
1023 let cmd_applicable_at_nonce = unchecked.applicable_at_nonce();
1024
1025 let consumed = {
1026 self.check_expiry(global_slot_since_genesis, &unchecked)?;
1027 let consumed = currency_consumed(&unchecked).map_err(|_| CommandError::Overflow)?;
1028 if !unchecked.fee_token().is_default() {
1029 return Err(CommandError::UnwantedFeeToken {
1030 token_id: unchecked.fee_token(),
1031 });
1032 }
1033 consumed
1034 };
1035
1036 match by_sender.state.clone() {
1037 None => {
1038 if current_nonce != cmd_applicable_at_nonce {
1039 return Err(CommandError::InvalidNonce {
1040 account_nonce: current_nonce,
1041 expected: cmd_applicable_at_nonce,
1042 });
1043 }
1044 if consumed > balance.to_amount() {
1045 return Err(CommandError::InsufficientFunds { balance, consumed });
1046 }
1047
1048 let mut queue = Self::make_queue();
1049 queue.push_back(cmd.clone());
1050 by_sender.state = Some((queue, consumed));
1051
1052 updates.push(Update::Add {
1053 command: cmd.clone(),
1054 fee_per_wu,
1055 add_to_applicable_by_fee: true,
1056 });
1057
1058 Ok((cmd.clone(), Self::make_queue()))
1059 }
1060 Some((mut queued_cmds, reserved_currency)) => {
1061 my_assert(!queued_cmds.is_empty())?;
1062 let queue_applicable_at_nonce = {
1063 let first = queued_cmds.front().unwrap();
1064 first.data.forget_check().applicable_at_nonce()
1065 };
1066 let queue_target_nonce = {
1067 let last = queued_cmds.back().unwrap();
1068 last.data.forget_check().expected_target_nonce()
1069 };
1070 if queue_target_nonce == cmd_applicable_at_nonce {
1071 let reserved_currency = consumed
1072 .checked_add(&reserved_currency)
1073 .ok_or(CommandError::Overflow)?;
1074
1075 if reserved_currency > balance.to_amount() {
1076 return Err(CommandError::InsufficientFunds {
1077 balance,
1078 consumed: reserved_currency,
1079 });
1080 }
1081
1082 queued_cmds.push_back(cmd.clone());
1083
1084 updates.push(Update::Add {
1085 command: cmd.clone(),
1086 fee_per_wu,
1087 add_to_applicable_by_fee: false,
1088 });
1089
1090 by_sender.state = Some((queued_cmds, reserved_currency));
1091
1092 Ok((cmd.clone(), Self::make_queue()))
1093 } else if queue_applicable_at_nonce == current_nonce {
1094 if !cmd_applicable_at_nonce
1095 .between(&queue_applicable_at_nonce, &queue_target_nonce)
1096 {
1097 return Err(CommandError::InvalidNonce {
1098 account_nonce: cmd_applicable_at_nonce,
1099 expected: queue_applicable_at_nonce,
1100 });
1101 }
1102
1103 let replacement_index = queued_cmds
1104 .iter()
1105 .position(|cmd| {
1106 let cmd_applicable_at_nonce_prime =
1107 cmd.data.forget_check().applicable_at_nonce();
1108 cmd_applicable_at_nonce <= cmd_applicable_at_nonce_prime
1109 })
1110 .unwrap();
1111
1112 let drop_queue = queued_cmds.split_off(replacement_index);
1113
1114 let to_drop = drop_queue.front().unwrap().data.forget_check();
1115 my_assert(cmd_applicable_at_nonce <= to_drop.applicable_at_nonce())?;
1116
1117 {
1120 let replace_fee = to_drop.fee();
1121 if fee < replace_fee {
1122 return Err(CommandError::InsufficientReplaceFee { replace_fee, fee });
1123 }
1124 }
1125
1126 let dropped = self.remove_with_dependents_exn_impl(
1127 drop_queue.front().unwrap(),
1128 by_sender,
1129 updates,
1130 )?;
1131 my_assert(drop_queue == dropped)?;
1132
1133 let (cmd, _) = {
1134 let (v, dropped) = self.add_from_gossip_exn_impl(
1135 global_slot_since_genesis,
1136 current_global_slot,
1137 cmd,
1138 current_nonce,
1139 balance,
1140 by_sender,
1141 updates,
1142 )?;
1143 my_assert(dropped.is_empty())?;
1145 (v, dropped)
1146 };
1147
1148 let drop_head = dropped.front().cloned().unwrap();
1149 let mut drop_tail = dropped.into_iter().skip(1).peekable();
1150
1151 let mut increment = fee.checked_sub(&to_drop.fee()).unwrap();
1152 let mut dropped = None::<VecDeque<_>>;
1153 let mut current_nonce = current_nonce;
1154 let mut this_updates = Vec::with_capacity(128);
1155
1156 while let Some(cmd) = drop_tail.peek() {
1157 if dropped.is_some() {
1158 let cmd_unchecked = cmd.data.forget_check();
1159 let replace_fee = cmd_unchecked.fee();
1160
1161 increment = increment.checked_sub(&replace_fee).ok_or({
1162 CommandError::InsufficientReplaceFee {
1163 replace_fee,
1164 fee: increment,
1165 }
1166 })?;
1167 } else {
1168 current_nonce = current_nonce.succ();
1169 let by_sender_pre = by_sender.clone();
1170 this_updates.clear();
1171
1172 match self.add_from_gossip_exn_impl(
1173 global_slot_since_genesis,
1174 current_global_slot,
1175 cmd,
1176 current_nonce,
1177 balance,
1178 by_sender,
1179 &mut this_updates,
1180 ) {
1181 Ok((_cmd, dropped)) => {
1182 my_assert(dropped.is_empty())?;
1183 updates.append(&mut this_updates);
1184 }
1185 Err(_) => {
1186 *by_sender = by_sender_pre;
1187 dropped = Some(drop_tail.clone().skip(1).collect());
1188 continue; }
1190 }
1191 }
1192 let _ = drop_tail.next();
1193 }
1194
1195 if increment < REPLACE_FEE {
1196 return Err(CommandError::InsufficientReplaceFee {
1197 replace_fee: REPLACE_FEE,
1198 fee: increment,
1199 });
1200 }
1201
1202 let mut dropped = dropped.unwrap_or_else(Self::make_queue);
1203 dropped.push_front(drop_head);
1204
1205 Ok((cmd, dropped))
1206 } else {
1207 Err(CommandError::InvalidNonce {
1209 account_nonce: cmd_applicable_at_nonce,
1210 expected: queue_target_nonce,
1211 })
1212 }
1213 }
1214 }
1215 }
1216
1217 fn expired_by_global_slot(&self, global_slot_since_genesis: Slot) -> Vec<ValidCommandWithHash> {
1218 self.transactions_with_expiration
1219 .iter()
1220 .filter(|(slot, _cmd)| **slot < global_slot_since_genesis)
1221 .flat_map(|(_slot, cmd)| cmd.iter().cloned())
1222 .collect()
1223 }
1224
1225 fn expired(&self, global_slot_since_genesis: Slot) -> Vec<ValidCommandWithHash> {
1226 self.expired_by_global_slot(global_slot_since_genesis)
1227 }
1228
1229 fn remove_expired(
1230 &mut self,
1231 global_slot_since_genesis: Slot,
1232 ) -> Result<Vec<ValidCommandWithHash>, CommandError> {
1233 let mut dropped = Vec::with_capacity(128);
1234 for cmd in self.expired(global_slot_since_genesis) {
1235 if self.member(&cmd) {
1236 let removed = self.remove_with_dependents_exn(&cmd)?;
1237 dropped.extend(removed);
1238 }
1239 }
1240 Ok(dropped)
1241 }
1242
1243 fn remove_lowest_fee(&mut self) -> Result<VecDeque<ValidCommandWithHash>, CommandError> {
1244 let Some(set) = self.min_fee().and_then(|fee| self.all_by_fee.get(&fee)) else {
1245 return Ok(VecDeque::new());
1246 };
1247
1248 #[allow(clippy::mutable_key_type)]
1250 let bset: BTreeSet<_> = set.iter().collect();
1251 let min = bset.first().map(|min| (*min).clone()).unwrap();
1253
1254 self.remove_with_dependents_exn(&min)
1255 }
1256
1257 fn drop_until_sufficient_balance(
1260 mut queue: VecDeque<ValidCommandWithHash>,
1261 mut currency_reserved: Amount,
1262 current_balance: Amount,
1263 ) -> Result<
1264 (
1265 VecDeque<ValidCommandWithHash>,
1266 Amount,
1267 VecDeque<ValidCommandWithHash>,
1268 ),
1269 CommandError,
1270 > {
1271 let mut dropped_so_far = VecDeque::with_capacity(queue.len());
1272
1273 while currency_reserved > current_balance {
1274 let last = queue.pop_back().unwrap();
1275 let consumed = currency_consumed(&last.data.forget_check())?;
1276 dropped_so_far.push_back(last);
1277 currency_reserved = currency_reserved.checked_sub(&consumed).unwrap();
1278 }
1279
1280 Ok((queue, currency_reserved, dropped_so_far))
1281 }
1282
1283 fn revalidate<F>(
1284 &mut self,
1285 global_slot_since_genesis: Slot,
1286 kind: RevalidateKind,
1287 get_account: F,
1288 ) -> Result<Vec<ValidCommandWithHash>, CommandError>
1289 where
1290 F: Fn(&AccountId) -> Option<Account>,
1291 {
1292 let requires_revalidation = |account_id: &AccountId| match kind {
1293 RevalidateKind::EntirePool => true,
1294 RevalidateKind::Subset(set) => set.contains(account_id),
1295 };
1296
1297 let mut dropped = Vec::new();
1298
1299 for (sender, (mut queue, mut currency_reserved)) in self.all_by_sender.clone() {
1300 if !requires_revalidation(&sender) {
1301 continue;
1302 }
1303 let account: Account = get_account(&sender)
1304 .ok_or(CommandError::Custom(Cow::Borrowed("Account not find")))?;
1305 let current_balance = account
1306 .liquid_balance_at_slot(global_slot_since_genesis)
1307 .to_amount();
1308 let first_cmd = queue.front().cloned().unwrap();
1309 let first_nonce = first_cmd.data.forget_check().applicable_at_nonce();
1310
1311 if !(account.has_permission_to_send() && account.has_permission_to_increment_nonce())
1312 || account.nonce < first_nonce
1313 {
1314 let this_dropped = self.remove_with_dependents_exn(&first_cmd)?;
1315 dropped.extend(this_dropped);
1316 } else {
1317 let first_applicable_nonce_index = queue.iter().position(|cmd| {
1319 let nonce = cmd.data.forget_check().applicable_at_nonce();
1320 nonce == account.nonce
1321 });
1322
1323 let retained_for_nonce = match first_applicable_nonce_index {
1324 Some(index) => queue.split_off(index),
1325 None => Default::default(),
1326 };
1327 let dropped_for_nonce = queue;
1328
1329 for cmd in &dropped_for_nonce {
1330 currency_reserved = currency_reserved
1331 .checked_sub(¤cy_consumed(&cmd.data.forget_check())?)
1332 .unwrap();
1333 }
1334
1335 let (keep_queue, currency_reserved, dropped_for_balance) =
1336 Self::drop_until_sufficient_balance(
1337 retained_for_nonce,
1338 currency_reserved,
1339 current_balance,
1340 )?;
1341
1342 let keeping_prefix = dropped_for_nonce.is_empty();
1343 let keeping_suffix = dropped_for_balance.is_empty();
1344 let to_drop: Vec<_> = dropped_for_nonce
1345 .into_iter()
1346 .chain(dropped_for_balance)
1347 .collect();
1348
1349 match keep_queue.front().cloned() {
1350 _ if keeping_prefix && keeping_suffix => {
1351 }
1353 None => {
1354 self.remove_applicable_exn(&first_cmd);
1357 self.all_by_sender.remove(&sender);
1358 }
1359 Some(_) if keeping_prefix => {
1360 self.all_by_sender
1363 .insert(sender, (keep_queue, currency_reserved));
1364 }
1365 Some(first_kept) => {
1366 let first_kept_unchecked = first_kept.data.forget_check();
1369 self.all_by_sender
1370 .insert(sender, (keep_queue, currency_reserved));
1371 self.remove_applicable_exn(&first_cmd);
1372 Self::map_set_insert(
1373 &mut self.applicable_by_fee,
1374 first_kept_unchecked.fee_per_wu(),
1375 first_kept,
1376 );
1377 }
1378 }
1379 self.update_remove_all_by_fee_and_hash_and_expiration(to_drop.clone());
1380 dropped.extend(to_drop);
1381 }
1382 }
1383
1384 Ok(dropped)
1385 }
1386
1387 fn list_includable_transactions(&self, limit: usize) -> Vec<ValidCommandWithHash> {
1390 let mut txns = Vec::with_capacity(self.applicable_by_fee.len());
1391
1392 let mut applicable_by_fee = self.applicable_by_fee.clone();
1394 let mut all_by_sender = self.all_by_sender.clone();
1395
1396 while !applicable_by_fee.is_empty() && txns.len() < limit {
1397 let (fee, mut set) = applicable_by_fee
1398 .iter()
1399 .max_by_key(|(rate, _)| *rate)
1400 .map(|(rate, set)| (rate.clone(), set.clone()))
1401 .unwrap();
1402
1403 let txn = set.iter().min_by_key(|b| &b.hash).cloned().unwrap();
1405
1406 {
1407 set.remove(&txn);
1408 if set.is_empty() {
1409 applicable_by_fee.remove(&fee);
1410 } else {
1411 applicable_by_fee.insert(fee, set);
1412 }
1413 }
1414
1415 let sender = txn.data.forget_check().fee_payer();
1416
1417 let (sender_queue, _amount) = all_by_sender.get_mut(&sender).unwrap();
1418 let head_txn = sender_queue.pop_front().unwrap();
1419
1420 if txn.hash == head_txn.hash {
1421 match sender_queue.front().cloned() {
1422 None => {
1423 all_by_sender.remove(&sender);
1424 }
1425 Some(next_txn) => {
1426 let fee = next_txn.data.forget_check().fee_per_wu();
1427 applicable_by_fee.entry(fee).or_default().insert(next_txn);
1428 }
1429 }
1430 } else {
1431 mina_core::warn!(
1432 mina_core::log::system_time();
1433 kind = "transaction pool", message = "Sender queue is malformed");
1434 all_by_sender.remove(&sender);
1435 }
1436
1437 txns.push(txn);
1438 }
1439 txns
1440 }
1441
1442 fn transactions(&mut self, limit: usize) -> Vec<ValidCommandWithHash> {
1446 let mut txns = Vec::with_capacity(self.applicable_by_fee.len());
1447 loop {
1448 if self.applicable_by_fee.is_empty() {
1449 assert!(self.all_by_sender.is_empty());
1450 return txns;
1451 }
1452
1453 if txns.len() >= limit {
1454 return txns;
1455 }
1456
1457 let (fee, mut set) = self
1458 .applicable_by_fee
1459 .iter()
1460 .max_by_key(|(rate, _)| *rate)
1461 .map(|(rate, set)| (rate.clone(), set.clone()))
1462 .unwrap();
1463
1464 let txn = set.iter().min_by_key(|b| &b.hash).cloned().unwrap();
1466
1467 {
1468 set.remove(&txn);
1469 if set.is_empty() {
1470 self.applicable_by_fee.remove(&fee);
1471 } else {
1472 self.applicable_by_fee.insert(fee, set);
1473 }
1474 }
1475
1476 let sender = txn.data.forget_check().fee_payer();
1477
1478 let (sender_queue, _amount) = self.all_by_sender.get_mut(&sender).unwrap();
1479 let head_txn = sender_queue.pop_front().unwrap();
1480
1481 if txn.hash == head_txn.hash {
1482 match sender_queue.front().cloned() {
1483 None => {
1484 self.all_by_sender.remove(&sender);
1485 }
1486 Some(next_txn) => {
1487 let fee = next_txn.data.forget_check().fee_per_wu();
1488 self.applicable_by_fee
1489 .entry(fee)
1490 .or_default()
1491 .insert(next_txn);
1492 }
1493 }
1494 } else {
1495 mina_core::warn!(
1496 mina_core::log::system_time();
1497 kind = "transaction pool", message = "Sender queue is malformed");
1498 self.all_by_sender.remove(&sender);
1499 }
1500
1501 txns.push(txn);
1502 }
1503 }
1504
1505 fn get_all_transactions(&self) -> Vec<ValidCommandWithHash> {
1507 self.all_by_sender
1508 .values()
1509 .cloned()
1510 .flat_map(|(cmds, _)| cmds.into_iter())
1511 .collect()
1512 }
1513
1514 fn get_pending_amount_and_nonce(&self) -> HashMap<AccountId, (Option<Nonce>, Amount)> {
1515 self.all_by_sender
1517 .clone()
1518 .into_iter()
1519 .map(|(acc_id, (cmds, amount))| (acc_id, (cmds.back().unwrap().data.nonce(), amount)))
1520 .collect()
1521 }
1522}
1523
1524fn currency_consumed(cmd: &UserCommand) -> Result<Amount, CommandError> {
1525 use crate::scan_state::transaction_logic::signed_command::{Body::*, PaymentPayload};
1526
1527 let fee_amount = Amount::of_fee(&cmd.fee());
1528 let amount = match cmd {
1529 UserCommand::SignedCommand(c) => {
1530 match &c.payload.body {
1531 Payment(PaymentPayload { amount, .. }) => {
1532 *amount
1534 }
1535 StakeDelegation(_) => Amount::zero(),
1536 }
1537 }
1538 UserCommand::ZkAppCommand(_) => Amount::zero(),
1539 };
1540
1541 fee_amount
1542 .checked_add(&amount)
1543 .ok_or(CommandError::InvalidCurrencyConsumed)
1544}
1545
1546pub mod transaction_hash {
1547 use super::*;
1548
1549 pub fn hash_command(cmd: valid::UserCommand) -> ValidCommandWithHash {
1550 let hash = match &cmd {
1551 valid::UserCommand::SignedCommand(cmd) => {
1552 v2::MinaBaseSignedCommandStableV2::from(&**cmd)
1553 .hash()
1554 .unwrap()
1555 }
1556 valid::UserCommand::ZkAppCommand(cmd) => {
1557 let cmd = cmd.clone().forget();
1558 v2::MinaBaseZkappCommandTStableV1WireStableV1::from(&cmd)
1559 .hash()
1560 .unwrap()
1561 }
1562 };
1563
1564 WithHash { data: cmd, hash }
1565 }
1566}
1567
1568#[derive(Debug)]
1569pub enum ApplyDecision {
1570 Accept,
1571 Reject,
1572}
1573
1574const MAX_PER_15_SECONDS: usize = 10;
1575
1576#[derive(Clone, Debug, Serialize, Deserialize)]
1577pub struct TransactionPool {
1578 pub pool: IndexedPool,
1579 locally_generated_uncommitted: HashMap<ValidCommandWithHash, (redux::Timestamp, Batch)>,
1580 locally_generated_committed: HashMap<ValidCommandWithHash, (redux::Timestamp, Batch)>,
1581 current_batch: usize,
1582 remaining_in_batch: usize,
1583 pub config: Config,
1584 batcher: (),
1585 best_tip_diff_relay: Option<()>,
1586 verification_key_table: VkRefcountTable,
1587}
1588
1589impl TransactionPool {
1590 pub fn new(config: Config, consensus_constants: &ConsensusConstants) -> Self {
1591 Self {
1592 pool: IndexedPool::new(consensus_constants),
1593 locally_generated_uncommitted: Default::default(),
1594 locally_generated_committed: Default::default(),
1595 current_batch: 0,
1596 remaining_in_batch: 0,
1597 config,
1598 batcher: (),
1599 best_tip_diff_relay: None,
1600 verification_key_table: Default::default(),
1601 }
1602 }
1603
1604 pub fn size(&self) -> usize {
1605 self.pool.size()
1606 }
1607
1608 pub fn get_all_transactions(&self) -> Vec<ValidCommandWithHash> {
1609 self.pool.get_all_transactions()
1610 }
1611
1612 pub fn get_pending_amount_and_nonce(&self) -> HashMap<AccountId, (Option<Nonce>, Amount)> {
1613 self.pool.get_pending_amount_and_nonce()
1614 }
1615
1616 pub fn transactions(&mut self, limit: usize) -> Vec<ValidCommandWithHash> {
1617 self.pool.transactions(limit)
1618 }
1619
1620 pub fn list_includable_transactions(&self, limit: usize) -> Vec<ValidCommandWithHash> {
1621 self.pool.list_includable_transactions(limit)
1622 }
1623
1624 pub fn get_accounts_to_revalidate_on_new_best_tip(&self) -> BTreeSet<AccountId> {
1625 self.pool.all_by_sender.keys().cloned().collect()
1626 }
1627
1628 pub fn on_new_best_tip(
1629 &mut self,
1630 global_slot_since_genesis: Slot,
1631 accounts: &BTreeMap<AccountId, Account>,
1632 ) -> Result<Vec<ValidCommandWithHash>, CommandError> {
1633 let dropped = self.pool.revalidate(
1634 global_slot_since_genesis,
1635 RevalidateKind::EntirePool,
1636 |sender_id| {
1637 Some(
1638 accounts
1639 .get(sender_id)
1640 .cloned()
1641 .unwrap_or_else(Account::empty),
1642 )
1643 },
1644 )?;
1645
1646 let dropped_locally_generated = dropped
1647 .iter()
1648 .filter(|cmd| {
1649 let dropped_commited = self.locally_generated_committed.remove(cmd).is_some();
1650 let dropped_uncommited = self.locally_generated_uncommitted.remove(cmd).is_some();
1651 assert!(!(dropped_commited && dropped_uncommited));
1653 dropped_commited || dropped_uncommited
1654 })
1655 .collect::<Vec<_>>();
1656
1657 if !dropped_locally_generated.is_empty() {
1658 mina_core::info!(
1659 mina_core::log::system_time();
1660 kind = "transaction pool",
1661 message = "Dropped locally generated commands $cmds from pool when transition frontier was recreated.",
1662 dropped = format!("{dropped_locally_generated:?}")
1663 );
1664 }
1665
1666 Ok(dropped)
1667 }
1668
1669 fn has_sufficient_fee(&self, pool_max_size: usize, cmd: &valid::UserCommand) -> bool {
1670 match self.pool.min_fee() {
1671 None => true,
1672 Some(min_fee) => {
1673 if self.pool.size() >= pool_max_size {
1674 cmd.forget_check().fee_per_wu() > min_fee
1675 } else {
1676 true
1677 }
1678 }
1679 }
1680 }
1681
1682 fn drop_until_below_max_size(
1683 &mut self,
1684 pool_max_size: usize,
1685 ) -> Result<Vec<ValidCommandWithHash>, CommandError> {
1686 let mut list = Vec::new();
1687
1688 while self.pool.size() > pool_max_size {
1689 let dropped = self.pool.remove_lowest_fee()?;
1690 my_assert(!dropped.is_empty())?;
1691 list.extend(dropped)
1692 }
1693
1694 Ok(list)
1695 }
1696
1697 pub fn get_accounts_to_handle_transition_diff(
1698 &self,
1699 diff: &diff::BestTipDiff,
1700 ) -> (BTreeSet<AccountId>, BTreeSet<AccountId>) {
1701 let diff::BestTipDiff {
1702 new_commands,
1703 removed_commands,
1704 reorg_best_tip: _,
1705 } = diff;
1706
1707 let in_cmds = new_commands
1708 .iter()
1709 .chain(removed_commands)
1710 .flat_map(|cmd| cmd.data.forget_check().accounts_referenced())
1711 .collect::<BTreeSet<_>>();
1712
1713 let uncommitted = self
1714 .locally_generated_uncommitted
1715 .keys()
1716 .map(|cmd| cmd.data.fee_payer())
1717 .collect::<BTreeSet<_>>();
1718
1719 (in_cmds, uncommitted)
1720 }
1721
1722 pub fn handle_transition_frontier_diff(
1723 &mut self,
1724 global_slot_since_genesis: Slot,
1725 current_global_slot: Slot,
1726 diff: &diff::BestTipDiff,
1727 account_ids: &BTreeSet<AccountId>,
1728 accounts: &BTreeMap<AccountId, Account>,
1729 uncommited: &BTreeMap<AccountId, Account>,
1730 ) -> Result<(), String> {
1731 let diff::BestTipDiff {
1732 new_commands,
1733 removed_commands,
1734 reorg_best_tip: _,
1735 } = diff;
1736
1737 let (new_commands, removed_commands) = {
1739 let collect_hashed = |cmds: &[WithStatus<valid::UserCommand>]| {
1740 cmds.iter()
1741 .map(|cmd| transaction_hash::hash_command(cmd.data.clone()))
1742 .collect::<Vec<_>>()
1743 };
1744
1745 let mut new_commands = collect_hashed(new_commands);
1746 let mut removed_commands = collect_hashed(removed_commands);
1747
1748 #[allow(clippy::mutable_key_type)]
1749 let new_commands_set = new_commands.iter().collect::<HashSet<_>>();
1750 #[allow(clippy::mutable_key_type)]
1751 let removed_commands_set = removed_commands.iter().collect::<HashSet<_>>();
1752
1753 #[allow(clippy::mutable_key_type)]
1754 let duplicates = new_commands_set
1755 .intersection(&removed_commands_set)
1756 .map(|cmd| (*cmd).clone())
1757 .collect::<HashSet<_>>();
1758
1759 new_commands.retain(|cmd| !duplicates.contains(cmd));
1760 removed_commands.retain(|cmd| !duplicates.contains(cmd));
1761 (new_commands, removed_commands)
1762 };
1763
1764 let pool_max_size = self.config.pool_max_size;
1765
1766 self.verification_key_table.increment_list(&new_commands);
1767 self.verification_key_table
1768 .decrement_list(&removed_commands);
1769
1770 let mut dropped_backtrack = Vec::with_capacity(256);
1771 for cmd in removed_commands {
1772 if let Some(time_added) = self.locally_generated_committed.remove(&cmd) {
1773 self.locally_generated_uncommitted
1774 .insert(cmd.clone(), time_added);
1775 }
1776
1777 let dropped_seq = match self.pool.add_from_backtrack(
1778 global_slot_since_genesis,
1779 current_global_slot,
1780 cmd,
1781 ) {
1782 Ok(_) => self.drop_until_below_max_size(pool_max_size)?,
1783 Err(e) => return Err(format!("{:?}", e)),
1784 };
1785 dropped_backtrack.extend(dropped_seq);
1786 }
1787
1788 self.verification_key_table
1789 .decrement_hashed(&dropped_backtrack);
1790
1791 let locally_generated_dropped = dropped_backtrack
1792 .iter()
1793 .filter(|t| self.locally_generated_uncommitted.contains_key(t))
1794 .collect::<Vec<_>>();
1795
1796 let dropped_commands = {
1797 let accounts_to_check = account_ids;
1798 let existing_account_states_by_id = accounts;
1799
1800 let get_account = |id: &AccountId| {
1801 match existing_account_states_by_id.get(id) {
1802 Some(account) => Some(account.clone()),
1803 None => {
1804 if accounts_to_check.contains(id) {
1805 Some(Account::empty())
1806 } else {
1807 None
1808 }
1814 }
1815 }
1816 };
1817
1818 self.pool.revalidate(
1819 global_slot_since_genesis,
1820 RevalidateKind::Subset(accounts_to_check),
1821 get_account,
1822 )?
1823 };
1824
1825 let (committed_commands, dropped_commit_conflicts): (Vec<_>, Vec<_>) = {
1826 let command_hashes: HashSet<v2::TransactionHash> =
1827 new_commands.iter().map(|cmd| cmd.hash.clone()).collect();
1828
1829 dropped_commands
1830 .iter()
1831 .partition(|cmd| command_hashes.contains(&cmd.hash))
1832 };
1833
1834 for cmd in &committed_commands {
1835 self.verification_key_table.decrement_hashed([&**cmd]);
1836 if let Some(data) = self.locally_generated_uncommitted.remove(cmd) {
1837 let old = self
1838 .locally_generated_committed
1839 .insert((*cmd).clone(), data);
1840 my_assert(old.is_none())?;
1841 };
1842 }
1843
1844 let _commit_conflicts_locally_generated = dropped_commit_conflicts
1845 .iter()
1846 .filter(|cmd| self.locally_generated_uncommitted.remove(cmd).is_some());
1847
1848 for cmd in locally_generated_dropped {
1849 let remove_cmd = |this: &mut Self| {
1853 this.verification_key_table.decrement_hashed([cmd]);
1854 assert!(this.locally_generated_uncommitted.remove(cmd).is_some());
1855 };
1856
1857 if !self.locally_generated_committed.contains_key(cmd) {
1858 if !self.has_sufficient_fee(pool_max_size, &cmd.data) {
1859 remove_cmd(self)
1860 } else {
1861 let unchecked = &cmd.data;
1862 match uncommited.get(&unchecked.fee_payer()) {
1863 Some(account) => {
1864 match self.pool.add_from_gossip_exn(
1865 global_slot_since_genesis,
1866 current_global_slot,
1867 cmd,
1868 account.nonce,
1869 account.liquid_balance_at_slot(global_slot_since_genesis),
1870 ) {
1871 Err(_) => {
1872 remove_cmd(self);
1873 }
1874 Ok(_) => {
1875 self.verification_key_table.increment_hashed([cmd]);
1876 }
1877 }
1878 }
1879 None => {
1880 remove_cmd(self);
1881 }
1882 }
1883 }
1884 }
1885 }
1886
1887 let expired_commands = self.pool.remove_expired(global_slot_since_genesis)?;
1888 for cmd in &expired_commands {
1889 self.verification_key_table.decrement_hashed([cmd]);
1890 self.locally_generated_uncommitted.remove(cmd);
1891 }
1892
1893 Ok(())
1894 }
1895
1896 pub fn get_accounts_to_apply_diff(&self, diff: &diff::DiffVerified) -> BTreeSet<AccountId> {
1897 let fee_payer = |cmd: &ValidCommandWithHash| cmd.data.fee_payer();
1898 diff.list.iter().map(fee_payer).collect()
1899 }
1900
1901 fn apply(
1902 &mut self,
1903 time: redux::Timestamp,
1904 global_slot_since_genesis: Slot,
1905 current_global_slot: Slot,
1906 diff: &diff::DiffVerified,
1907 accounts: &BTreeMap<AccountId, Account>,
1908 is_sender_local: bool,
1909 ) -> Result<
1910 (
1911 ApplyDecision,
1912 Vec<ValidCommandWithHash>,
1913 Vec<(ValidCommandWithHash, diff::Error)>,
1914 HashSet<v2::TransactionHash>,
1915 ),
1916 String,
1917 > {
1918 let fee_payer = |cmd: &ValidCommandWithHash| cmd.data.fee_payer();
1919 let fee_payer_accounts = accounts;
1920
1921 let check_command = |pool: &IndexedPool, cmd: &ValidCommandWithHash| {
1922 if pool.member(cmd) {
1923 Err(diff::Error::Duplicate)
1924 } else {
1925 match fee_payer_accounts.get(&fee_payer(cmd)) {
1926 None => Err(diff::Error::FeePayerAccountNotFound),
1927 Some(account) => {
1928 if account.has_permission_to_send()
1929 && account.has_permission_to_increment_nonce()
1930 {
1931 Ok(())
1932 } else {
1933 Err(diff::Error::FeePayerNotPermittedToSend)
1934 }
1935 }
1936 }
1937 }
1938 };
1939
1940 let add_results = diff
1941 .list
1942 .iter()
1943 .map(|cmd| {
1944 let account = fee_payer_accounts
1945 .get(&fee_payer(cmd))
1946 .ok_or_else(|| "Fee payer not found".to_string())?;
1947
1948 let result: Result<_, diff::Error> = (|| {
1949 check_command(&self.pool, cmd)?;
1950
1951 match self.pool.add_from_gossip_exn(
1952 global_slot_since_genesis,
1953 current_global_slot,
1954 cmd,
1955 account.nonce,
1956 account.liquid_balance_at_slot(global_slot_since_genesis),
1957 ) {
1958 Ok(x) => Ok(x),
1959 Err(e) => Err(e.into()),
1960 }
1961 })();
1962
1963 match result {
1964 Ok((cmd, dropped)) => Ok(Ok((cmd, dropped))),
1965 Err(err) => Ok(Err((cmd, err))),
1966 }
1967 })
1968 .collect::<Result<Vec<Result<_, _>>, String>>()?;
1969
1970 let added_cmds = add_results
1971 .iter()
1972 .filter_map(|cmd| match cmd {
1973 Ok((cmd, _)) => Some(cmd),
1974 Err(_) => None,
1975 })
1976 .collect::<Vec<_>>();
1977
1978 let dropped_for_add = add_results
1979 .iter()
1980 .filter_map(|cmd| match cmd {
1981 Ok((_, dropped)) => Some(dropped),
1982 Err(_) => None,
1983 })
1984 .flatten()
1985 .collect::<Vec<_>>();
1986
1987 let dropped_for_size = self.drop_until_below_max_size(self.config.pool_max_size)?;
1988
1989 let all_dropped_cmds = dropped_for_add
1990 .iter()
1991 .copied()
1992 .chain(dropped_for_size.iter())
1993 .collect::<Vec<_>>();
1994
1995 {
1996 self.verification_key_table.increment_hashed(added_cmds);
1997 self.verification_key_table
1998 .decrement_hashed(all_dropped_cmds.iter().copied());
1999 };
2000
2001 let dropped_for_add_hashes: HashSet<&v2::TransactionHash> =
2002 dropped_for_add.iter().map(|cmd| &cmd.hash).collect();
2003 let dropped_for_size_hashes: HashSet<&v2::TransactionHash> =
2004 dropped_for_size.iter().map(|cmd| &cmd.hash).collect();
2005 let all_dropped_cmd_hashes: HashSet<v2::TransactionHash> = dropped_for_add_hashes
2006 .union(&dropped_for_size_hashes)
2007 .map(|hash| (*hash).clone())
2008 .collect();
2009
2010 if is_sender_local {
2015 for result in add_results.iter() {
2016 let Ok((cmd, _dropped)) = result else {
2017 continue;
2018 };
2019 if !all_dropped_cmd_hashes.contains(&cmd.hash) {
2020 self.register_locally_generated(time, cmd);
2021 }
2022 }
2023 }
2024
2025 let mut accepted = Vec::with_capacity(128);
2026 let mut rejected = Vec::with_capacity(128);
2027
2028 for result in &add_results {
2030 match result {
2031 Ok((cmd, _dropped)) => {
2032 if all_dropped_cmd_hashes.contains(&cmd.hash) {
2033 } else {
2035 accepted.push(cmd.clone());
2036 }
2037 }
2038 Err((cmd, error)) => {
2039 rejected.push(((*cmd).clone(), error.clone()));
2040 }
2041 }
2042 }
2043
2044 let decision = if rejected
2045 .iter()
2046 .any(|(_, error)| error.grounds_for_diff_rejection())
2047 {
2048 ApplyDecision::Reject
2049 } else {
2050 ApplyDecision::Accept
2051 };
2052
2053 Ok((decision, accepted, rejected, all_dropped_cmd_hashes))
2054 }
2055
2056 pub fn unsafe_apply(
2057 &mut self,
2058 time: redux::Timestamp,
2059 global_slot_since_genesis: Slot,
2060 current_global_slot: Slot,
2061 diff: &diff::DiffVerified,
2062 accounts: &BTreeMap<AccountId, Account>,
2063 is_sender_local: bool,
2064 ) -> Result<
2065 (
2066 ApplyDecision,
2067 Vec<ValidCommandWithHash>,
2068 Vec<(ValidCommandWithHash, diff::Error)>,
2069 HashSet<v2::TransactionHash>,
2070 ),
2071 String,
2072 > {
2073 let (decision, accepted, rejected, dropped) = self.apply(
2074 time,
2075 global_slot_since_genesis,
2076 current_global_slot,
2077 diff,
2078 accounts,
2079 is_sender_local,
2080 )?;
2081 Ok((decision, accepted, rejected, dropped))
2082 }
2083
2084 fn register_locally_generated(&mut self, time: redux::Timestamp, cmd: &ValidCommandWithHash) {
2085 match self.locally_generated_uncommitted.entry(cmd.clone()) {
2086 Entry::Occupied(mut entry) => {
2087 let (entry_time, _batch_num) = entry.get_mut();
2088 *entry_time = redux::Timestamp::global_now();
2089 }
2090 Entry::Vacant(entry) => {
2091 let batch_num = if self.remaining_in_batch > 0 {
2092 self.remaining_in_batch -= 1;
2093 self.current_batch
2094 } else {
2095 self.remaining_in_batch = MAX_PER_15_SECONDS - 1;
2096 self.current_batch += 1;
2097 self.current_batch
2098 };
2099 entry.insert((time, Batch::Of(batch_num)));
2100 }
2101 }
2102 }
2103
2104 pub fn prevalidate(&self, diff: diff::Diff) -> Result<diff::Diff, TransactionPoolErrors> {
2105 let well_formedness_errors: HashSet<_> = diff
2106 .list
2107 .iter()
2108 .flat_map(|cmd| match cmd.check_well_formedness() {
2109 Ok(()) => Vec::new(),
2110 Err(errors) => errors,
2111 })
2112 .collect();
2113
2114 if !well_formedness_errors.is_empty() {
2115 return Err(TransactionPoolErrors::BatchedErrors(
2116 well_formedness_errors
2117 .into_iter()
2118 .map(TransactionError::WellFormedness)
2119 .collect_vec(),
2120 ));
2121 }
2122
2123 Ok(diff)
2124 }
2125
2126 pub fn convert_diff_to_verifiable(
2127 &self,
2128 diff: diff::Diff,
2129 accounts: &BTreeMap<AccountId, Account>,
2130 ) -> Result<Vec<WithStatus<verifiable::UserCommand>>, TransactionPoolErrors> {
2131 let cs = diff
2132 .list
2133 .iter()
2134 .cloned()
2135 .map(|cmd| MaybeWithStatus { cmd, status: None })
2136 .collect::<Vec<_>>();
2137
2138 let diff = UserCommand::to_all_verifiable::<FromUnappliedSequence, _>(cs, |account_ids| {
2139 let mempool_vks: HashMap<_, _> = account_ids
2140 .iter()
2141 .map(|id| {
2142 let vks = self.verification_key_table.find_vks_by_account_id(id);
2143 let vks: HashMap<_, _> =
2144 vks.iter().map(|vk| (vk.hash(), (*vk).clone())).collect();
2145 (id.clone(), vks)
2146 })
2147 .collect();
2148
2149 let ledger_vks = UserCommand::load_vks_from_ledger_accounts(accounts);
2150 let ledger_vks: HashMap<_, _> = ledger_vks
2151 .into_iter()
2152 .map(|(id, vk)| {
2153 let mut map = HashMap::new();
2154 map.insert(vk.hash(), vk);
2155 (id, map)
2156 })
2157 .collect();
2158
2159 let new_map: HashMap<AccountId, HashMap<Fp, VerificationKeyWire>> = HashMap::new();
2160 let merged =
2161 mempool_vks
2162 .into_iter()
2163 .chain(ledger_vks)
2164 .fold(new_map, |mut accum, (id, map)| {
2165 let entry = accum.entry(id).or_default();
2166 for (hash, vk) in map {
2167 entry.insert(hash, vk);
2168 }
2169 accum
2170 });
2171
2172 from_unapplied_sequence::Cache::new(merged)
2173 })
2174 .map_err(TransactionPoolErrors::LoadingVK)?;
2175
2176 let diff = diff
2177 .into_iter()
2178 .map(|MaybeWithStatus { cmd, status: _ }| WithStatus {
2179 data: cmd,
2180 status: Applied,
2182 })
2183 .collect::<Vec<_>>();
2184
2185 Ok(diff)
2186 }
2187
2188 pub fn verify_proofs(
2189 &self,
2190 diff: Vec<WithStatus<verifiable::UserCommand>>,
2191 ) -> Result<Vec<valid::UserCommand>, TransactionPoolErrors> {
2192 let (verified, invalid): (Vec<_>, Vec<_>) = Verifier
2193 .verify_commands(diff, None)
2194 .into_iter()
2195 .partition(Result::is_ok);
2196
2197 let verified: Vec<_> = verified.into_iter().map(Result::unwrap).collect();
2198 let invalid: Vec<_> = invalid.into_iter().map(Result::unwrap_err).collect();
2199
2200 if !invalid.is_empty() {
2201 let transaction_pool_errors = invalid
2202 .into_iter()
2203 .map(TransactionError::Verifier)
2204 .collect();
2205 Err(TransactionPoolErrors::BatchedErrors(
2206 transaction_pool_errors,
2207 ))
2208 } else {
2209 Ok(verified)
2210 }
2211 }
2212
2213 fn get_rebroadcastable<F>(&mut self, has_timed_out: F) -> Vec<Vec<UserCommand>>
2214 where
2215 F: Fn(&redux::Timestamp) -> bool,
2216 {
2217 let log = |has_timed_out: bool, s: &str, cmd: &ValidCommandWithHash| -> bool {
2218 if has_timed_out {
2219 eprintln!("{}: {:?}", s, cmd);
2220 false
2221 } else {
2222 true
2223 }
2224 };
2225
2226 self.locally_generated_uncommitted
2227 .retain(|key, (time, _batch)| {
2228 log(
2229 has_timed_out(time),
2230 "No longer rebroadcasting uncommitted expired command",
2231 key,
2232 )
2233 });
2234 self.locally_generated_committed
2235 .retain(|key, (time, _batch)| {
2236 log(
2237 has_timed_out(time),
2238 "Removing committed locally generated expired command",
2239 key,
2240 )
2241 });
2242
2243 let mut rebroadcastable_txs = self
2244 .locally_generated_uncommitted
2245 .iter()
2246 .collect::<Vec<_>>();
2247
2248 rebroadcastable_txs.sort_by(|(txn1, (_, batch1)), (txn2, (_, batch2))| {
2249 use std::cmp::Ordering::Equal;
2250
2251 let get_nonce =
2252 |txn: &ValidCommandWithHash| txn.data.forget_check().applicable_at_nonce();
2253
2254 match batch1.cmp(batch2) {
2255 Equal => (),
2256 x => return x,
2257 }
2258 match get_nonce(txn1).cmp(&get_nonce(txn2)) {
2259 Equal => (),
2260 x => return x,
2261 }
2262 txn1.hash.cmp(&txn2.hash)
2263 });
2264
2265 rebroadcastable_txs
2266 .into_iter()
2267 .group_by(|(_txn, (_time, batch))| batch)
2268 .into_iter()
2269 .map(|(_batch, group_txns)| {
2270 group_txns
2271 .map(|(txn, _)| txn.data.forget_check())
2272 .collect::<Vec<_>>()
2273 })
2274 .collect::<Vec<_>>()
2275 }
2276}
2277
2278#[cfg(test)]
2279mod tests {
2280 use super::*;
2281
2282 #[test]
2284 fn test_map_merge() {
2285 let mut a = HashMap::new();
2286 a.insert(1, {
2287 let mut map = HashMap::new();
2288 map.insert(1, 10);
2289 map.insert(2, 12);
2290 map
2291 });
2292 let mut b = HashMap::new();
2293 b.insert(1, {
2294 let mut map = HashMap::new();
2295 map.insert(3, 20);
2296 map
2297 });
2298
2299 let new_map: HashMap<_, HashMap<_, _>> = HashMap::new();
2300 let merged = a
2301 .into_iter()
2302 .chain(b)
2303 .fold(new_map, |mut accum, (id, map)| {
2304 let entry = accum.entry(id).or_default();
2305 for (hash, vk) in map {
2306 entry.insert(hash, vk);
2307 }
2308 accum
2309 });
2310
2311 let one = merged.get(&1).unwrap();
2312 assert!(one.get(&1).is_some());
2313 assert!(one.get(&2).is_some());
2314 assert!(one.get(&3).is_some());
2315
2316 dbg!(merged);
2317 }
2318}