mina_tree/
transaction_pool.rs

1use backtrace::Backtrace;
2use serde::{Deserialize, Serialize};
3use std::{
4    borrow::{Borrow, Cow},
5    collections::{hash_map::Entry, BTreeMap, BTreeSet, HashMap, HashSet, VecDeque},
6};
7
8use itertools::Itertools;
9use mina_curves::pasta::Fp;
10use mina_p2p_messages::{bigint::BigInt, v2};
11use openmina_core::{bug_condition, consensus::ConsensusConstants};
12
13use crate::{
14    scan_state::{
15        currency::{Amount, Balance, BlockTime, Fee, Magnitude, Nonce, Slot},
16        fee_rate::FeeRate,
17        transaction_logic::{
18            valid, verifiable,
19            zkapp_command::{
20                from_unapplied_sequence::{self, FromUnappliedSequence},
21                MaybeWithStatus, WithHash,
22            },
23            TransactionStatus::Applied,
24            UserCommand, WellFormednessError, WithStatus,
25        },
26    },
27    verifier::{Verifier, VerifierError},
28    Account, AccountId, BaseLedger, Mask, TokenId, VerificationKey, VerificationKeyWire,
29};
30
/// Top-level error type of the transaction pool.
#[derive(Debug, thiserror::Error)]
pub enum TransactionPoolErrors {
    /// Invalid transactions, rejected diffs, etc...
    #[error("Transaction pool errors: {0:?}")]
    BatchedErrors(Vec<TransactionError>),
    /// Carries a message only; presumably raised when a verification key
    /// cannot be loaded (TODO confirm against the call sites).
    #[error("{0:?}")]
    LoadingVK(String),
    /// Errors that should panic the node (bugs in implementation)
    #[error("Unexpected error: {0}")]
    Unexpected(String),
}
42
/// Error attached to a single transaction; each variant transparently wraps
/// the underlying error and can be built from it with `?` / `into()`.
#[derive(Debug, thiserror::Error)]
pub enum TransactionError {
    /// Error reported by the verifier.
    #[error(transparent)]
    Verifier(#[from] VerifierError),
    /// The command failed a well-formedness check.
    #[error(transparent)]
    WellFormedness(#[from] WellFormednessError),
}
50
51impl From<String> for TransactionPoolErrors {
52    fn from(value: String) -> Self {
53        Self::Unexpected(value)
54    }
55}
56
57#[inline(never)]
58fn my_assert(v: bool) -> Result<(), CommandError> {
59    if !v {
60        let backtrace = Backtrace::new();
61        let s = format!("assert failed {:?}", backtrace);
62        bug_condition!("{:?}", s);
63        return Err(CommandError::Custom(Cow::Owned(s)));
64    }
65    Ok(())
66}
67
68mod consensus {
69    use crate::scan_state::currency::{BlockTimeSpan, Epoch, Length};
70
71    use super::*;
72
73    #[derive(Clone, Debug, Serialize, Deserialize)]
74    pub struct Constants {
75        k: Length,
76        delta: Length,
77        slots_per_sub_window: Length,
78        slots_per_window: Length,
79        sub_windows_per_window: Length,
80        slots_per_epoch: Length,
81        grace_period_slots: Length,
82        grace_period_end: Slot,
83        checkpoint_window_slots_per_year: Length,
84        checkpoint_window_size_in_slots: Length,
85        block_window_duration_ms: BlockTimeSpan,
86        slot_duration_ms: BlockTimeSpan,
87        epoch_duration: BlockTimeSpan,
88        delta_duration: BlockTimeSpan,
89        genesis_state_timestamp: BlockTime,
90    }
91
92    impl Constants {
93        // Keep this in sync with the implementation of ConsensusConstantsChecked::create
94        // in ledger/src/proofs/block.rs
95        pub fn create(constants: &ConsensusConstants) -> Self {
96            Constants {
97                k: Length::from_u32(constants.k),
98                delta: Length::from_u32(constants.delta),
99                slots_per_sub_window: Length::from_u32(constants.slots_per_sub_window),
100                slots_per_window: Length::from_u32(constants.slots_per_window),
101                sub_windows_per_window: Length::from_u32(constants.sub_windows_per_window),
102                slots_per_epoch: Length::from_u32(constants.slots_per_epoch),
103                grace_period_slots: Length::from_u32(constants.grace_period_slots),
104                grace_period_end: Slot::from_u32(constants.grace_period_end),
105                checkpoint_window_slots_per_year: Length::from_u32(
106                    constants.checkpoint_window_slots_per_year,
107                ),
108                checkpoint_window_size_in_slots: Length::from_u32(
109                    constants.checkpoint_window_size_in_slots,
110                ),
111                block_window_duration_ms: BlockTimeSpan::from_u64(
112                    constants.block_window_duration_ms,
113                ),
114                slot_duration_ms: BlockTimeSpan::from_u64(constants.slot_duration_ms),
115                epoch_duration: BlockTimeSpan::from_u64(constants.epoch_duration),
116                delta_duration: BlockTimeSpan::from_u64(constants.delta_duration),
117                genesis_state_timestamp: constants.genesis_state_timestamp.into(),
118            }
119        }
120    }
121
122    // Consensus epoch
123    impl Epoch {
124        fn of_time_exn(constants: &Constants, time: BlockTime) -> Result<Self, String> {
125            if time < constants.genesis_state_timestamp {
126                return Err(
127                    "Epoch.of_time: time is earlier than genesis block timestamp".to_string(),
128                );
129            }
130
131            let time_since_genesis = time.diff(constants.genesis_state_timestamp);
132            let epoch = time_since_genesis.to_ms() / constants.epoch_duration.to_ms();
133            let epoch: u32 = epoch.try_into().unwrap();
134
135            Ok(Self::from_u32(epoch))
136        }
137
138        fn start_time(constants: &Constants, epoch: Self) -> BlockTime {
139            let ms = constants
140                .genesis_state_timestamp
141                .to_span_since_epoch()
142                .to_ms()
143                + ((epoch.as_u32() as u64) * constants.epoch_duration.to_ms());
144            BlockTime::of_span_since_epoch(BlockTimeSpan::of_ms(ms))
145        }
146
147        pub fn epoch_and_slot_of_time_exn(
148            constants: &Constants,
149            time: BlockTime,
150        ) -> Result<(Self, Slot), String> {
151            let epoch = Self::of_time_exn(constants, time)?;
152            let time_since_epoch = time.diff(Self::start_time(constants, epoch));
153
154            let slot: u64 = time_since_epoch.to_ms() / constants.slot_duration_ms.to_ms();
155            let slot = Slot::from_u32(slot.try_into().unwrap());
156
157            Ok((epoch, slot))
158        }
159    }
160
161    /// TODO: Maybe rename to `ConsensusGlobalSlot` ?
162    pub struct GlobalSlot {
163        slot_number: Slot,
164        slots_per_epoch: Length,
165    }
166
167    impl GlobalSlot {
168        fn create(constants: &Constants, epoch: Epoch, slot: Slot) -> Self {
169            let slot_number = slot.as_u32() + (constants.slots_per_epoch.as_u32() * epoch.as_u32());
170            Self {
171                slot_number: Slot::from_u32(slot_number),
172                slots_per_epoch: constants.slots_per_epoch,
173            }
174        }
175
176        fn of_epoch_and_slot(constants: &Constants, (epoch, slot): (Epoch, Slot)) -> Self {
177            Self::create(constants, epoch, slot)
178        }
179
180        pub fn of_time_exn(constants: &Constants, time: BlockTime) -> Result<Self, String> {
181            Ok(Self::of_epoch_and_slot(
182                constants,
183                Epoch::epoch_and_slot_of_time_exn(constants, time)?,
184            ))
185        }
186
187        pub fn to_global_slot(&self) -> Slot {
188            let Self {
189                slot_number,
190                slots_per_epoch: _,
191            } = self;
192            *slot_number
193        }
194    }
195}
196
/// Fee increase required to replace a transaction.
const REPLACE_FEE: Fee = Fee::of_nanomina_int_exn(1);

/// A validated user command paired with its transaction hash.
pub type ValidCommandWithHash = WithHash<valid::UserCommand, v2::TransactionHash>;
201
202pub mod diff {
203    use super::*;
204
205    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, strum_macros::Display)]
206    pub enum Error {
207        InsufficientReplaceFee,
208        Duplicate,
209        InvalidNonce,
210        InsufficientFunds,
211        Overflow,
212        BadToken,
213        UnwantedFeeToken,
214        Expired,
215        Overloaded,
216        FeePayerAccountNotFound,
217        FeePayerNotPermittedToSend,
218        AfterSlotTxEnd,
219        BacktrackNonceMismatch,
220        InvalidCurrencyConsumed,
221        Custom,
222    }
223
224    impl Error {
225        pub fn grounds_for_diff_rejection(&self) -> bool {
226            match self {
227                Error::InsufficientReplaceFee
228                | Error::Duplicate
229                | Error::InvalidNonce
230                | Error::InsufficientFunds
231                | Error::Expired
232                | Error::Overloaded
233                | Error::FeePayerAccountNotFound
234                | Error::FeePayerNotPermittedToSend
235                | Error::AfterSlotTxEnd
236                | Error::InvalidCurrencyConsumed
237                | Error::Custom
238                | Error::BacktrackNonceMismatch => false,
239                Error::Overflow | Error::BadToken | Error::UnwantedFeeToken => true,
240            }
241        }
242    }
243
244    #[derive(Debug)]
245    pub struct Diff {
246        pub list: Vec<UserCommand>,
247    }
248
249    #[derive(Serialize, Deserialize, Debug, Clone)]
250    pub struct DiffVerified {
251        pub list: Vec<ValidCommandWithHash>,
252    }
253
254    struct Rejected {
255        list: Vec<(UserCommand, Error)>,
256    }
257
258    #[derive(Serialize, Deserialize, Debug, Clone)]
259    pub struct BestTipDiff {
260        pub new_commands: Vec<WithStatus<valid::UserCommand>>,
261        pub removed_commands: Vec<WithStatus<valid::UserCommand>>,
262        pub reorg_best_tip: bool,
263    }
264}
265
266fn preload_accounts(
267    ledger: &Mask,
268    account_ids: &BTreeSet<AccountId>,
269) -> HashMap<AccountId, Account> {
270    account_ids
271        .iter()
272        .filter_map(|id| {
273            let addr = ledger.location_of_account(id)?;
274            let account = ledger.get(addr)?;
275            Some((id.clone(), *account))
276        })
277        .collect()
278}
279
/// Transaction pool configuration.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Config {
    /// Unit placeholder; carries no data.
    pub trust_system: (),
    /// Presumably the maximum number of commands kept in the pool — TODO confirm.
    pub pool_max_size: usize,
    /// Optional slot bound; `add_from_backtrack` applies the same-named bound
    /// of `IndexedPoolConfig` to reject commands with `AfterSlotTxEnd`.
    pub slot_tx_end: Option<Slot>,
}
286
/// Used to be able to de/serialize our `TransactionPool` in the state machine
///
/// Mirror of `VkRefcountTable` in which every `Fp` hash is stored as a
/// `BigInt` and every map is flattened into a vector of pairs.
#[derive(Serialize, Deserialize)]
struct VkRefcountTableBigInts {
    /// `(vk hash, (reference count, verification key with its hash))`.
    verification_keys: Vec<(BigInt, (usize, WithHash<VerificationKey, BigInt>))>,
    /// `(account id, [(vk hash, reference count)])`.
    account_id_to_vks: Vec<(AccountId, Vec<(BigInt, usize)>)>,
    /// `(vk hash, [(account id, reference count)])`.
    vk_to_account_ids: Vec<(BigInt, Vec<(AccountId, usize)>)>,
}
294impl From<VkRefcountTable> for VkRefcountTableBigInts {
295    fn from(value: VkRefcountTable) -> Self {
296        let VkRefcountTable {
297            verification_keys,
298            account_id_to_vks,
299            vk_to_account_ids,
300        } = value;
301        Self {
302            verification_keys: verification_keys
303                .into_iter()
304                .map(|(hash, (count, vk))| {
305                    assert_eq!(hash, vk.hash());
306                    let hash: BigInt = hash.into();
307                    (
308                        hash.clone(),
309                        (
310                            count,
311                            WithHash {
312                                data: vk.vk().clone(),
313                                hash,
314                            },
315                        ),
316                    )
317                })
318                .collect(),
319            account_id_to_vks: account_id_to_vks
320                .into_iter()
321                .map(|(id, map)| {
322                    (
323                        id,
324                        map.into_iter()
325                            .map(|(hash, count)| (hash.into(), count))
326                            .collect(),
327                    )
328                })
329                .collect(),
330            vk_to_account_ids: vk_to_account_ids
331                .into_iter()
332                .map(|(hash, map)| (hash.into(), map.into_iter().collect()))
333                .collect(),
334        }
335    }
336}
337impl From<VkRefcountTableBigInts> for VkRefcountTable {
338    fn from(value: VkRefcountTableBigInts) -> Self {
339        let VkRefcountTableBigInts {
340            verification_keys,
341            account_id_to_vks,
342            vk_to_account_ids,
343        } = value;
344        Self {
345            verification_keys: verification_keys
346                .into_iter()
347                .map(|(hash, (count, vk))| {
348                    assert_eq!(hash, vk.hash);
349                    let hash: Fp = hash.to_field().unwrap(); // We trust our serialized data
350                    (hash, (count, VerificationKeyWire::with_hash(vk.data, hash)))
351                })
352                .collect(),
353            account_id_to_vks: account_id_to_vks
354                .into_iter()
355                .map(|(id, map)| {
356                    let map = map
357                        .into_iter()
358                        .map(|(bigint, count)| (bigint.to_field::<Fp>().unwrap(), count)) // We trust our serialized data
359                        .collect();
360                    (id, map)
361                })
362                .collect(),
363            vk_to_account_ids: vk_to_account_ids
364                .into_iter()
365                .map(|(hash, map)| (hash.to_field().unwrap(), map.into_iter().collect())) // We trust our serialized data
366                .collect(),
367        }
368    }
369}
370
/// Reference-counted table of verification keys.
///
/// (De)serialized through `VkRefcountTableBigInts`.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
#[serde(into = "VkRefcountTableBigInts")]
#[serde(from = "VkRefcountTableBigInts")]
struct VkRefcountTable {
    /// vk hash -> (reference count, verification key).
    verification_keys: HashMap<Fp, (usize, VerificationKeyWire)>,
    /// account id -> vk hash -> reference count of that pair.
    account_id_to_vks: HashMap<AccountId, HashMap<Fp, usize>>,
    /// vk hash -> account id -> reference count of that pair.
    vk_to_account_ids: HashMap<Fp, HashMap<AccountId, usize>>,
}
379
380impl VkRefcountTable {
381    fn find_vk(&self, f: &Fp) -> Option<&(usize, VerificationKeyWire)> {
382        self.verification_keys.get(f)
383    }
384
385    fn find_vks_by_account_id(&self, account_id: &AccountId) -> Vec<&VerificationKeyWire> {
386        let Some(vks) = self.account_id_to_vks.get(account_id) else {
387            return Vec::new();
388        };
389
390        vks.iter()
391            .map(|(f, _)| self.find_vk(f).expect("malformed Vk_refcount_table.t"))
392            .map(|(_, vk)| vk)
393            .collect()
394    }
395
396    fn inc(&mut self, account_id: AccountId, vk: VerificationKeyWire) {
397        use std::collections::hash_map::Entry::{Occupied, Vacant};
398
399        match self.verification_keys.entry(vk.hash()) {
400            Vacant(e) => {
401                e.insert((1, vk.clone()));
402            }
403            Occupied(mut e) => {
404                let (count, _vk) = e.get_mut();
405                *count += 1;
406            }
407        }
408
409        let map = self
410            .account_id_to_vks
411            .entry(account_id.clone())
412            .or_default(); // or insert empty map
413        let count = map.entry(vk.hash()).or_default(); // or insert count 0
414        *count += 1;
415
416        let map = self.vk_to_account_ids.entry(vk.hash()).or_default(); // or insert empty map
417        let count = map.entry(account_id).or_default(); // or insert count 0
418        *count += 1;
419    }
420
421    fn dec(&mut self, account_id: AccountId, vk_hash: Fp) {
422        use std::collections::hash_map::Entry::{Occupied, Vacant};
423
424        match self.verification_keys.entry(vk_hash) {
425            Vacant(_e) => {
426                bug_condition!("vk_map: Unexpected error on self.verification_keys: vacant vk_hash")
427            }
428            Occupied(mut e) => {
429                let (count, _vk) = e.get_mut();
430                if *count == 1 {
431                    e.remove();
432                } else {
433                    *count = (*count).checked_sub(1).unwrap();
434                }
435            }
436        }
437
438        fn remove<K1, K2>(
439            key1: K1,
440            key2: K2,
441            table: &mut HashMap<K1, HashMap<K2, usize>>,
442        ) -> Result<(), &'static str>
443        where
444            K1: std::hash::Hash + Eq,
445            K2: std::hash::Hash + Eq,
446        {
447            match table.entry(key1) {
448                Vacant(_e) => return Err("vacant on key1"),
449                Occupied(mut e) => {
450                    let map = e.get_mut();
451                    match map.entry(key2) {
452                        Vacant(_e) => return Err("vacant on key2"),
453                        Occupied(mut e2) => {
454                            let count: &mut usize = e2.get_mut();
455                            if *count == 1 {
456                                e2.remove();
457                                e.remove();
458                            } else {
459                                *count = count.checked_sub(1).ok_or("invalid count state")?
460                            }
461                        }
462                    }
463                }
464            }
465            Ok(())
466        }
467
468        if let Err(e) = remove(account_id.clone(), vk_hash, &mut self.account_id_to_vks) {
469            bug_condition!(
470                "vk_map: Unexpected error on self.account_id_to_vks: {:?}",
471                e
472            );
473        }
474        if let Err(e) = remove(vk_hash, account_id.clone(), &mut self.vk_to_account_ids) {
475            bug_condition!(
476                "vk_map: Unexpected error on self.vk_to_account_ids: {:?}",
477                e
478            );
479        }
480    }
481
482    fn decrement_list(&mut self, list: &[ValidCommandWithHash]) {
483        list.iter().for_each(|c| {
484            for (id, vk) in c.data.forget_check().extract_vks() {
485                self.dec(id, vk.hash());
486            }
487        });
488    }
489
490    fn decrement_hashed<I, V>(&mut self, list: I)
491    where
492        I: IntoIterator<Item = V>,
493        V: Borrow<ValidCommandWithHash>,
494    {
495        list.into_iter().for_each(|c| {
496            for (id, vk) in c.borrow().data.forget_check().extract_vks() {
497                self.dec(id, vk.hash());
498            }
499        });
500    }
501
502    fn increment_hashed<I, V>(&mut self, list: I)
503    where
504        I: IntoIterator<Item = V>,
505        V: Borrow<ValidCommandWithHash>,
506    {
507        list.into_iter().for_each(|c| {
508            for (id, vk) in c.borrow().data.forget_check().extract_vks() {
509                self.inc(id, vk);
510            }
511        });
512    }
513
514    fn increment_list(&mut self, list: &[ValidCommandWithHash]) {
515        list.iter().for_each(|c| {
516            for (id, vk) in c.data.forget_check().extract_vks() {
517                self.inc(id, vk);
518            }
519        });
520    }
521}
522
/// Wrapper around a `usize`.
// NOTE(review): presumably a batch size; no use is visible in this part of
// the file — confirm against the users of this type.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
enum Batch {
    Of(usize),
}
527
/// Detailed reason for which a command is refused by the indexed pool.
///
/// Converts into `diff::Error` (dropping the payload) and into `String`
/// (through its `Debug` output).
#[derive(Debug)]
pub enum CommandError {
    InvalidNonce {
        account_nonce: Nonce,
        expected: Nonce,
    },
    InsufficientFunds {
        balance: Balance,
        consumed: Amount,
    },
    /// NOTE: don't punish for this, attackers can induce nodes to banlist
    ///       each other that way!
    InsufficientReplaceFee {
        replace_fee: Fee,
        fee: Fee,
    },
    Overflow,
    BadToken,
    /// The command's `valid_until` slot is earlier than the current global
    /// slot since genesis.
    Expired {
        valid_until: Slot,
        global_slot_since_genesis: Slot,
    },
    UnwantedFeeToken {
        token_id: TokenId,
    },
    /// The current global slot is not before the configured `slot_tx_end`.
    AfterSlotTxEnd,
    /// When re-adding a command from a backtrack, its expected target nonce
    /// did not match the nonce of the first command queued for the sender.
    BacktrackNonceMismatch {
        expected_nonce: Nonce,
        first_nonce: Nonce,
    },
    InvalidCurrencyConsumed,
    /// Free-form error, e.g. the message produced by `my_assert`.
    Custom(Cow<'static, str>),
}
561
562impl From<CommandError> for String {
563    fn from(value: CommandError) -> Self {
564        format!("{:?}", value)
565    }
566}
567
568impl From<CommandError> for diff::Error {
569    fn from(value: CommandError) -> Self {
570        match value {
571            CommandError::InvalidNonce { .. } => diff::Error::InvalidNonce,
572            CommandError::InsufficientFunds { .. } => diff::Error::InsufficientFunds,
573            CommandError::InsufficientReplaceFee { .. } => diff::Error::InsufficientReplaceFee,
574            CommandError::Overflow => diff::Error::Overflow,
575            CommandError::BadToken => diff::Error::BadToken,
576            CommandError::Expired { .. } => diff::Error::Expired,
577            CommandError::UnwantedFeeToken { .. } => diff::Error::UnwantedFeeToken,
578            CommandError::AfterSlotTxEnd => diff::Error::AfterSlotTxEnd,
579            CommandError::BacktrackNonceMismatch { .. } => diff::Error::BacktrackNonceMismatch,
580            CommandError::InvalidCurrencyConsumed => diff::Error::InvalidCurrencyConsumed,
581            CommandError::Custom(_) => diff::Error::Custom,
582        }
583    }
584}
585
/// Configuration carried by an `IndexedPool`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct IndexedPoolConfig {
    /// Typed consensus constants (see `consensus::Constants::create`).
    pub consensus_constants: consensus::Constants,
    /// When set, `add_from_backtrack` rejects commands with `AfterSlotTxEnd`
    /// unless the current global slot is strictly before this slot.
    slot_tx_end: Option<Slot>,
}
591
592// module Config = struct
593//   type t =
594//     { constraint_constants : Genesis_constants.Constraint_constants.t
595//     ; consensus_constants : Consensus.Constants.t
596//     ; time_controller : Block_time.Controller.t
597//     ; slot_tx_end : Mina_numbers.Global_slot_since_hard_fork.t option
598//     }
599//   [@@deriving sexp_of, equal, compare]
600// end
601
/// The pool of pending commands, kept under several indexes that are updated
/// together by the methods of this type.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct IndexedPool {
    /// Transactions valid against the current ledger, indexed by fee per
    /// weight unit.
    applicable_by_fee: HashMap<FeeRate, HashSet<ValidCommandWithHash>>,
    /// All pending transactions along with the total currency required to
    /// execute them -- plus any currency spent from this account by
    /// transactions from other accounts -- indexed by sender account.
    /// Ordered by nonce inside the accounts.
    all_by_sender: HashMap<AccountId, (VecDeque<ValidCommandWithHash>, Amount)>,
    /// All transactions in the pool indexed by fee per weight unit.
    all_by_fee: HashMap<FeeRate, HashSet<ValidCommandWithHash>>,
    /// All transactions in the pool indexed by transaction hash.
    all_by_hash: HashMap<v2::TransactionHash, ValidCommandWithHash>,
    /// Only transactions that have an expiry
    /// (i.e. whose `valid_until` is not `Slot::max()`), indexed by that slot.
    transactions_with_expiration: HashMap<Slot, HashSet<ValidCommandWithHash>>,
    /// Number of commands in the pool; bumped on every add and decremented
    /// on every removal.
    size: usize,
    pub config: IndexedPoolConfig,
}
620
/// A deferred modification of the pool indexes.
// NOTE(review): the variants mirror the `update_add`,
// `update_remove_all_by_fee_and_hash_and_expiration` and
// `update_remove_from_applicable_by_fee` methods; presumably `apply_updates`
// dispatches to them — confirm.
enum Update {
    Add {
        command: ValidCommandWithHash,
        fee_per_wu: FeeRate,
        add_to_applicable_by_fee: bool,
    },
    RemoveAllByFeeAndHashAndExpiration {
        commands: VecDeque<ValidCommandWithHash>,
    },
    RemoveFromApplicableByFee {
        fee_per_wu: FeeRate,
        command: ValidCommandWithHash,
    },
}
635
/// Working copy of one sender's entry of `IndexedPool::all_by_sender`.
#[derive(Clone)]
struct SenderState {
    /// The sender this state belongs to.
    sender: AccountId,
    /// The sender's queue and reserved currency; `None` when the sender has
    /// no entry in the pool.
    state: Option<(VecDeque<ValidCommandWithHash>, Amount)>,
}
641
/// Selects which part of the pool a revalidation covers.
pub enum RevalidateKind<'a> {
    /// The whole pool.
    EntirePool,
    /// Only the given accounts.
    Subset(&'a BTreeSet<AccountId>),
}
646
647impl IndexedPool {
648    fn new(constants: &ConsensusConstants) -> Self {
649        Self {
650            applicable_by_fee: HashMap::new(),
651            all_by_sender: HashMap::new(),
652            all_by_fee: HashMap::new(),
653            all_by_hash: HashMap::new(),
654            transactions_with_expiration: HashMap::new(),
655            size: 0,
656            config: IndexedPoolConfig {
657                consensus_constants: consensus::Constants::create(constants),
658                slot_tx_end: None,
659            },
660        }
661    }
662
663    fn size(&self) -> usize {
664        self.size
665    }
666
667    fn min_fee(&self) -> Option<FeeRate> {
668        self.all_by_fee.keys().min().cloned()
669    }
670
671    fn member(&self, cmd: &ValidCommandWithHash) -> bool {
672        self.all_by_hash.contains_key(&cmd.hash)
673    }
674
675    pub fn get(&self, hash: &v2::TransactionHash) -> Option<&ValidCommandWithHash> {
676        self.all_by_hash.get(hash)
677    }
678
679    fn check_expiry(
680        &self,
681        global_slot_since_genesis: Slot,
682        cmd: &UserCommand,
683    ) -> Result<(), CommandError> {
684        let valid_until = cmd.valid_until();
685
686        if valid_until < global_slot_since_genesis {
687            return Err(CommandError::Expired {
688                valid_until,
689                global_slot_since_genesis,
690            });
691        }
692
693        Ok(())
694    }
695
696    /// Insert in a `HashMap<_, HashSet<_>>`
697    fn map_set_insert<K, V>(map: &mut HashMap<K, HashSet<V>>, key: K, value: V)
698    where
699        K: std::hash::Hash + PartialEq + Eq,
700        V: std::hash::Hash + PartialEq + Eq,
701    {
702        let entry = map.entry(key).or_default();
703        entry.insert(value);
704    }
705
706    /// Remove in a `HashMap<_, HashSet<_>>`
707    fn map_set_remove<K, V>(map: &mut HashMap<K, HashSet<V>>, key: K, value: &V)
708    where
709        K: std::hash::Hash + PartialEq + Eq,
710        V: std::hash::Hash + PartialEq + Eq,
711    {
712        let Entry::Occupied(mut entry) = map.entry(key) else {
713            return;
714        };
715        let set = entry.get_mut();
716        set.remove(value);
717        if set.is_empty() {
718            entry.remove();
719        }
720    }
721
722    fn update_expiration_map(&mut self, cmd: ValidCommandWithHash, is_add: bool) {
723        let user_cmd = cmd.data.forget_check();
724        let expiry = user_cmd.valid_until();
725        if expiry == Slot::max() {
726            return; // Do nothing
727        }
728        if is_add {
729            Self::map_set_insert(&mut self.transactions_with_expiration, expiry, cmd);
730        } else {
731            Self::map_set_remove(&mut self.transactions_with_expiration, expiry, &cmd);
732        }
733    }
734
735    fn remove_from_expiration_exn(&mut self, cmd: ValidCommandWithHash) {
736        self.update_expiration_map(cmd, false);
737    }
738
739    fn add_to_expiration(&mut self, cmd: ValidCommandWithHash) {
740        self.update_expiration_map(cmd, true);
741    }
742
743    /// Remove a command from the applicable_by_fee field. This may break an
744    /// invariant.
745    fn remove_applicable_exn(&mut self, cmd: &ValidCommandWithHash) {
746        let fee_per_wu = cmd.data.forget_check().fee_per_wu();
747        Self::map_set_remove(&mut self.applicable_by_fee, fee_per_wu, cmd);
748    }
749
750    fn make_queue<T>() -> VecDeque<T> {
751        VecDeque::with_capacity(256)
752    }
753
754    fn add_from_backtrack(
755        &mut self,
756        global_slot_since_genesis: Slot,
757        current_global_slot: Slot,
758        cmd: ValidCommandWithHash,
759    ) -> Result<(), CommandError> {
760        let IndexedPoolConfig { slot_tx_end, .. } = &self.config;
761
762        if !slot_tx_end
763            .as_ref()
764            .map(|slot_tx_end| current_global_slot < *slot_tx_end)
765            .unwrap_or(true)
766        {
767            return Err(CommandError::AfterSlotTxEnd);
768        }
769
770        let ValidCommandWithHash {
771            data: unchecked,
772            hash: cmd_hash,
773        } = &cmd;
774        let unchecked = unchecked.forget_check();
775
776        self.check_expiry(global_slot_since_genesis, &unchecked)?;
777
778        let fee_payer = unchecked.fee_payer();
779        let fee_per_wu = unchecked.fee_per_wu();
780
781        let consumed = currency_consumed(&unchecked)?;
782
783        match self.all_by_sender.get_mut(&fee_payer) {
784            None => {
785                {
786                    let mut queue = Self::make_queue();
787                    queue.push_back(cmd.clone());
788                    self.all_by_sender.insert(fee_payer, (queue, consumed));
789                }
790                Self::map_set_insert(&mut self.all_by_fee, fee_per_wu.clone(), cmd.clone());
791                self.all_by_hash.insert(cmd_hash.clone(), cmd.clone());
792                Self::map_set_insert(&mut self.applicable_by_fee, fee_per_wu.clone(), cmd.clone());
793                self.add_to_expiration(cmd);
794                self.size += 1;
795            }
796            Some((queue, currency_reserved)) => {
797                let first_queued = queue.front().cloned().unwrap();
798                let expected_nonce = unchecked.expected_target_nonce();
799                let first_nonce = first_queued.data.forget_check().applicable_at_nonce();
800
801                if expected_nonce != first_nonce {
802                    // Ocaml panics here as well
803                    //panic!("indexed pool nonces inconsistent when adding from backtrack.")
804                    return Err(CommandError::BacktrackNonceMismatch {
805                        expected_nonce,
806                        first_nonce,
807                    });
808                }
809
810                // update `self.all_by_sender`
811                {
812                    queue.push_front(cmd.clone());
813                    *currency_reserved = currency_reserved.checked_add(&consumed).unwrap();
814                }
815
816                self.remove_applicable_exn(&first_queued);
817
818                Self::map_set_insert(&mut self.applicable_by_fee, fee_per_wu.clone(), cmd.clone());
819                Self::map_set_insert(&mut self.all_by_fee, fee_per_wu.clone(), cmd.clone());
820                self.all_by_hash.insert(cmd_hash.clone(), cmd.clone());
821                self.add_to_expiration(cmd);
822                self.size += 1;
823            }
824        }
825        Ok(())
826    }
827
828    fn update_add(
829        &mut self,
830        cmd: ValidCommandWithHash,
831        fee_per_wu: FeeRate,
832        add_to_applicable_by_fee: bool,
833    ) {
834        if add_to_applicable_by_fee {
835            Self::map_set_insert(&mut self.applicable_by_fee, fee_per_wu.clone(), cmd.clone());
836        }
837
838        let cmd_hash = cmd.hash.clone();
839
840        Self::map_set_insert(&mut self.all_by_fee, fee_per_wu, cmd.clone());
841        self.all_by_hash.insert(cmd_hash, cmd.clone());
842        self.add_to_expiration(cmd);
843        self.size += 1;
844    }
845
846    /// Remove a command from the all_by_fee and all_by_hash fields, and decrement
847    /// size. This may break an invariant.
848    fn update_remove_all_by_fee_and_hash_and_expiration<I>(&mut self, cmds: I)
849    where
850        I: IntoIterator<Item = ValidCommandWithHash>,
851    {
852        for cmd in cmds {
853            let fee_per_wu = cmd.data.forget_check().fee_per_wu();
854            let cmd_hash = cmd.hash.clone();
855            Self::map_set_remove(&mut self.all_by_fee, fee_per_wu, &cmd);
856            self.all_by_hash.remove(&cmd_hash);
857            self.remove_from_expiration_exn(cmd);
858            self.size = self.size.checked_sub(1).unwrap();
859        }
860    }
861
862    fn update_remove_from_applicable_by_fee(
863        &mut self,
864        fee_per_wu: FeeRate,
865        command: &ValidCommandWithHash,
866    ) {
867        Self::map_set_remove(&mut self.applicable_by_fee, fee_per_wu, command)
868    }
869
870    fn remove_with_dependents_exn(
871        &mut self,
872        cmd: &ValidCommandWithHash,
873    ) -> Result<VecDeque<ValidCommandWithHash>, CommandError> {
874        let sender = cmd.data.fee_payer();
875        let mut by_sender = SenderState {
876            state: self.all_by_sender.get(&sender).cloned(),
877            sender,
878        };
879
880        let mut updates = Vec::<Update>::with_capacity(128);
881        let result = self.remove_with_dependents_exn_impl(cmd, &mut by_sender, &mut updates);
882
883        self.set_sender(by_sender);
884        self.apply_updates(updates);
885
886        result
887    }
888
889    fn remove_with_dependents_exn_impl(
890        &self,
891        cmd: &ValidCommandWithHash,
892        by_sender: &mut SenderState,
893        updates: &mut Vec<Update>,
894    ) -> Result<VecDeque<ValidCommandWithHash>, CommandError> {
895        let (sender_queue, reserved_currency_ref) = by_sender.state.as_mut().unwrap();
896        let unchecked = cmd.data.forget_check();
897
898        my_assert(!sender_queue.is_empty())?;
899
900        let cmd_nonce = unchecked.applicable_at_nonce();
901
902        let cmd_index = sender_queue
903            .iter()
904            .position(|cmd| {
905                let nonce = cmd.data.forget_check().applicable_at_nonce();
906                // we just compare nonce equality since the command we are looking for already exists in the sequence
907                nonce == cmd_nonce
908            })
909            .unwrap();
910
911        let drop_queue = sender_queue.split_off(cmd_index);
912        let keep_queue = sender_queue;
913        my_assert(!drop_queue.is_empty())?;
914
915        let currency_to_remove = drop_queue.iter().try_fold(Amount::zero(), |acc, cmd| {
916            let consumed = currency_consumed(&cmd.data.forget_check())?;
917            Ok(consumed.checked_add(&acc).unwrap())
918        })?;
919
920        // This is safe because the currency in a subset of the commands much be <=
921        // total currency in all the commands.
922        let reserved_currency = reserved_currency_ref
923            .checked_sub(&currency_to_remove)
924            .unwrap();
925
926        updates.push(Update::RemoveAllByFeeAndHashAndExpiration {
927            commands: drop_queue.clone(),
928        });
929
930        if cmd_index == 0 {
931            updates.push(Update::RemoveFromApplicableByFee {
932                fee_per_wu: unchecked.fee_per_wu(),
933                command: cmd.clone(),
934            });
935        }
936
937        // We re-fetch it to make the borrow checker happy
938        // let (keep_queue, reserved_currency_ref) = self.all_by_sender.get_mut(&sender).unwrap();
939        if !keep_queue.is_empty() {
940            *reserved_currency_ref = reserved_currency;
941        } else {
942            my_assert(reserved_currency.is_zero())?;
943            by_sender.state = None;
944        }
945
946        Ok(drop_queue)
947    }
948
949    fn apply_updates(&mut self, updates: Vec<Update>) {
950        for update in updates {
951            match update {
952                Update::Add {
953                    command,
954                    fee_per_wu,
955                    add_to_applicable_by_fee,
956                } => self.update_add(command, fee_per_wu, add_to_applicable_by_fee),
957                Update::RemoveAllByFeeAndHashAndExpiration { commands } => {
958                    self.update_remove_all_by_fee_and_hash_and_expiration(commands)
959                }
960                Update::RemoveFromApplicableByFee {
961                    fee_per_wu,
962                    command,
963                } => self.update_remove_from_applicable_by_fee(fee_per_wu, &command),
964            }
965        }
966    }
967
968    fn set_sender(&mut self, by_sender: SenderState) {
969        let SenderState { sender, state } = by_sender;
970
971        match state {
972            Some(state) => {
973                self.all_by_sender.insert(sender, state);
974            }
975            None => {
976                self.all_by_sender.remove(&sender);
977            }
978        }
979    }
980
981    fn add_from_gossip_exn(
982        &mut self,
983        global_slot_since_genesis: Slot,
984        current_global_slot: Slot,
985        cmd: &ValidCommandWithHash,
986        current_nonce: Nonce,
987        balance: Balance,
988    ) -> Result<(ValidCommandWithHash, VecDeque<ValidCommandWithHash>), CommandError> {
989        let sender = cmd.data.fee_payer();
990        let mut by_sender = SenderState {
991            state: self.all_by_sender.get(&sender).cloned(),
992            sender,
993        };
994
995        let mut updates = Vec::<Update>::with_capacity(128);
996        let result = self.add_from_gossip_exn_impl(
997            global_slot_since_genesis,
998            current_global_slot,
999            cmd,
1000            current_nonce,
1001            balance,
1002            &mut by_sender,
1003            &mut updates,
1004        )?;
1005
1006        self.set_sender(by_sender);
1007        self.apply_updates(updates);
1008
1009        Ok(result)
1010    }
1011
    /// Validates `cmd` against the pool configuration and the sender's queue,
    /// and records the resulting changes in `by_sender` and `updates`.
    ///
    /// This takes `&self`: nothing is written to the pool's indexes here. The
    /// caller commits `by_sender` and `updates` afterwards (see
    /// `add_from_gossip_exn`).
    ///
    /// Returns the accepted command together with the queue of commands that
    /// were dropped because of it (empty unless `cmd` replaced queued commands).
    ///
    /// Three situations are handled, depending on the sender's state:
    /// - no queue yet: `cmd` must apply at `current_nonce` and fit in `balance`;
    /// - `cmd` applies right after the last queued command: it is appended;
    /// - the queue starts at `current_nonce` and `cmd` targets a nonce inside
    ///   it: the queued commands from that nonce on are removed, `cmd` is
    ///   added, and the removed tail is re-added where possible.
    ///
    /// # Errors
    ///
    /// `AfterSlotTxEnd`, the error of `check_expiry`, `Overflow`,
    /// `UnwantedFeeToken`, `InvalidNonce`, `InsufficientFunds` and
    /// `InsufficientReplaceFee`, as produced at the points marked below, plus
    /// any error from a failed `my_assert`.
    ///
    /// # Panics
    ///
    /// Panics on the `unwrap()` calls below if the queue invariants they rely
    /// on do not hold.
    fn add_from_gossip_exn_impl(
        &self,
        global_slot_since_genesis: Slot,
        current_global_slot: Slot,
        cmd: &ValidCommandWithHash,
        current_nonce: Nonce,
        balance: Balance,
        by_sender: &mut SenderState,
        updates: &mut Vec<Update>,
    ) -> Result<(ValidCommandWithHash, VecDeque<ValidCommandWithHash>), CommandError> {
        let IndexedPoolConfig { slot_tx_end, .. } = &self.config;

        // When `slot_tx_end` is configured, commands are only accepted while
        // the current global slot is strictly before it.
        if !slot_tx_end
            .as_ref()
            .map(|slot_tx_end| current_global_slot < *slot_tx_end)
            .unwrap_or(true)
        {
            return Err(CommandError::AfterSlotTxEnd);
        }

        let unchecked = cmd.data.forget_check();
        let fee = unchecked.fee();
        let fee_per_wu = unchecked.fee_per_wu();
        let cmd_applicable_at_nonce = unchecked.applicable_at_nonce();

        // Expiry, currency-consumption and fee-token checks, in that order.
        let consumed = {
            self.check_expiry(global_slot_since_genesis, &unchecked)?;
            let consumed = currency_consumed(&unchecked).map_err(|_| CommandError::Overflow)?;
            if !unchecked.fee_token().is_default() {
                return Err(CommandError::UnwantedFeeToken {
                    token_id: unchecked.fee_token(),
                });
            }
            consumed
        };

        // Work on a clone of the sender state; `by_sender.state` itself is only
        // overwritten at the points where a change is accepted.
        match by_sender.state.clone() {
            None => {
                // First command for this sender: it must apply at the account's
                // current nonce and be covered by the balance.
                if current_nonce != cmd_applicable_at_nonce {
                    return Err(CommandError::InvalidNonce {
                        account_nonce: current_nonce,
                        expected: cmd_applicable_at_nonce,
                    });
                }
                if consumed > balance.to_amount() {
                    return Err(CommandError::InsufficientFunds { balance, consumed });
                }

                let mut queue = Self::make_queue();
                queue.push_back(cmd.clone());
                by_sender.state = Some((queue, consumed));

                // As the head of its sender queue, the command also goes into
                // `applicable_by_fee`.
                updates.push(Update::Add {
                    command: cmd.clone(),
                    fee_per_wu,
                    add_to_applicable_by_fee: true,
                });

                Ok((cmd.clone(), Self::make_queue()))
            }
            Some((mut queued_cmds, reserved_currency)) => {
                my_assert(!queued_cmds.is_empty())?;
                // Nonce at which the head of the queue applies.
                let queue_applicable_at_nonce = {
                    let first = queued_cmds.front().unwrap();
                    first.data.forget_check().applicable_at_nonce()
                };
                // Nonce the account is expected to have after the last queued
                // command.
                let queue_target_nonce = {
                    let last = queued_cmds.back().unwrap();
                    last.data.forget_check().expected_target_nonce()
                };
                if queue_target_nonce == cmd_applicable_at_nonce {
                    // `cmd` continues the queue: append it if the total reserved
                    // currency still fits in the balance.
                    let reserved_currency = consumed
                        .checked_add(&reserved_currency)
                        .ok_or(CommandError::Overflow)?;

                    if reserved_currency > balance.to_amount() {
                        return Err(CommandError::InsufficientFunds {
                            balance,
                            consumed: reserved_currency,
                        });
                    }

                    queued_cmds.push_back(cmd.clone());

                    // Not the head of the queue, so not added to
                    // `applicable_by_fee`.
                    updates.push(Update::Add {
                        command: cmd.clone(),
                        fee_per_wu,
                        add_to_applicable_by_fee: false,
                    });

                    by_sender.state = Some((queued_cmds, reserved_currency));

                    Ok((cmd.clone(), Self::make_queue()))
                } else if queue_applicable_at_nonce == current_nonce {
                    // Replacement: `cmd` targets a nonce inside the queue.
                    if !cmd_applicable_at_nonce
                        .between(&queue_applicable_at_nonce, &queue_target_nonce)
                    {
                        return Err(CommandError::InvalidNonce {
                            account_nonce: cmd_applicable_at_nonce,
                            expected: queue_applicable_at_nonce,
                        });
                    }

                    // Index of the first queued command whose nonce is >= the
                    // nonce of `cmd`.
                    let replacement_index = queued_cmds
                        .iter()
                        .position(|cmd| {
                            let cmd_applicable_at_nonce_prime =
                                cmd.data.forget_check().applicable_at_nonce();
                            cmd_applicable_at_nonce <= cmd_applicable_at_nonce_prime
                        })
                        .unwrap();

                    // This splits the local clone only; the sender state in
                    // `by_sender` is truncated by `remove_with_dependents_exn_impl`
                    // below, and the two results are asserted to be equal.
                    let drop_queue = queued_cmds.split_off(replacement_index);

                    let to_drop = drop_queue.front().unwrap().data.forget_check();
                    my_assert(cmd_applicable_at_nonce <= to_drop.applicable_at_nonce())?;

                    // We check the fee increase twice because we need to be sure the
                    // subtraction is safe.
                    {
                        let replace_fee = to_drop.fee();
                        if fee < replace_fee {
                            return Err(CommandError::InsufficientReplaceFee { replace_fee, fee });
                        }
                    }

                    let dropped = self.remove_with_dependents_exn_impl(
                        drop_queue.front().unwrap(),
                        by_sender,
                        updates,
                    )?;
                    my_assert(drop_queue == dropped)?;

                    // Add `cmd` itself on top of the truncated sender state. The
                    // inner `dropped` shadows the outer one only inside this
                    // block.
                    let (cmd, _) = {
                        let (v, dropped) = self.add_from_gossip_exn_impl(
                            global_slot_since_genesis,
                            current_global_slot,
                            cmd,
                            current_nonce,
                            balance,
                            by_sender,
                            updates,
                        )?;
                        // We've already removed them, so this should always be empty.
                        my_assert(dropped.is_empty())?;
                        (v, dropped)
                    };

                    // `drop_head` is the command being replaced; `drop_tail` are
                    // the commands that were queued after it.
                    let drop_head = dropped.front().cloned().unwrap();
                    let mut drop_tail = dropped.into_iter().skip(1).peekable();

                    // Fee paid on top of the replaced command; safe because
                    // `fee >= to_drop.fee()` was checked above.
                    let mut increment = fee.checked_sub(&to_drop.fee()).unwrap();
                    // Stays `None` while every tail command so far could be
                    // re-added; set once one of them fails.
                    let mut dropped = None::<VecDeque<_>>;
                    let mut current_nonce = current_nonce;
                    let mut this_updates = Vec::with_capacity(128);

                    while let Some(cmd) = drop_tail.peek() {
                        if dropped.is_some() {
                            // A re-add already failed: this command stays out
                            // of the pool and its fee is charged against the
                            // increment.
                            let cmd_unchecked = cmd.data.forget_check();
                            let replace_fee = cmd_unchecked.fee();

                            increment = increment.checked_sub(&replace_fee).ok_or({
                                CommandError::InsufficientReplaceFee {
                                    replace_fee,
                                    fee: increment,
                                }
                            })?;
                        } else {
                            // Try to re-add the tail command. The attempt works
                            // on a scratch update list and a snapshot of
                            // `by_sender`, so a failure can be rolled back.
                            current_nonce = current_nonce.succ();
                            let by_sender_pre = by_sender.clone();
                            this_updates.clear();

                            match self.add_from_gossip_exn_impl(
                                global_slot_since_genesis,
                                current_global_slot,
                                cmd,
                                current_nonce,
                                balance,
                                by_sender,
                                &mut this_updates,
                            ) {
                                Ok((_cmd, dropped)) => {
                                    my_assert(dropped.is_empty())?;
                                    updates.append(&mut this_updates);
                                }
                                Err(_) => {
                                    // Roll back, record the commands *after* the
                                    // current one as dropped, and revisit the
                                    // current one (without advancing) so its fee
                                    // is charged by the branch above.
                                    *by_sender = by_sender_pre;
                                    dropped = Some(drop_tail.clone().skip(1).collect());
                                    continue; // Don't go to next
                                }
                            }
                        }
                        let _ = drop_tail.next();
                    }

                    // What remains of the fee increase must still reach the
                    // minimum replace fee.
                    if increment < REPLACE_FEE {
                        return Err(CommandError::InsufficientReplaceFee {
                            replace_fee: REPLACE_FEE,
                            fee: increment,
                        });
                    }

                    let mut dropped = dropped.unwrap_or_else(Self::make_queue);
                    dropped.push_front(drop_head);

                    Ok((cmd, dropped))
                } else {
                    // Invalid nonce or duplicate transaction got in- either way error
                    Err(CommandError::InvalidNonce {
                        account_nonce: cmd_applicable_at_nonce,
                        expected: queue_target_nonce,
                    })
                }
            }
        }
    }
1228
1229    fn expired_by_global_slot(&self, global_slot_since_genesis: Slot) -> Vec<ValidCommandWithHash> {
1230        self.transactions_with_expiration
1231            .iter()
1232            .filter(|(slot, _cmd)| **slot < global_slot_since_genesis)
1233            .flat_map(|(_slot, cmd)| cmd.iter().cloned())
1234            .collect()
1235    }
1236
    /// Returns the commands that are expired at `global_slot_since_genesis`.
    ///
    /// Currently a plain alias for `expired_by_global_slot`.
    fn expired(&self, global_slot_since_genesis: Slot) -> Vec<ValidCommandWithHash> {
        self.expired_by_global_slot(global_slot_since_genesis)
    }
1240
1241    fn remove_expired(
1242        &mut self,
1243        global_slot_since_genesis: Slot,
1244    ) -> Result<Vec<ValidCommandWithHash>, CommandError> {
1245        let mut dropped = Vec::with_capacity(128);
1246        for cmd in self.expired(global_slot_since_genesis) {
1247            if self.member(&cmd) {
1248                let removed = self.remove_with_dependents_exn(&cmd)?;
1249                dropped.extend(removed);
1250            }
1251        }
1252        Ok(dropped)
1253    }
1254
1255    fn remove_lowest_fee(&mut self) -> Result<VecDeque<ValidCommandWithHash>, CommandError> {
1256        let Some(set) = self.min_fee().and_then(|fee| self.all_by_fee.get(&fee)) else {
1257            return Ok(VecDeque::new());
1258        };
1259
1260        // TODO: Should `self.all_by_fee` be a `BTreeSet` instead ?
1261        #[allow(clippy::mutable_key_type)]
1262        let bset: BTreeSet<_> = set.iter().collect();
1263        // TODO: Not sure if OCaml compare the same way than we do
1264        let min = bset.first().map(|min| (*min).clone()).unwrap();
1265
1266        self.remove_with_dependents_exn(&min)
1267    }
1268
1269    /// Drop commands from the end of the queue until the total currency consumed is
1270    /// <= the current balance.
1271    fn drop_until_sufficient_balance(
1272        mut queue: VecDeque<ValidCommandWithHash>,
1273        mut currency_reserved: Amount,
1274        current_balance: Amount,
1275    ) -> Result<
1276        (
1277            VecDeque<ValidCommandWithHash>,
1278            Amount,
1279            VecDeque<ValidCommandWithHash>,
1280        ),
1281        CommandError,
1282    > {
1283        let mut dropped_so_far = VecDeque::with_capacity(queue.len());
1284
1285        while currency_reserved > current_balance {
1286            let last = queue.pop_back().unwrap();
1287            let consumed = currency_consumed(&last.data.forget_check())?;
1288            dropped_so_far.push_back(last);
1289            currency_reserved = currency_reserved.checked_sub(&consumed).unwrap();
1290        }
1291
1292        Ok((queue, currency_reserved, dropped_so_far))
1293    }
1294
    /// Re-checks the queued commands of the senders selected by `kind` against
    /// the accounts returned by `get_account`, drops what is no longer valid,
    /// and returns all dropped commands.
    ///
    /// For each selected sender:
    /// - if the account may not send or increment its nonce, or its nonce is
    ///   lower than the nonce of the first queued command, the whole queue is
    ///   removed;
    /// - otherwise the commands queued before the one applying at the account's
    ///   nonce are dropped (all of them if no command applies at that nonce),
    ///   and then commands are dropped from the back until the reserved currency
    ///   fits the account's liquid balance at `global_slot_since_genesis`.
    ///
    /// # Errors
    ///
    /// Returns `CommandError::Custom` when `get_account` yields `None` for a
    /// selected sender, and propagates errors from the removal helpers and from
    /// `currency_consumed`.
    ///
    /// # Panics
    ///
    /// Panics if a sender queue is empty or the currency subtraction underflows.
    fn revalidate<F>(
        &mut self,
        global_slot_since_genesis: Slot,
        kind: RevalidateKind,
        get_account: F,
    ) -> Result<Vec<ValidCommandWithHash>, CommandError>
    where
        F: Fn(&AccountId) -> Option<Account>,
    {
        let requires_revalidation = |account_id: &AccountId| match kind {
            RevalidateKind::EntirePool => true,
            RevalidateKind::Subset(set) => set.contains(account_id),
        };

        let mut dropped = Vec::new();

        // Iterate over a snapshot: `self.all_by_sender` is modified inside the
        // loop.
        for (sender, (mut queue, mut currency_reserved)) in self.all_by_sender.clone() {
            if !requires_revalidation(&sender) {
                continue;
            }
            let account: Account = get_account(&sender)
                .ok_or(CommandError::Custom(Cow::Borrowed("Account not find")))?;
            let current_balance = account
                .liquid_balance_at_slot(global_slot_since_genesis)
                .to_amount();
            let first_cmd = queue.front().cloned().unwrap();
            let first_nonce = first_cmd.data.forget_check().applicable_at_nonce();

            if !(account.has_permission_to_send() && account.has_permission_to_increment_nonce())
                || account.nonce < first_nonce
            {
                // Nothing in this queue can be kept: remove it from the head on.
                let this_dropped = self.remove_with_dependents_exn(&first_cmd)?;
                dropped.extend(this_dropped);
            } else {
                // current_nonce >= first_nonce
                let first_applicable_nonce_index = queue.iter().position(|cmd| {
                    let nonce = cmd.data.forget_check().applicable_at_nonce();
                    nonce == account.nonce
                });

                // After the split, `queue` holds the commands before the account
                // nonce (to be dropped) and `retained_for_nonce` the rest. With
                // no match nothing is retained.
                let retained_for_nonce = match first_applicable_nonce_index {
                    Some(index) => queue.split_off(index),
                    None => Default::default(),
                };
                let dropped_for_nonce = queue;

                // Release the currency that was reserved for the dropped prefix.
                for cmd in &dropped_for_nonce {
                    currency_reserved = currency_reserved
                        .checked_sub(&currency_consumed(&cmd.data.forget_check())?)
                        .unwrap();
                }

                let (keep_queue, currency_reserved, dropped_for_balance) =
                    Self::drop_until_sufficient_balance(
                        retained_for_nonce,
                        currency_reserved,
                        current_balance,
                    )?;

                let keeping_prefix = dropped_for_nonce.is_empty();
                let keeping_suffix = dropped_for_balance.is_empty();
                let to_drop: Vec<_> = dropped_for_nonce
                    .into_iter()
                    .chain(dropped_for_balance)
                    .collect();

                // Bring `all_by_sender` and `applicable_by_fee` in line with the
                // queue that is kept. The arms are tried in order.
                match keep_queue.front().cloned() {
                    _ if keeping_prefix && keeping_suffix => {
                        // Nothing dropped, nothing needs to be updated
                    }
                    None => {
                        // We drop the entire queue, first element needs to be removed from
                        // applicable_by_fee
                        self.remove_applicable_exn(&first_cmd);
                        self.all_by_sender.remove(&sender);
                    }
                    Some(_) if keeping_prefix => {
                        // We drop only transactions from the end of queue, keeping
                        // the head untouched, no need to update applicable_by_fee
                        self.all_by_sender
                            .insert(sender, (keep_queue, currency_reserved));
                    }
                    Some(first_kept) => {
                        // We need to replace old queue head with the new queue head
                        // in applicable_by_fee
                        let first_kept_unchecked = first_kept.data.forget_check();
                        self.all_by_sender
                            .insert(sender, (keep_queue, currency_reserved));
                        self.remove_applicable_exn(&first_cmd);
                        Self::map_set_insert(
                            &mut self.applicable_by_fee,
                            first_kept_unchecked.fee_per_wu(),
                            first_kept,
                        );
                    }
                }
                // The remaining indexes (by fee, by hash, expiration) and `size`
                // are updated for every dropped command.
                self.update_remove_all_by_fee_and_hash_and_expiration(to_drop.clone());
                dropped.extend(to_drop);
            }
        }

        Ok(dropped)
    }
1398
1399    // TODO(adonagy): clones too expensive? Optimize
1400    /// Same as `transactions`, but does not modify the mempool
1401    fn list_includable_transactions(&self, limit: usize) -> Vec<ValidCommandWithHash> {
1402        let mut txns = Vec::with_capacity(self.applicable_by_fee.len());
1403
1404        // get a copy of the maps as we are just listing the transactions
1405        let mut applicable_by_fee = self.applicable_by_fee.clone();
1406        let mut all_by_sender = self.all_by_sender.clone();
1407
1408        while !applicable_by_fee.is_empty() && txns.len() < limit {
1409            let (fee, mut set) = applicable_by_fee
1410                .iter()
1411                .max_by_key(|(rate, _)| *rate)
1412                .map(|(rate, set)| (rate.clone(), set.clone()))
1413                .unwrap();
1414
1415            // TODO: Check if OCaml compare using `hash` (order)
1416            let txn = set.iter().min_by_key(|b| &b.hash).cloned().unwrap();
1417
1418            {
1419                set.remove(&txn);
1420                if set.is_empty() {
1421                    applicable_by_fee.remove(&fee);
1422                } else {
1423                    applicable_by_fee.insert(fee, set);
1424                }
1425            }
1426
1427            let sender = txn.data.forget_check().fee_payer();
1428
1429            let (sender_queue, _amount) = all_by_sender.get_mut(&sender).unwrap();
1430            let head_txn = sender_queue.pop_front().unwrap();
1431
1432            if txn.hash == head_txn.hash {
1433                match sender_queue.front().cloned() {
1434                    None => {
1435                        all_by_sender.remove(&sender);
1436                    }
1437                    Some(next_txn) => {
1438                        let fee = next_txn.data.forget_check().fee_per_wu();
1439                        applicable_by_fee.entry(fee).or_default().insert(next_txn);
1440                    }
1441                }
1442            } else {
1443                openmina_core::warn!(
1444                    openmina_core::log::system_time();
1445                    kind = "transaction pool", message = "Sender queue is malformed");
1446                all_by_sender.remove(&sender);
1447            }
1448
1449            txns.push(txn);
1450        }
1451        txns
1452    }
1453
1454    // TODO(adonagy): Is it neede to remove txs from the pool directly here? If the produced block is injected
1455    // a BestTip update action will be dispatched and the pool can reorganize there
1456    /// Returns a sequence of commands in the pool in descending fee order
1457    fn transactions(&mut self, limit: usize) -> Vec<ValidCommandWithHash> {
1458        let mut txns = Vec::with_capacity(self.applicable_by_fee.len());
1459        loop {
1460            if self.applicable_by_fee.is_empty() {
1461                assert!(self.all_by_sender.is_empty());
1462                return txns;
1463            }
1464
1465            if txns.len() >= limit {
1466                return txns;
1467            }
1468
1469            let (fee, mut set) = self
1470                .applicable_by_fee
1471                .iter()
1472                .max_by_key(|(rate, _)| *rate)
1473                .map(|(rate, set)| (rate.clone(), set.clone()))
1474                .unwrap();
1475
1476            // TODO: Check if OCaml compare using `hash` (order)
1477            let txn = set.iter().min_by_key(|b| &b.hash).cloned().unwrap();
1478
1479            {
1480                set.remove(&txn);
1481                if set.is_empty() {
1482                    self.applicable_by_fee.remove(&fee);
1483                } else {
1484                    self.applicable_by_fee.insert(fee, set);
1485                }
1486            }
1487
1488            let sender = txn.data.forget_check().fee_payer();
1489
1490            let (sender_queue, _amount) = self.all_by_sender.get_mut(&sender).unwrap();
1491            let head_txn = sender_queue.pop_front().unwrap();
1492
1493            if txn.hash == head_txn.hash {
1494                match sender_queue.front().cloned() {
1495                    None => {
1496                        self.all_by_sender.remove(&sender);
1497                    }
1498                    Some(next_txn) => {
1499                        let fee = next_txn.data.forget_check().fee_per_wu();
1500                        self.applicable_by_fee
1501                            .entry(fee)
1502                            .or_default()
1503                            .insert(next_txn);
1504                    }
1505                }
1506            } else {
1507                openmina_core::warn!(
1508                    openmina_core::log::system_time();
1509                    kind = "transaction pool", message = "Sender queue is malformed");
1510                self.all_by_sender.remove(&sender);
1511            }
1512
1513            txns.push(txn);
1514        }
1515    }
1516
1517    /// Returns all the transactions in the pool
1518    fn get_all_transactions(&self) -> Vec<ValidCommandWithHash> {
1519        self.all_by_sender
1520            .values()
1521            .cloned()
1522            .flat_map(|(cmds, _)| cmds.into_iter())
1523            .collect()
1524    }
1525
1526    fn get_pending_amount_and_nonce(&self) -> HashMap<AccountId, (Option<Nonce>, Amount)> {
1527        // TODO(adonagy): clone too expensive here?
1528        self.all_by_sender
1529            .clone()
1530            .into_iter()
1531            .map(|(acc_id, (cmds, amount))| (acc_id, (cmds.back().unwrap().data.nonce(), amount)))
1532            .collect()
1533    }
1534}
1535
1536fn currency_consumed(cmd: &UserCommand) -> Result<Amount, CommandError> {
1537    use crate::scan_state::transaction_logic::signed_command::{Body::*, PaymentPayload};
1538
1539    let fee_amount = Amount::of_fee(&cmd.fee());
1540    let amount = match cmd {
1541        UserCommand::SignedCommand(c) => {
1542            match &c.payload.body {
1543                Payment(PaymentPayload { amount, .. }) => {
1544                    // The fee-payer is also the sender account, include the amount.
1545                    *amount
1546                }
1547                StakeDelegation(_) => Amount::zero(),
1548            }
1549        }
1550        UserCommand::ZkAppCommand(_) => Amount::zero(),
1551    };
1552
1553    fee_amount
1554        .checked_add(&amount)
1555        .ok_or(CommandError::InvalidCurrencyConsumed)
1556}
1557
1558pub mod transaction_hash {
1559    use super::*;
1560
1561    pub fn hash_command(cmd: valid::UserCommand) -> ValidCommandWithHash {
1562        let hash = match &cmd {
1563            valid::UserCommand::SignedCommand(cmd) => {
1564                v2::MinaBaseSignedCommandStableV2::from(&**cmd)
1565                    .hash()
1566                    .unwrap()
1567            }
1568            valid::UserCommand::ZkAppCommand(cmd) => {
1569                let cmd = cmd.clone().forget();
1570                v2::MinaBaseZkappCommandTStableV1WireStableV1::from(&cmd)
1571                    .hash()
1572                    .unwrap()
1573            }
1574        };
1575
1576        WithHash { data: cmd, hash }
1577    }
1578}
1579
/// Two-way verdict: accept or reject.
///
/// NOTE(review): where this decision is produced and consumed is not visible
/// in this part of the file.
#[derive(Debug)]
pub enum ApplyDecision {
    /// The subject of the decision is accepted.
    Accept,
    /// The subject of the decision is rejected.
    Reject,
}
1585
// NOTE(review): presumably the maximum number of items allowed per 15-second
// window; its users are not visible in this part of the file, so confirm.
const MAX_PER_15_SECONDS: usize = 10;
1587
/// Transaction pool state: the indexed pool of pending commands plus
/// bookkeeping for locally generated commands, batching counters, the pool
/// configuration and a verification-key table.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TransactionPool {
    /// Indexed store of the pending commands.
    pub pool: IndexedPool,
    /// Locally generated commands, each with a timestamp and a batch.
    /// A command is never expected to be in this map and in
    /// `locally_generated_committed` at the same time (asserted in
    /// `on_new_best_tip`).
    locally_generated_uncommitted: HashMap<ValidCommandWithHash, (redux::Timestamp, Batch)>,
    /// Counterpart of `locally_generated_uncommitted`.
    /// NOTE(review): presumably commands already included in the chain —
    /// confirm against the code that moves entries between the two maps.
    locally_generated_committed: HashMap<ValidCommandWithHash, (redux::Timestamp, Batch)>,
    /// Starts at 0. NOTE(review): batching logic is not visible here.
    current_batch: usize,
    /// Starts at 0. NOTE(review): batching logic is not visible here.
    remaining_in_batch: usize,
    /// Pool configuration supplied at construction.
    pub config: Config,
    /// Unit placeholder; carries no data.
    batcher: (),
    /// Unit placeholder; `None` after construction.
    best_tip_diff_relay: Option<()>,
    /// Verification-key refcount table, empty (default) after construction.
    verification_key_table: VkRefcountTable,
}
1600
1601impl TransactionPool {
    /// Creates an empty transaction pool.
    ///
    /// The indexed pool is built from `consensus_constants`, `config` is stored
    /// as given, and every other field starts empty, zero or `None`.
    pub fn new(config: Config, consensus_constants: &ConsensusConstants) -> Self {
        Self {
            pool: IndexedPool::new(consensus_constants),
            locally_generated_uncommitted: Default::default(),
            locally_generated_committed: Default::default(),
            current_batch: 0,
            remaining_in_batch: 0,
            config,
            batcher: (),
            best_tip_diff_relay: None,
            verification_key_table: Default::default(),
        }
    }
1615
    /// Returns the size reported by the underlying indexed pool.
    pub fn size(&self) -> usize {
        self.pool.size()
    }
1619
    /// Returns all the transactions held by the underlying indexed pool.
    pub fn get_all_transactions(&self) -> Vec<ValidCommandWithHash> {
        self.pool.get_all_transactions()
    }
1623
    /// Returns, per sender account, the nonce of its last queued command and
    /// the amount stored for that sender, as computed by the indexed pool.
    pub fn get_pending_amount_and_nonce(&self) -> HashMap<AccountId, (Option<Nonce>, Amount)> {
        self.pool.get_pending_amount_and_nonce()
    }
1627
    /// Returns up to `limit` commands in descending fee order.
    ///
    /// Delegates to the indexed pool's `transactions`, which takes the returned
    /// commands out of its applicable/sender indexes; use
    /// `list_includable_transactions` for a non-mutating listing.
    pub fn transactions(&mut self, limit: usize) -> Vec<ValidCommandWithHash> {
        self.pool.transactions(limit)
    }
1631
    /// Returns up to `limit` commands in the same order as `transactions`, but
    /// without modifying the pool.
    pub fn list_includable_transactions(&self, limit: usize) -> Vec<ValidCommandWithHash> {
        self.pool.list_includable_transactions(limit)
    }
1635
    /// Returns the ids of all accounts that currently have commands queued in
    /// the pool (the keys of `all_by_sender`).
    pub fn get_accounts_to_revalidate_on_new_best_tip(&self) -> BTreeSet<AccountId> {
        self.pool.all_by_sender.keys().cloned().collect()
    }
1639
1640    pub fn on_new_best_tip(
1641        &mut self,
1642        global_slot_since_genesis: Slot,
1643        accounts: &BTreeMap<AccountId, Account>,
1644    ) -> Result<Vec<ValidCommandWithHash>, CommandError> {
1645        let dropped = self.pool.revalidate(
1646            global_slot_since_genesis,
1647            RevalidateKind::EntirePool,
1648            |sender_id| {
1649                Some(
1650                    accounts
1651                        .get(sender_id)
1652                        .cloned()
1653                        .unwrap_or_else(Account::empty),
1654                )
1655            },
1656        )?;
1657
1658        let dropped_locally_generated = dropped
1659            .iter()
1660            .filter(|cmd| {
1661                let dropped_commited = self.locally_generated_committed.remove(cmd).is_some();
1662                let dropped_uncommited = self.locally_generated_uncommitted.remove(cmd).is_some();
1663                // Nothing should be in both tables.
1664                assert!(!(dropped_commited && dropped_uncommited));
1665                dropped_commited || dropped_uncommited
1666            })
1667            .collect::<Vec<_>>();
1668
1669        if !dropped_locally_generated.is_empty() {
1670            openmina_core::info!(
1671                openmina_core::log::system_time();
1672                kind = "transaction pool",
1673                message = "Dropped locally generated commands $cmds from pool when transition frontier was recreated.",
1674                dropped = format!("{dropped_locally_generated:?}")
1675            );
1676        }
1677
1678        Ok(dropped)
1679    }
1680
1681    fn has_sufficient_fee(&self, pool_max_size: usize, cmd: &valid::UserCommand) -> bool {
1682        match self.pool.min_fee() {
1683            None => true,
1684            Some(min_fee) => {
1685                if self.pool.size() >= pool_max_size {
1686                    cmd.forget_check().fee_per_wu() > min_fee
1687                } else {
1688                    true
1689                }
1690            }
1691        }
1692    }
1693
1694    fn drop_until_below_max_size(
1695        &mut self,
1696        pool_max_size: usize,
1697    ) -> Result<Vec<ValidCommandWithHash>, CommandError> {
1698        let mut list = Vec::new();
1699
1700        while self.pool.size() > pool_max_size {
1701            let dropped = self.pool.remove_lowest_fee()?;
1702            my_assert(!dropped.is_empty())?;
1703            list.extend(dropped)
1704        }
1705
1706        Ok(list)
1707    }
1708
1709    pub fn get_accounts_to_handle_transition_diff(
1710        &self,
1711        diff: &diff::BestTipDiff,
1712    ) -> (BTreeSet<AccountId>, BTreeSet<AccountId>) {
1713        let diff::BestTipDiff {
1714            new_commands,
1715            removed_commands,
1716            reorg_best_tip: _,
1717        } = diff;
1718
1719        let in_cmds = new_commands
1720            .iter()
1721            .chain(removed_commands)
1722            .flat_map(|cmd| cmd.data.forget_check().accounts_referenced())
1723            .collect::<BTreeSet<_>>();
1724
1725        let uncommitted = self
1726            .locally_generated_uncommitted
1727            .keys()
1728            .map(|cmd| cmd.data.fee_payer())
1729            .collect::<BTreeSet<_>>();
1730
1731        (in_cmds, uncommitted)
1732    }
1733
1734    pub fn handle_transition_frontier_diff(
1735        &mut self,
1736        global_slot_since_genesis: Slot,
1737        current_global_slot: Slot,
1738        diff: &diff::BestTipDiff,
1739        account_ids: &BTreeSet<AccountId>,
1740        accounts: &BTreeMap<AccountId, Account>,
1741        uncommited: &BTreeMap<AccountId, Account>,
1742    ) -> Result<(), String> {
1743        let diff::BestTipDiff {
1744            new_commands,
1745            removed_commands,
1746            reorg_best_tip: _,
1747        } = diff;
1748
1749        // Remove duplicates
1750        let (new_commands, removed_commands) = {
1751            let collect_hashed = |cmds: &[WithStatus<valid::UserCommand>]| {
1752                cmds.iter()
1753                    .map(|cmd| transaction_hash::hash_command(cmd.data.clone()))
1754                    .collect::<Vec<_>>()
1755            };
1756
1757            let mut new_commands = collect_hashed(new_commands);
1758            let mut removed_commands = collect_hashed(removed_commands);
1759
1760            #[allow(clippy::mutable_key_type)]
1761            let new_commands_set = new_commands.iter().collect::<HashSet<_>>();
1762            #[allow(clippy::mutable_key_type)]
1763            let removed_commands_set = removed_commands.iter().collect::<HashSet<_>>();
1764
1765            #[allow(clippy::mutable_key_type)]
1766            let duplicates = new_commands_set
1767                .intersection(&removed_commands_set)
1768                .map(|cmd| (*cmd).clone())
1769                .collect::<HashSet<_>>();
1770
1771            new_commands.retain(|cmd| !duplicates.contains(cmd));
1772            removed_commands.retain(|cmd| !duplicates.contains(cmd));
1773            (new_commands, removed_commands)
1774        };
1775
1776        let pool_max_size = self.config.pool_max_size;
1777
1778        self.verification_key_table.increment_list(&new_commands);
1779        self.verification_key_table
1780            .decrement_list(&removed_commands);
1781
1782        let mut dropped_backtrack = Vec::with_capacity(256);
1783        for cmd in removed_commands {
1784            if let Some(time_added) = self.locally_generated_committed.remove(&cmd) {
1785                self.locally_generated_uncommitted
1786                    .insert(cmd.clone(), time_added);
1787            }
1788
1789            let dropped_seq = match self.pool.add_from_backtrack(
1790                global_slot_since_genesis,
1791                current_global_slot,
1792                cmd,
1793            ) {
1794                Ok(_) => self.drop_until_below_max_size(pool_max_size)?,
1795                Err(e) => return Err(format!("{:?}", e)),
1796            };
1797            dropped_backtrack.extend(dropped_seq);
1798        }
1799
1800        self.verification_key_table
1801            .decrement_hashed(&dropped_backtrack);
1802
1803        let locally_generated_dropped = dropped_backtrack
1804            .iter()
1805            .filter(|t| self.locally_generated_uncommitted.contains_key(t))
1806            .collect::<Vec<_>>();
1807
1808        let dropped_commands = {
1809            let accounts_to_check = account_ids;
1810            let existing_account_states_by_id = accounts;
1811
1812            let get_account = |id: &AccountId| {
1813                match existing_account_states_by_id.get(id) {
1814                    Some(account) => Some(account.clone()),
1815                    None => {
1816                        if accounts_to_check.contains(id) {
1817                            Some(Account::empty())
1818                        } else {
1819                            None
1820                            // OCaml panic too, with same message
1821                            // panic!(
1822                            //     "did not expect Indexed_pool.revalidate to call \
1823                            //         get_account on account not in accounts_to_check"
1824                            // )
1825                        }
1826                    }
1827                }
1828            };
1829
1830            self.pool.revalidate(
1831                global_slot_since_genesis,
1832                RevalidateKind::Subset(accounts_to_check),
1833                get_account,
1834            )?
1835        };
1836
1837        let (committed_commands, dropped_commit_conflicts): (Vec<_>, Vec<_>) = {
1838            let command_hashes: HashSet<v2::TransactionHash> =
1839                new_commands.iter().map(|cmd| cmd.hash.clone()).collect();
1840
1841            dropped_commands
1842                .iter()
1843                .partition(|cmd| command_hashes.contains(&cmd.hash))
1844        };
1845
1846        for cmd in &committed_commands {
1847            self.verification_key_table.decrement_hashed([&**cmd]);
1848            if let Some(data) = self.locally_generated_uncommitted.remove(cmd) {
1849                let old = self
1850                    .locally_generated_committed
1851                    .insert((*cmd).clone(), data);
1852                my_assert(old.is_none())?;
1853            };
1854        }
1855
1856        let _commit_conflicts_locally_generated = dropped_commit_conflicts
1857            .iter()
1858            .filter(|cmd| self.locally_generated_uncommitted.remove(cmd).is_some());
1859
1860        for cmd in locally_generated_dropped {
1861            // If the dropped transaction was included in the winning chain, it'll
1862            // be in locally_generated_committed. If it wasn't, try re-adding to
1863            // the pool.
1864            let remove_cmd = |this: &mut Self| {
1865                this.verification_key_table.decrement_hashed([cmd]);
1866                assert!(this.locally_generated_uncommitted.remove(cmd).is_some());
1867            };
1868
1869            if !self.locally_generated_committed.contains_key(cmd) {
1870                if !self.has_sufficient_fee(pool_max_size, &cmd.data) {
1871                    remove_cmd(self)
1872                } else {
1873                    let unchecked = &cmd.data;
1874                    match uncommited.get(&unchecked.fee_payer()) {
1875                        Some(account) => {
1876                            match self.pool.add_from_gossip_exn(
1877                                global_slot_since_genesis,
1878                                current_global_slot,
1879                                cmd,
1880                                account.nonce,
1881                                account.liquid_balance_at_slot(global_slot_since_genesis),
1882                            ) {
1883                                Err(_) => {
1884                                    remove_cmd(self);
1885                                }
1886                                Ok(_) => {
1887                                    self.verification_key_table.increment_hashed([cmd]);
1888                                }
1889                            }
1890                        }
1891                        None => {
1892                            remove_cmd(self);
1893                        }
1894                    }
1895                }
1896            }
1897        }
1898
1899        let expired_commands = self.pool.remove_expired(global_slot_since_genesis)?;
1900        for cmd in &expired_commands {
1901            self.verification_key_table.decrement_hashed([cmd]);
1902            self.locally_generated_uncommitted.remove(cmd);
1903        }
1904
1905        Ok(())
1906    }
1907
1908    pub fn get_accounts_to_apply_diff(&self, diff: &diff::DiffVerified) -> BTreeSet<AccountId> {
1909        let fee_payer = |cmd: &ValidCommandWithHash| cmd.data.fee_payer();
1910        diff.list.iter().map(fee_payer).collect()
1911    }
1912
1913    fn apply(
1914        &mut self,
1915        time: redux::Timestamp,
1916        global_slot_since_genesis: Slot,
1917        current_global_slot: Slot,
1918        diff: &diff::DiffVerified,
1919        accounts: &BTreeMap<AccountId, Account>,
1920        is_sender_local: bool,
1921    ) -> Result<
1922        (
1923            ApplyDecision,
1924            Vec<ValidCommandWithHash>,
1925            Vec<(ValidCommandWithHash, diff::Error)>,
1926            HashSet<v2::TransactionHash>,
1927        ),
1928        String,
1929    > {
1930        let fee_payer = |cmd: &ValidCommandWithHash| cmd.data.fee_payer();
1931        let fee_payer_accounts = accounts;
1932
1933        let check_command = |pool: &IndexedPool, cmd: &ValidCommandWithHash| {
1934            if pool.member(cmd) {
1935                Err(diff::Error::Duplicate)
1936            } else {
1937                match fee_payer_accounts.get(&fee_payer(cmd)) {
1938                    None => Err(diff::Error::FeePayerAccountNotFound),
1939                    Some(account) => {
1940                        if account.has_permission_to_send()
1941                            && account.has_permission_to_increment_nonce()
1942                        {
1943                            Ok(())
1944                        } else {
1945                            Err(diff::Error::FeePayerNotPermittedToSend)
1946                        }
1947                    }
1948                }
1949            }
1950        };
1951
1952        let add_results = diff
1953            .list
1954            .iter()
1955            .map(|cmd| {
1956                let account = fee_payer_accounts
1957                    .get(&fee_payer(cmd))
1958                    .ok_or_else(|| "Fee payer not found".to_string())?;
1959
1960                let result: Result<_, diff::Error> = (|| {
1961                    check_command(&self.pool, cmd)?;
1962
1963                    match self.pool.add_from_gossip_exn(
1964                        global_slot_since_genesis,
1965                        current_global_slot,
1966                        cmd,
1967                        account.nonce,
1968                        account.liquid_balance_at_slot(global_slot_since_genesis),
1969                    ) {
1970                        Ok(x) => Ok(x),
1971                        Err(e) => Err(e.into()),
1972                    }
1973                })();
1974
1975                match result {
1976                    Ok((cmd, dropped)) => Ok(Ok((cmd, dropped))),
1977                    Err(err) => Ok(Err((cmd, err))),
1978                }
1979            })
1980            .collect::<Result<Vec<Result<_, _>>, String>>()?;
1981
1982        let added_cmds = add_results
1983            .iter()
1984            .filter_map(|cmd| match cmd {
1985                Ok((cmd, _)) => Some(cmd),
1986                Err(_) => None,
1987            })
1988            .collect::<Vec<_>>();
1989
1990        let dropped_for_add = add_results
1991            .iter()
1992            .filter_map(|cmd| match cmd {
1993                Ok((_, dropped)) => Some(dropped),
1994                Err(_) => None,
1995            })
1996            .flatten()
1997            .collect::<Vec<_>>();
1998
1999        let dropped_for_size = self.drop_until_below_max_size(self.config.pool_max_size)?;
2000
2001        let all_dropped_cmds = dropped_for_add
2002            .iter()
2003            .copied()
2004            .chain(dropped_for_size.iter())
2005            .collect::<Vec<_>>();
2006
2007        {
2008            self.verification_key_table.increment_hashed(added_cmds);
2009            self.verification_key_table
2010                .decrement_hashed(all_dropped_cmds.iter().copied());
2011        };
2012
2013        let dropped_for_add_hashes: HashSet<&v2::TransactionHash> =
2014            dropped_for_add.iter().map(|cmd| &cmd.hash).collect();
2015        let dropped_for_size_hashes: HashSet<&v2::TransactionHash> =
2016            dropped_for_size.iter().map(|cmd| &cmd.hash).collect();
2017        let all_dropped_cmd_hashes: HashSet<v2::TransactionHash> = dropped_for_add_hashes
2018            .union(&dropped_for_size_hashes)
2019            .map(|hash| (*hash).clone())
2020            .collect();
2021
2022        // let locally_generated_dropped = all_dropped_cmds
2023        //     .iter()
2024        //     .filter(|cmd| self.locally_generated_uncommitted.remove(cmd).is_some());
2025
2026        if is_sender_local {
2027            for result in add_results.iter() {
2028                let Ok((cmd, _dropped)) = result else {
2029                    continue;
2030                };
2031                if !all_dropped_cmd_hashes.contains(&cmd.hash) {
2032                    self.register_locally_generated(time, cmd);
2033                }
2034            }
2035        }
2036
2037        let mut accepted = Vec::with_capacity(128);
2038        let mut rejected = Vec::with_capacity(128);
2039
2040        // TODO: Re-work this to avoid cloning ?
2041        for result in &add_results {
2042            match result {
2043                Ok((cmd, _dropped)) => {
2044                    if all_dropped_cmd_hashes.contains(&cmd.hash) {
2045                        // ignored (dropped)
2046                    } else {
2047                        accepted.push(cmd.clone());
2048                    }
2049                }
2050                Err((cmd, error)) => {
2051                    rejected.push(((*cmd).clone(), error.clone()));
2052                }
2053            }
2054        }
2055
2056        let decision = if rejected
2057            .iter()
2058            .any(|(_, error)| error.grounds_for_diff_rejection())
2059        {
2060            ApplyDecision::Reject
2061        } else {
2062            ApplyDecision::Accept
2063        };
2064
2065        Ok((decision, accepted, rejected, all_dropped_cmd_hashes))
2066    }
2067
2068    pub fn unsafe_apply(
2069        &mut self,
2070        time: redux::Timestamp,
2071        global_slot_since_genesis: Slot,
2072        current_global_slot: Slot,
2073        diff: &diff::DiffVerified,
2074        accounts: &BTreeMap<AccountId, Account>,
2075        is_sender_local: bool,
2076    ) -> Result<
2077        (
2078            ApplyDecision,
2079            Vec<ValidCommandWithHash>,
2080            Vec<(ValidCommandWithHash, diff::Error)>,
2081            HashSet<v2::TransactionHash>,
2082        ),
2083        String,
2084    > {
2085        let (decision, accepted, rejected, dropped) = self.apply(
2086            time,
2087            global_slot_since_genesis,
2088            current_global_slot,
2089            diff,
2090            accounts,
2091            is_sender_local,
2092        )?;
2093        Ok((decision, accepted, rejected, dropped))
2094    }
2095
2096    fn register_locally_generated(&mut self, time: redux::Timestamp, cmd: &ValidCommandWithHash) {
2097        match self.locally_generated_uncommitted.entry(cmd.clone()) {
2098            Entry::Occupied(mut entry) => {
2099                let (entry_time, _batch_num) = entry.get_mut();
2100                *entry_time = redux::Timestamp::global_now();
2101            }
2102            Entry::Vacant(entry) => {
2103                let batch_num = if self.remaining_in_batch > 0 {
2104                    self.remaining_in_batch -= 1;
2105                    self.current_batch
2106                } else {
2107                    self.remaining_in_batch = MAX_PER_15_SECONDS - 1;
2108                    self.current_batch += 1;
2109                    self.current_batch
2110                };
2111                entry.insert((time, Batch::Of(batch_num)));
2112            }
2113        }
2114    }
2115
2116    pub fn prevalidate(&self, diff: diff::Diff) -> Result<diff::Diff, TransactionPoolErrors> {
2117        let well_formedness_errors: HashSet<_> = diff
2118            .list
2119            .iter()
2120            .flat_map(|cmd| match cmd.check_well_formedness() {
2121                Ok(()) => Vec::new(),
2122                Err(errors) => errors,
2123            })
2124            .collect();
2125
2126        if !well_formedness_errors.is_empty() {
2127            return Err(TransactionPoolErrors::BatchedErrors(
2128                well_formedness_errors
2129                    .into_iter()
2130                    .map(TransactionError::WellFormedness)
2131                    .collect_vec(),
2132            ));
2133        }
2134
2135        Ok(diff)
2136    }
2137
2138    pub fn convert_diff_to_verifiable(
2139        &self,
2140        diff: diff::Diff,
2141        accounts: &BTreeMap<AccountId, Account>,
2142    ) -> Result<Vec<WithStatus<verifiable::UserCommand>>, TransactionPoolErrors> {
2143        let cs = diff
2144            .list
2145            .iter()
2146            .cloned()
2147            .map(|cmd| MaybeWithStatus { cmd, status: None })
2148            .collect::<Vec<_>>();
2149
2150        let diff = UserCommand::to_all_verifiable::<FromUnappliedSequence, _>(cs, |account_ids| {
2151            let mempool_vks: HashMap<_, _> = account_ids
2152                .iter()
2153                .map(|id| {
2154                    let vks = self.verification_key_table.find_vks_by_account_id(id);
2155                    let vks: HashMap<_, _> =
2156                        vks.iter().map(|vk| (vk.hash(), (*vk).clone())).collect();
2157                    (id.clone(), vks)
2158                })
2159                .collect();
2160
2161            let ledger_vks = UserCommand::load_vks_from_ledger_accounts(accounts);
2162            let ledger_vks: HashMap<_, _> = ledger_vks
2163                .into_iter()
2164                .map(|(id, vk)| {
2165                    let mut map = HashMap::new();
2166                    map.insert(vk.hash(), vk);
2167                    (id, map)
2168                })
2169                .collect();
2170
2171            let new_map: HashMap<AccountId, HashMap<Fp, VerificationKeyWire>> = HashMap::new();
2172            let merged =
2173                mempool_vks
2174                    .into_iter()
2175                    .chain(ledger_vks)
2176                    .fold(new_map, |mut accum, (id, map)| {
2177                        let entry = accum.entry(id).or_default();
2178                        for (hash, vk) in map {
2179                            entry.insert(hash, vk);
2180                        }
2181                        accum
2182                    });
2183
2184            from_unapplied_sequence::Cache::new(merged)
2185        })
2186        .map_err(TransactionPoolErrors::LoadingVK)?;
2187
2188        let diff = diff
2189            .into_iter()
2190            .map(|MaybeWithStatus { cmd, status: _ }| WithStatus {
2191                data: cmd,
2192                // TODO: is this correct?
2193                status: Applied,
2194            })
2195            .collect::<Vec<_>>();
2196
2197        Ok(diff)
2198    }
2199
2200    pub fn verify_proofs(
2201        &self,
2202        diff: Vec<WithStatus<verifiable::UserCommand>>,
2203    ) -> Result<Vec<valid::UserCommand>, TransactionPoolErrors> {
2204        let (verified, invalid): (Vec<_>, Vec<_>) = Verifier
2205            .verify_commands(diff, None)
2206            .into_iter()
2207            .partition(Result::is_ok);
2208
2209        let verified: Vec<_> = verified.into_iter().map(Result::unwrap).collect();
2210        let invalid: Vec<_> = invalid.into_iter().map(Result::unwrap_err).collect();
2211
2212        if !invalid.is_empty() {
2213            let transaction_pool_errors = invalid
2214                .into_iter()
2215                .map(TransactionError::Verifier)
2216                .collect();
2217            Err(TransactionPoolErrors::BatchedErrors(
2218                transaction_pool_errors,
2219            ))
2220        } else {
2221            Ok(verified)
2222        }
2223    }
2224
2225    fn get_rebroadcastable<F>(&mut self, has_timed_out: F) -> Vec<Vec<UserCommand>>
2226    where
2227        F: Fn(&redux::Timestamp) -> bool,
2228    {
2229        let log = |has_timed_out: bool, s: &str, cmd: &ValidCommandWithHash| -> bool {
2230            if has_timed_out {
2231                eprintln!("{}: {:?}", s, cmd);
2232                false
2233            } else {
2234                true
2235            }
2236        };
2237
2238        self.locally_generated_uncommitted
2239            .retain(|key, (time, _batch)| {
2240                log(
2241                    has_timed_out(time),
2242                    "No longer rebroadcasting uncommitted expired command",
2243                    key,
2244                )
2245            });
2246        self.locally_generated_committed
2247            .retain(|key, (time, _batch)| {
2248                log(
2249                    has_timed_out(time),
2250                    "Removing committed locally generated expired command",
2251                    key,
2252                )
2253            });
2254
2255        let mut rebroadcastable_txs = self
2256            .locally_generated_uncommitted
2257            .iter()
2258            .collect::<Vec<_>>();
2259
2260        rebroadcastable_txs.sort_by(|(txn1, (_, batch1)), (txn2, (_, batch2))| {
2261            use std::cmp::Ordering::Equal;
2262
2263            let get_nonce =
2264                |txn: &ValidCommandWithHash| txn.data.forget_check().applicable_at_nonce();
2265
2266            match batch1.cmp(batch2) {
2267                Equal => (),
2268                x => return x,
2269            }
2270            match get_nonce(txn1).cmp(&get_nonce(txn2)) {
2271                Equal => (),
2272                x => return x,
2273            }
2274            txn1.hash.cmp(&txn2.hash)
2275        });
2276
2277        rebroadcastable_txs
2278            .into_iter()
2279            .group_by(|(_txn, (_time, batch))| batch)
2280            .into_iter()
2281            .map(|(_batch, group_txns)| {
2282                group_txns
2283                    .map(|(txn, _)| txn.data.forget_check())
2284                    .collect::<Vec<_>>()
2285            })
2286            .collect::<Vec<_>>()
2287    }
2288}
2289
2290#[cfg(test)]
2291mod tests {
2292    use super::*;
2293
2294    /// Make sure that the merge in `TransactionPool::verify` is correct
2295    #[test]
2296    fn test_map_merge() {
2297        let mut a = HashMap::new();
2298        a.insert(1, {
2299            let mut map = HashMap::new();
2300            map.insert(1, 10);
2301            map.insert(2, 12);
2302            map
2303        });
2304        let mut b = HashMap::new();
2305        b.insert(1, {
2306            let mut map = HashMap::new();
2307            map.insert(3, 20);
2308            map
2309        });
2310
2311        let new_map: HashMap<_, HashMap<_, _>> = HashMap::new();
2312        let merged = a
2313            .into_iter()
2314            .chain(b)
2315            .fold(new_map, |mut accum, (id, map)| {
2316                let entry = accum.entry(id).or_default();
2317                for (hash, vk) in map {
2318                    entry.insert(hash, vk);
2319                }
2320                accum
2321            });
2322
2323        let one = merged.get(&1).unwrap();
2324        assert!(one.get(&1).is_some());
2325        assert!(one.get(&2).is_some());
2326        assert!(one.get(&3).is_some());
2327
2328        dbg!(merged);
2329    }
2330}