mina_tree/
transaction_pool.rs

1use crate::{
2    scan_state::{
3        currency::{Amount, Balance, BlockTime, Fee, Magnitude, Nonce, Slot},
4        fee_rate::FeeRate,
5        transaction_logic::{
6            valid, verifiable,
7            zkapp_command::{
8                from_unapplied_sequence::{self, FromUnappliedSequence},
9                MaybeWithStatus, WithHash,
10            },
11            TransactionStatus::Applied,
12            UserCommand, WellFormednessError, WithStatus,
13        },
14    },
15    verifier::{Verifier, VerifierError},
16    Account, AccountId, BaseLedger, Mask, TokenId, VerificationKey, VerificationKeyWire,
17};
18use backtrace::Backtrace;
19use itertools::Itertools;
20use mina_core::{bug_condition, consensus::ConsensusConstants};
21use mina_curves::pasta::Fp;
22use mina_p2p_messages::{bigint::BigInt, v2};
23use serde::{Deserialize, Serialize};
24use std::{
25    borrow::{Borrow, Cow},
26    collections::{hash_map::Entry, BTreeMap, BTreeSet, HashMap, HashSet, VecDeque},
27};
28
/// Top-level errors returned by the transaction pool.
#[derive(Debug, thiserror::Error)]
pub enum TransactionPoolErrors {
    /// Invalid transactions, rejected diffs, etc...
    #[error("Transaction pool errors: {0:?}")]
    BatchedErrors(Vec<TransactionError>),
    /// Carries a textual description of the failure.
    /// NOTE(review): presumably raised while loading verification keys (going
    /// by the variant name) — confirm against the call sites.
    #[error("{0:?}")]
    LoadingVK(String),
    /// Errors that should panic the node (bugs in implementation)
    #[error("Unexpected error: {0}")]
    Unexpected(String),
}
40
/// Error attached to a single transaction; each variant transparently wraps
/// the underlying error and can be built from it with `?` / `From`.
#[derive(Debug, thiserror::Error)]
pub enum TransactionError {
    /// Error reported by the verifier.
    #[error(transparent)]
    Verifier(#[from] VerifierError),
    /// The command failed a well-formedness check.
    #[error(transparent)]
    WellFormedness(#[from] WellFormednessError),
}
48
49impl From<String> for TransactionPoolErrors {
50    fn from(value: String) -> Self {
51        Self::Unexpected(value)
52    }
53}
54
55#[inline(never)]
56fn my_assert(v: bool) -> Result<(), CommandError> {
57    if !v {
58        let backtrace = Backtrace::new();
59        let s = format!("assert failed {:?}", backtrace);
60        bug_condition!("{:?}", s);
61        return Err(CommandError::Custom(Cow::Owned(s)));
62    }
63    Ok(())
64}
65
66mod consensus {
67    use crate::scan_state::currency::{BlockTimeSpan, Epoch, Length};
68
69    use super::*;
70
71    #[derive(Clone, Debug, Serialize, Deserialize)]
72    pub struct Constants {
73        k: Length,
74        delta: Length,
75        slots_per_sub_window: Length,
76        slots_per_window: Length,
77        sub_windows_per_window: Length,
78        slots_per_epoch: Length,
79        grace_period_slots: Length,
80        grace_period_end: Slot,
81        checkpoint_window_slots_per_year: Length,
82        checkpoint_window_size_in_slots: Length,
83        block_window_duration_ms: BlockTimeSpan,
84        slot_duration_ms: BlockTimeSpan,
85        epoch_duration: BlockTimeSpan,
86        delta_duration: BlockTimeSpan,
87        genesis_state_timestamp: BlockTime,
88    }
89
90    impl Constants {
91        // Keep this in sync with the implementation of ConsensusConstantsChecked::create
92        // in ledger/src/proofs/block.rs
93        pub fn create(constants: &ConsensusConstants) -> Self {
94            Constants {
95                k: Length::from_u32(constants.k),
96                delta: Length::from_u32(constants.delta),
97                slots_per_sub_window: Length::from_u32(constants.slots_per_sub_window),
98                slots_per_window: Length::from_u32(constants.slots_per_window),
99                sub_windows_per_window: Length::from_u32(constants.sub_windows_per_window),
100                slots_per_epoch: Length::from_u32(constants.slots_per_epoch),
101                grace_period_slots: Length::from_u32(constants.grace_period_slots),
102                grace_period_end: Slot::from_u32(constants.grace_period_end),
103                checkpoint_window_slots_per_year: Length::from_u32(
104                    constants.checkpoint_window_slots_per_year,
105                ),
106                checkpoint_window_size_in_slots: Length::from_u32(
107                    constants.checkpoint_window_size_in_slots,
108                ),
109                block_window_duration_ms: BlockTimeSpan::from_u64(
110                    constants.block_window_duration_ms,
111                ),
112                slot_duration_ms: BlockTimeSpan::from_u64(constants.slot_duration_ms),
113                epoch_duration: BlockTimeSpan::from_u64(constants.epoch_duration),
114                delta_duration: BlockTimeSpan::from_u64(constants.delta_duration),
115                genesis_state_timestamp: constants.genesis_state_timestamp.into(),
116            }
117        }
118    }
119
120    // Consensus epoch
121    impl Epoch {
122        fn of_time_exn(constants: &Constants, time: BlockTime) -> Result<Self, String> {
123            if time < constants.genesis_state_timestamp {
124                return Err(
125                    "Epoch.of_time: time is earlier than genesis block timestamp".to_string(),
126                );
127            }
128
129            let time_since_genesis = time.diff(constants.genesis_state_timestamp);
130            let epoch = time_since_genesis.to_ms() / constants.epoch_duration.to_ms();
131            let epoch: u32 = epoch.try_into().unwrap();
132
133            Ok(Self::from_u32(epoch))
134        }
135
136        fn start_time(constants: &Constants, epoch: Self) -> BlockTime {
137            let ms = constants
138                .genesis_state_timestamp
139                .to_span_since_epoch()
140                .to_ms()
141                + ((epoch.as_u32() as u64) * constants.epoch_duration.to_ms());
142            BlockTime::of_span_since_epoch(BlockTimeSpan::of_ms(ms))
143        }
144
145        pub fn epoch_and_slot_of_time_exn(
146            constants: &Constants,
147            time: BlockTime,
148        ) -> Result<(Self, Slot), String> {
149            let epoch = Self::of_time_exn(constants, time)?;
150            let time_since_epoch = time.diff(Self::start_time(constants, epoch));
151
152            let slot: u64 = time_since_epoch.to_ms() / constants.slot_duration_ms.to_ms();
153            let slot = Slot::from_u32(slot.try_into().unwrap());
154
155            Ok((epoch, slot))
156        }
157    }
158
159    /// TODO: Maybe rename to `ConsensusGlobalSlot` ?
160    pub struct GlobalSlot {
161        slot_number: Slot,
162        slots_per_epoch: Length,
163    }
164
165    impl GlobalSlot {
166        fn create(constants: &Constants, epoch: Epoch, slot: Slot) -> Self {
167            let slot_number = slot.as_u32() + (constants.slots_per_epoch.as_u32() * epoch.as_u32());
168            Self {
169                slot_number: Slot::from_u32(slot_number),
170                slots_per_epoch: constants.slots_per_epoch,
171            }
172        }
173
174        fn of_epoch_and_slot(constants: &Constants, (epoch, slot): (Epoch, Slot)) -> Self {
175            Self::create(constants, epoch, slot)
176        }
177
178        pub fn of_time_exn(constants: &Constants, time: BlockTime) -> Result<Self, String> {
179            Ok(Self::of_epoch_and_slot(
180                constants,
181                Epoch::epoch_and_slot_of_time_exn(constants, time)?,
182            ))
183        }
184
185        pub fn to_global_slot(&self) -> Slot {
186            let Self {
187                slot_number,
188                slots_per_epoch: _,
189            } = self;
190            *slot_number
191        }
192    }
193}
194
/// Fee increase required to replace a transaction.
///
/// Built with `Fee::of_nanomina_int_exn(1)`.
const REPLACE_FEE: Fee = Fee::of_nanomina_int_exn(1);

/// A validated user command stored together with its transaction hash.
pub type ValidCommandWithHash = WithHash<valid::UserCommand, v2::TransactionHash>;
199
200pub mod diff {
201    use super::*;
202
203    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, strum_macros::Display)]
204    pub enum Error {
205        InsufficientReplaceFee,
206        Duplicate,
207        InvalidNonce,
208        InsufficientFunds,
209        Overflow,
210        BadToken,
211        UnwantedFeeToken,
212        Expired,
213        Overloaded,
214        FeePayerAccountNotFound,
215        FeePayerNotPermittedToSend,
216        AfterSlotTxEnd,
217        BacktrackNonceMismatch,
218        InvalidCurrencyConsumed,
219        Custom,
220    }
221
222    impl Error {
223        pub fn grounds_for_diff_rejection(&self) -> bool {
224            match self {
225                Error::InsufficientReplaceFee
226                | Error::Duplicate
227                | Error::InvalidNonce
228                | Error::InsufficientFunds
229                | Error::Expired
230                | Error::Overloaded
231                | Error::FeePayerAccountNotFound
232                | Error::FeePayerNotPermittedToSend
233                | Error::AfterSlotTxEnd
234                | Error::InvalidCurrencyConsumed
235                | Error::Custom
236                | Error::BacktrackNonceMismatch => false,
237                Error::Overflow | Error::BadToken | Error::UnwantedFeeToken => true,
238            }
239        }
240    }
241
242    #[derive(Debug)]
243    pub struct Diff {
244        pub list: Vec<UserCommand>,
245    }
246
247    #[derive(Serialize, Deserialize, Debug, Clone)]
248    pub struct DiffVerified {
249        pub list: Vec<ValidCommandWithHash>,
250    }
251
252    struct Rejected {
253        list: Vec<(UserCommand, Error)>,
254    }
255
256    #[derive(Serialize, Deserialize, Debug, Clone)]
257    pub struct BestTipDiff {
258        pub new_commands: Vec<WithStatus<valid::UserCommand>>,
259        pub removed_commands: Vec<WithStatus<valid::UserCommand>>,
260        pub reorg_best_tip: bool,
261    }
262}
263
264fn preload_accounts(
265    ledger: &Mask,
266    account_ids: &BTreeSet<AccountId>,
267) -> HashMap<AccountId, Account> {
268    account_ids
269        .iter()
270        .filter_map(|id| {
271            let addr = ledger.location_of_account(id)?;
272            let account = ledger.get(addr)?;
273            Some((id.clone(), *account))
274        })
275        .collect()
276}
277
/// Transaction pool configuration.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Config {
    /// Unit placeholder; carries no data.
    pub trust_system: (),
    /// NOTE(review): presumably the maximum number of commands kept in the
    /// pool — confirm where it is enforced.
    pub pool_max_size: usize,
    /// Optional slot bound; `None` means no bound is configured.
    /// NOTE(review): presumably the slot from which transactions are no
    /// longer accepted — confirm against the code that reads it.
    pub slot_tx_end: Option<Slot>,
}
284
/// Used to be able to de/serialize our `TransactionPool` in the state machine
///
/// Mirror of `VkRefcountTable` in which every `Fp` hash is stored as a
/// `BigInt` and every map is flattened into a vector of pairs.
#[derive(Serialize, Deserialize)]
struct VkRefcountTableBigInts {
    // (vk hash, (reference count, verification key with its hash))
    verification_keys: Vec<(BigInt, (usize, WithHash<VerificationKey, BigInt>))>,
    // (account id, [(vk hash, reference count)])
    account_id_to_vks: Vec<(AccountId, Vec<(BigInt, usize)>)>,
    // (vk hash, [(account id, reference count)])
    vk_to_account_ids: Vec<(BigInt, Vec<(AccountId, usize)>)>,
}
292impl From<VkRefcountTable> for VkRefcountTableBigInts {
293    fn from(value: VkRefcountTable) -> Self {
294        let VkRefcountTable {
295            verification_keys,
296            account_id_to_vks,
297            vk_to_account_ids,
298        } = value;
299        Self {
300            verification_keys: verification_keys
301                .into_iter()
302                .map(|(hash, (count, vk))| {
303                    assert_eq!(hash, vk.hash());
304                    let hash: BigInt = hash.into();
305                    (
306                        hash.clone(),
307                        (
308                            count,
309                            WithHash {
310                                data: vk.vk().clone(),
311                                hash,
312                            },
313                        ),
314                    )
315                })
316                .collect(),
317            account_id_to_vks: account_id_to_vks
318                .into_iter()
319                .map(|(id, map)| {
320                    (
321                        id,
322                        map.into_iter()
323                            .map(|(hash, count)| (hash.into(), count))
324                            .collect(),
325                    )
326                })
327                .collect(),
328            vk_to_account_ids: vk_to_account_ids
329                .into_iter()
330                .map(|(hash, map)| (hash.into(), map.into_iter().collect()))
331                .collect(),
332        }
333    }
334}
335impl From<VkRefcountTableBigInts> for VkRefcountTable {
336    fn from(value: VkRefcountTableBigInts) -> Self {
337        let VkRefcountTableBigInts {
338            verification_keys,
339            account_id_to_vks,
340            vk_to_account_ids,
341        } = value;
342        Self {
343            verification_keys: verification_keys
344                .into_iter()
345                .map(|(hash, (count, vk))| {
346                    assert_eq!(hash, vk.hash);
347                    let hash: Fp = hash.to_field().unwrap(); // We trust our serialized data
348                    (hash, (count, VerificationKeyWire::with_hash(vk.data, hash)))
349                })
350                .collect(),
351            account_id_to_vks: account_id_to_vks
352                .into_iter()
353                .map(|(id, map)| {
354                    let map = map
355                        .into_iter()
356                        .map(|(bigint, count)| (bigint.to_field::<Fp>().unwrap(), count)) // We trust our serialized data
357                        .collect();
358                    (id, map)
359                })
360                .collect(),
361            vk_to_account_ids: vk_to_account_ids
362                .into_iter()
363                .map(|(hash, map)| (hash.to_field().unwrap(), map.into_iter().collect())) // We trust our serialized data
364                .collect(),
365        }
366    }
367}
368
/// Reference-counted table of verification keys.
///
/// (De)serialized through `VkRefcountTableBigInts` (see the `serde`
/// attributes below).
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
#[serde(into = "VkRefcountTableBigInts")]
#[serde(from = "VkRefcountTableBigInts")]
struct VkRefcountTable {
    // vk hash -> (reference count, verification key)
    verification_keys: HashMap<Fp, (usize, VerificationKeyWire)>,
    // account id -> vk hash -> reference count
    account_id_to_vks: HashMap<AccountId, HashMap<Fp, usize>>,
    // vk hash -> account id -> reference count
    vk_to_account_ids: HashMap<Fp, HashMap<AccountId, usize>>,
}
377
378impl VkRefcountTable {
379    fn find_vk(&self, f: &Fp) -> Option<&(usize, VerificationKeyWire)> {
380        self.verification_keys.get(f)
381    }
382
383    fn find_vks_by_account_id(&self, account_id: &AccountId) -> Vec<&VerificationKeyWire> {
384        let Some(vks) = self.account_id_to_vks.get(account_id) else {
385            return Vec::new();
386        };
387
388        vks.iter()
389            .map(|(f, _)| self.find_vk(f).expect("malformed Vk_refcount_table.t"))
390            .map(|(_, vk)| vk)
391            .collect()
392    }
393
394    fn inc(&mut self, account_id: AccountId, vk: VerificationKeyWire) {
395        use std::collections::hash_map::Entry::{Occupied, Vacant};
396
397        match self.verification_keys.entry(vk.hash()) {
398            Vacant(e) => {
399                e.insert((1, vk.clone()));
400            }
401            Occupied(mut e) => {
402                let (count, _vk) = e.get_mut();
403                *count += 1;
404            }
405        }
406
407        let map = self
408            .account_id_to_vks
409            .entry(account_id.clone())
410            .or_default(); // or insert empty map
411        let count = map.entry(vk.hash()).or_default(); // or insert count 0
412        *count += 1;
413
414        let map = self.vk_to_account_ids.entry(vk.hash()).or_default(); // or insert empty map
415        let count = map.entry(account_id).or_default(); // or insert count 0
416        *count += 1;
417    }
418
419    fn dec(&mut self, account_id: AccountId, vk_hash: Fp) {
420        use std::collections::hash_map::Entry::{Occupied, Vacant};
421
422        match self.verification_keys.entry(vk_hash) {
423            Vacant(_e) => {
424                bug_condition!("vk_map: Unexpected error on self.verification_keys: vacant vk_hash")
425            }
426            Occupied(mut e) => {
427                let (count, _vk) = e.get_mut();
428                if *count == 1 {
429                    e.remove();
430                } else {
431                    *count = (*count).checked_sub(1).unwrap();
432                }
433            }
434        }
435
436        fn remove<K1, K2>(
437            key1: K1,
438            key2: K2,
439            table: &mut HashMap<K1, HashMap<K2, usize>>,
440        ) -> Result<(), &'static str>
441        where
442            K1: std::hash::Hash + Eq,
443            K2: std::hash::Hash + Eq,
444        {
445            match table.entry(key1) {
446                Vacant(_e) => return Err("vacant on key1"),
447                Occupied(mut e) => {
448                    let map = e.get_mut();
449                    match map.entry(key2) {
450                        Vacant(_e) => return Err("vacant on key2"),
451                        Occupied(mut e2) => {
452                            let count: &mut usize = e2.get_mut();
453                            if *count == 1 {
454                                e2.remove();
455                                e.remove();
456                            } else {
457                                *count = count.checked_sub(1).ok_or("invalid count state")?
458                            }
459                        }
460                    }
461                }
462            }
463            Ok(())
464        }
465
466        if let Err(e) = remove(account_id.clone(), vk_hash, &mut self.account_id_to_vks) {
467            bug_condition!(
468                "vk_map: Unexpected error on self.account_id_to_vks: {:?}",
469                e
470            );
471        }
472        if let Err(e) = remove(vk_hash, account_id.clone(), &mut self.vk_to_account_ids) {
473            bug_condition!(
474                "vk_map: Unexpected error on self.vk_to_account_ids: {:?}",
475                e
476            );
477        }
478    }
479
480    fn decrement_list(&mut self, list: &[ValidCommandWithHash]) {
481        list.iter().for_each(|c| {
482            for (id, vk) in c.data.forget_check().extract_vks() {
483                self.dec(id, vk.hash());
484            }
485        });
486    }
487
488    fn decrement_hashed<I, V>(&mut self, list: I)
489    where
490        I: IntoIterator<Item = V>,
491        V: Borrow<ValidCommandWithHash>,
492    {
493        list.into_iter().for_each(|c| {
494            for (id, vk) in c.borrow().data.forget_check().extract_vks() {
495                self.dec(id, vk.hash());
496            }
497        });
498    }
499
500    fn increment_hashed<I, V>(&mut self, list: I)
501    where
502        I: IntoIterator<Item = V>,
503        V: Borrow<ValidCommandWithHash>,
504    {
505        list.into_iter().for_each(|c| {
506            for (id, vk) in c.borrow().data.forget_check().extract_vks() {
507                self.inc(id, vk);
508            }
509        });
510    }
511
512    fn increment_list(&mut self, list: &[ValidCommandWithHash]) {
513        list.iter().for_each(|c| {
514            for (id, vk) in c.data.forget_check().extract_vks() {
515                self.inc(id, vk);
516            }
517        });
518    }
519}
520
/// Single-variant wrapper around a `usize`.
/// NOTE(review): presumably a batch size — no use is visible in this part of
/// the file, confirm against the callers.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
enum Batch {
    Of(usize),
}
525
/// Detailed reason a command could not be handled by the indexed pool.
///
/// Converts into the payload-free `diff::Error` and into a `String` (its
/// `Debug` rendering).
#[derive(Debug)]
pub enum CommandError {
    InvalidNonce {
        account_nonce: Nonce,
        expected: Nonce,
    },
    InsufficientFunds {
        balance: Balance,
        consumed: Amount,
    },
    /// NOTE: don't punish for this, attackers can induce nodes to banlist
    ///       each other that way!
    InsufficientReplaceFee {
        replace_fee: Fee,
        fee: Fee,
    },
    Overflow,
    BadToken,
    /// The command's `valid_until` slot is lower than the current global
    /// slot since genesis.
    Expired {
        valid_until: Slot,
        global_slot_since_genesis: Slot,
    },
    UnwantedFeeToken {
        token_id: TokenId,
    },
    /// The current global slot is not lower than the configured `slot_tx_end`.
    AfterSlotTxEnd,
    /// When re-adding a command from backtrack, its expected target nonce did
    /// not match the nonce of the first command already queued for the sender.
    BacktrackNonceMismatch {
        expected_nonce: Nonce,
        first_nonce: Nonce,
    },
    InvalidCurrencyConsumed,
    /// Free-form error message (used by `my_assert` to carry a backtrace).
    Custom(Cow<'static, str>),
}
559
560impl From<CommandError> for String {
561    fn from(value: CommandError) -> Self {
562        format!("{:?}", value)
563    }
564}
565
566impl From<CommandError> for diff::Error {
567    fn from(value: CommandError) -> Self {
568        match value {
569            CommandError::InvalidNonce { .. } => diff::Error::InvalidNonce,
570            CommandError::InsufficientFunds { .. } => diff::Error::InsufficientFunds,
571            CommandError::InsufficientReplaceFee { .. } => diff::Error::InsufficientReplaceFee,
572            CommandError::Overflow => diff::Error::Overflow,
573            CommandError::BadToken => diff::Error::BadToken,
574            CommandError::Expired { .. } => diff::Error::Expired,
575            CommandError::UnwantedFeeToken { .. } => diff::Error::UnwantedFeeToken,
576            CommandError::AfterSlotTxEnd => diff::Error::AfterSlotTxEnd,
577            CommandError::BacktrackNonceMismatch { .. } => diff::Error::BacktrackNonceMismatch,
578            CommandError::InvalidCurrencyConsumed => diff::Error::InvalidCurrencyConsumed,
579            CommandError::Custom(_) => diff::Error::Custom,
580        }
581    }
582}
583
/// Configuration carried by an `IndexedPool`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct IndexedPoolConfig {
    /// Consensus constants in their typed form.
    pub consensus_constants: consensus::Constants,
    /// When set, `add_from_backtrack` refuses commands once the current
    /// global slot is no longer below this slot.
    slot_tx_end: Option<Slot>,
}
589
/// The pool of pending commands, indexed several ways (by fee, by sender, by
/// hash and by expiration slot).
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct IndexedPool {
    /// Transactions valid against the current ledger, indexed by fee per
    /// weight unit.
    applicable_by_fee: HashMap<FeeRate, HashSet<ValidCommandWithHash>>,
    /// All pending transactions along with the total currency required to
    /// execute them -- plus any currency spent from this account by
    /// transactions from other accounts -- indexed by sender account.
    /// Ordered by nonce inside the accounts.
    all_by_sender: HashMap<AccountId, (VecDeque<ValidCommandWithHash>, Amount)>,
    /// All transactions in the pool indexed by fee per weight unit.
    all_by_fee: HashMap<FeeRate, HashSet<ValidCommandWithHash>>,
    /// All transactions in the pool indexed by transaction hash.
    all_by_hash: HashMap<v2::TransactionHash, ValidCommandWithHash>,
    /// Only transactions that have an expiry
    transactions_with_expiration: HashMap<Slot, HashSet<ValidCommandWithHash>>,
    /// Number of commands in the pool; incremented on every add and
    /// decremented on every removal.
    size: usize,
    pub config: IndexedPoolConfig,
}
608
/// A deferred modification of the pool indexes. Updates are collected in a
/// list (see `remove_with_dependents_exn`) and applied afterwards.
enum Update {
    /// Add `command`; the fields mirror the parameters of `update_add`.
    Add {
        command: ValidCommandWithHash,
        fee_per_wu: FeeRate,
        add_to_applicable_by_fee: bool,
    },
    /// Remove `commands` from the by-fee, by-hash and expiration indexes.
    RemoveAllByFeeAndHashAndExpiration {
        commands: VecDeque<ValidCommandWithHash>,
    },
    /// Remove `command` from `applicable_by_fee` under the key `fee_per_wu`.
    RemoveFromApplicableByFee {
        fee_per_wu: FeeRate,
        command: ValidCommandWithHash,
    },
}
623
/// Working copy of one sender's entry of `all_by_sender`.
#[derive(Clone)]
struct SenderState {
    /// The sender (fee payer) account.
    sender: AccountId,
    /// The sender's queued commands and reserved currency, or `None` when the
    /// sender has no entry.
    state: Option<(VecDeque<ValidCommandWithHash>, Amount)>,
}
629
/// Selects what a revalidation covers.
/// NOTE(review): the consumer is not visible in this part of the file —
/// confirm the exact semantics there.
pub enum RevalidateKind<'a> {
    /// The whole pool.
    EntirePool,
    /// Only the given accounts.
    Subset(&'a BTreeSet<AccountId>),
}
634
635impl IndexedPool {
636    fn new(constants: &ConsensusConstants) -> Self {
637        Self {
638            applicable_by_fee: HashMap::new(),
639            all_by_sender: HashMap::new(),
640            all_by_fee: HashMap::new(),
641            all_by_hash: HashMap::new(),
642            transactions_with_expiration: HashMap::new(),
643            size: 0,
644            config: IndexedPoolConfig {
645                consensus_constants: consensus::Constants::create(constants),
646                slot_tx_end: None,
647            },
648        }
649    }
650
    /// Returns the number of commands currently in the pool.
    fn size(&self) -> usize {
        self.size
    }
654
655    fn min_fee(&self) -> Option<FeeRate> {
656        self.all_by_fee.keys().min().cloned()
657    }
658
    /// Returns `true` when a command with the same hash as `cmd` is in the
    /// pool.
    fn member(&self, cmd: &ValidCommandWithHash) -> bool {
        self.all_by_hash.contains_key(&cmd.hash)
    }
662
    /// Returns the pooled command whose transaction hash is `hash`, if any.
    pub fn get(&self, hash: &v2::TransactionHash) -> Option<&ValidCommandWithHash> {
        self.all_by_hash.get(hash)
    }
666
667    fn check_expiry(
668        &self,
669        global_slot_since_genesis: Slot,
670        cmd: &UserCommand,
671    ) -> Result<(), CommandError> {
672        let valid_until = cmd.valid_until();
673
674        if valid_until < global_slot_since_genesis {
675            return Err(CommandError::Expired {
676                valid_until,
677                global_slot_since_genesis,
678            });
679        }
680
681        Ok(())
682    }
683
684    /// Insert in a `HashMap<_, HashSet<_>>`
685    fn map_set_insert<K, V>(map: &mut HashMap<K, HashSet<V>>, key: K, value: V)
686    where
687        K: std::hash::Hash + PartialEq + Eq,
688        V: std::hash::Hash + PartialEq + Eq,
689    {
690        let entry = map.entry(key).or_default();
691        entry.insert(value);
692    }
693
694    /// Remove in a `HashMap<_, HashSet<_>>`
695    fn map_set_remove<K, V>(map: &mut HashMap<K, HashSet<V>>, key: K, value: &V)
696    where
697        K: std::hash::Hash + PartialEq + Eq,
698        V: std::hash::Hash + PartialEq + Eq,
699    {
700        let Entry::Occupied(mut entry) = map.entry(key) else {
701            return;
702        };
703        let set = entry.get_mut();
704        set.remove(value);
705        if set.is_empty() {
706            entry.remove();
707        }
708    }
709
710    fn update_expiration_map(&mut self, cmd: ValidCommandWithHash, is_add: bool) {
711        let user_cmd = cmd.data.forget_check();
712        let expiry = user_cmd.valid_until();
713        if expiry == Slot::max() {
714            return; // Do nothing
715        }
716        if is_add {
717            Self::map_set_insert(&mut self.transactions_with_expiration, expiry, cmd);
718        } else {
719            Self::map_set_remove(&mut self.transactions_with_expiration, expiry, &cmd);
720        }
721    }
722
    /// Removes `cmd` from the expiration index (no-op for commands that are
    /// not tracked there).
    fn remove_from_expiration_exn(&mut self, cmd: ValidCommandWithHash) {
        self.update_expiration_map(cmd, false);
    }
726
    /// Adds `cmd` to the expiration index, unless its `valid_until` is
    /// `Slot::max()`.
    fn add_to_expiration(&mut self, cmd: ValidCommandWithHash) {
        self.update_expiration_map(cmd, true);
    }
730
731    /// Remove a command from the applicable_by_fee field. This may break an
732    /// invariant.
733    fn remove_applicable_exn(&mut self, cmd: &ValidCommandWithHash) {
734        let fee_per_wu = cmd.data.forget_check().fee_per_wu();
735        Self::map_set_remove(&mut self.applicable_by_fee, fee_per_wu, cmd);
736    }
737
    /// Creates an empty queue with room for at least 256 elements.
    fn make_queue<T>() -> VecDeque<T> {
        VecDeque::with_capacity(256)
    }
741
742    fn add_from_backtrack(
743        &mut self,
744        global_slot_since_genesis: Slot,
745        current_global_slot: Slot,
746        cmd: ValidCommandWithHash,
747    ) -> Result<(), CommandError> {
748        let IndexedPoolConfig { slot_tx_end, .. } = &self.config;
749
750        if !slot_tx_end
751            .as_ref()
752            .map(|slot_tx_end| current_global_slot < *slot_tx_end)
753            .unwrap_or(true)
754        {
755            return Err(CommandError::AfterSlotTxEnd);
756        }
757
758        let ValidCommandWithHash {
759            data: unchecked,
760            hash: cmd_hash,
761        } = &cmd;
762        let unchecked = unchecked.forget_check();
763
764        self.check_expiry(global_slot_since_genesis, &unchecked)?;
765
766        let fee_payer = unchecked.fee_payer();
767        let fee_per_wu = unchecked.fee_per_wu();
768
769        let consumed = currency_consumed(&unchecked)?;
770
771        match self.all_by_sender.get_mut(&fee_payer) {
772            None => {
773                {
774                    let mut queue = Self::make_queue();
775                    queue.push_back(cmd.clone());
776                    self.all_by_sender.insert(fee_payer, (queue, consumed));
777                }
778                Self::map_set_insert(&mut self.all_by_fee, fee_per_wu.clone(), cmd.clone());
779                self.all_by_hash.insert(cmd_hash.clone(), cmd.clone());
780                Self::map_set_insert(&mut self.applicable_by_fee, fee_per_wu.clone(), cmd.clone());
781                self.add_to_expiration(cmd);
782                self.size += 1;
783            }
784            Some((queue, currency_reserved)) => {
785                let first_queued = queue.front().cloned().unwrap();
786                let expected_nonce = unchecked.expected_target_nonce();
787                let first_nonce = first_queued.data.forget_check().applicable_at_nonce();
788
789                if expected_nonce != first_nonce {
790                    // Ocaml panics here as well
791                    //panic!("indexed pool nonces inconsistent when adding from backtrack.")
792                    return Err(CommandError::BacktrackNonceMismatch {
793                        expected_nonce,
794                        first_nonce,
795                    });
796                }
797
798                // update `self.all_by_sender`
799                {
800                    queue.push_front(cmd.clone());
801                    *currency_reserved = currency_reserved.checked_add(&consumed).unwrap();
802                }
803
804                self.remove_applicable_exn(&first_queued);
805
806                Self::map_set_insert(&mut self.applicable_by_fee, fee_per_wu.clone(), cmd.clone());
807                Self::map_set_insert(&mut self.all_by_fee, fee_per_wu.clone(), cmd.clone());
808                self.all_by_hash.insert(cmd_hash.clone(), cmd.clone());
809                self.add_to_expiration(cmd);
810                self.size += 1;
811            }
812        }
813        Ok(())
814    }
815
816    fn update_add(
817        &mut self,
818        cmd: ValidCommandWithHash,
819        fee_per_wu: FeeRate,
820        add_to_applicable_by_fee: bool,
821    ) {
822        if add_to_applicable_by_fee {
823            Self::map_set_insert(&mut self.applicable_by_fee, fee_per_wu.clone(), cmd.clone());
824        }
825
826        let cmd_hash = cmd.hash.clone();
827
828        Self::map_set_insert(&mut self.all_by_fee, fee_per_wu, cmd.clone());
829        self.all_by_hash.insert(cmd_hash, cmd.clone());
830        self.add_to_expiration(cmd);
831        self.size += 1;
832    }
833
834    /// Remove a command from the all_by_fee and all_by_hash fields, and decrement
835    /// size. This may break an invariant.
836    fn update_remove_all_by_fee_and_hash_and_expiration<I>(&mut self, cmds: I)
837    where
838        I: IntoIterator<Item = ValidCommandWithHash>,
839    {
840        for cmd in cmds {
841            let fee_per_wu = cmd.data.forget_check().fee_per_wu();
842            let cmd_hash = cmd.hash.clone();
843            Self::map_set_remove(&mut self.all_by_fee, fee_per_wu, &cmd);
844            self.all_by_hash.remove(&cmd_hash);
845            self.remove_from_expiration_exn(cmd);
846            self.size = self.size.checked_sub(1).unwrap();
847        }
848    }
849
    /// Removes `command` from the set stored under `fee_per_wu` in
    /// `applicable_by_fee`; the key is dropped when its set becomes empty.
    fn update_remove_from_applicable_by_fee(
        &mut self,
        fee_per_wu: FeeRate,
        command: &ValidCommandWithHash,
    ) {
        Self::map_set_remove(&mut self.applicable_by_fee, fee_per_wu, command)
    }
857
858    fn remove_with_dependents_exn(
859        &mut self,
860        cmd: &ValidCommandWithHash,
861    ) -> Result<VecDeque<ValidCommandWithHash>, CommandError> {
862        let sender = cmd.data.fee_payer();
863        let mut by_sender = SenderState {
864            state: self.all_by_sender.get(&sender).cloned(),
865            sender,
866        };
867
868        let mut updates = Vec::<Update>::with_capacity(128);
869        let result = self.remove_with_dependents_exn_impl(cmd, &mut by_sender, &mut updates);
870
871        self.set_sender(by_sender);
872        self.apply_updates(updates);
873
874        result
875    }
876
    /// Computes the removal of `cmd` and all commands queued after it for the
    /// same sender, without mutating the pool itself.
    ///
    /// The sender's queue and reserved currency are edited in `by_sender`, and
    /// the matching index changes are appended to `updates`; the caller is
    /// responsible for committing both.
    ///
    /// Returns the removed commands (starting with the one whose nonce matches
    /// `cmd`).
    ///
    /// # Errors
    /// Returns an error if one of the `my_assert` checks fails or if
    /// `currency_consumed` fails for a removed command.
    ///
    /// # Panics
    /// Panics if `by_sender.state` is `None`, if no queued command has `cmd`'s
    /// nonce, or if the currency arithmetic overflows/underflows.
    fn remove_with_dependents_exn_impl(
        &self,
        cmd: &ValidCommandWithHash,
        by_sender: &mut SenderState,
        updates: &mut Vec<Update>,
    ) -> Result<VecDeque<ValidCommandWithHash>, CommandError> {
        let (sender_queue, reserved_currency_ref) = by_sender.state.as_mut().unwrap();
        let unchecked = cmd.data.forget_check();

        my_assert(!sender_queue.is_empty())?;

        let cmd_nonce = unchecked.applicable_at_nonce();

        let cmd_index = sender_queue
            .iter()
            .position(|cmd| {
                let nonce = cmd.data.forget_check().applicable_at_nonce();
                // we just compare nonce equality since the command we are looking for already exists in the sequence
                nonce == cmd_nonce
            })
            .unwrap();

        // Everything from `cmd_index` onwards is removed; the prefix stays in
        // `sender_queue` (aliased as `keep_queue` below).
        let drop_queue = sender_queue.split_off(cmd_index);
        let keep_queue = sender_queue;
        my_assert(!drop_queue.is_empty())?;

        let currency_to_remove = drop_queue.iter().try_fold(Amount::zero(), |acc, cmd| {
            let consumed = currency_consumed(&cmd.data.forget_check())?;
            Ok(consumed.checked_add(&acc).unwrap())
        })?;

        // This is safe because the currency in a subset of the commands must be <=
        // total currency in all the commands.
        let reserved_currency = reserved_currency_ref
            .checked_sub(&currency_to_remove)
            .unwrap();

        updates.push(Update::RemoveAllByFeeAndHashAndExpiration {
            commands: drop_queue.clone(),
        });

        // Only the head of a sender's queue is tracked in `applicable_by_fee`,
        // so that index needs updating only when the head itself is removed.
        if cmd_index == 0 {
            updates.push(Update::RemoveFromApplicableByFee {
                fee_per_wu: unchecked.fee_per_wu(),
                command: cmd.clone(),
            });
        }

        // `keep_queue` and `reserved_currency_ref` still borrow from
        // `by_sender.state`; they are not used after this `if`/`else`, which is
        // what allows the state to be cleared in the `else` branch.
        if !keep_queue.is_empty() {
            *reserved_currency_ref = reserved_currency;
        } else {
            // Nothing is left for this sender: no currency may remain reserved.
            my_assert(reserved_currency.is_zero())?;
            by_sender.state = None;
        }

        Ok(drop_queue)
    }
936
937    fn apply_updates(&mut self, updates: Vec<Update>) {
938        for update in updates {
939            match update {
940                Update::Add {
941                    command,
942                    fee_per_wu,
943                    add_to_applicable_by_fee,
944                } => self.update_add(command, fee_per_wu, add_to_applicable_by_fee),
945                Update::RemoveAllByFeeAndHashAndExpiration { commands } => {
946                    self.update_remove_all_by_fee_and_hash_and_expiration(commands)
947                }
948                Update::RemoveFromApplicableByFee {
949                    fee_per_wu,
950                    command,
951                } => self.update_remove_from_applicable_by_fee(fee_per_wu, &command),
952            }
953        }
954    }
955
956    fn set_sender(&mut self, by_sender: SenderState) {
957        let SenderState { sender, state } = by_sender;
958
959        match state {
960            Some(state) => {
961                self.all_by_sender.insert(sender, state);
962            }
963            None => {
964                self.all_by_sender.remove(&sender);
965            }
966        }
967    }
968
969    fn add_from_gossip_exn(
970        &mut self,
971        global_slot_since_genesis: Slot,
972        current_global_slot: Slot,
973        cmd: &ValidCommandWithHash,
974        current_nonce: Nonce,
975        balance: Balance,
976    ) -> Result<(ValidCommandWithHash, VecDeque<ValidCommandWithHash>), CommandError> {
977        let sender = cmd.data.fee_payer();
978        let mut by_sender = SenderState {
979            state: self.all_by_sender.get(&sender).cloned(),
980            sender,
981        };
982
983        let mut updates = Vec::<Update>::with_capacity(128);
984        let result = self.add_from_gossip_exn_impl(
985            global_slot_since_genesis,
986            current_global_slot,
987            cmd,
988            current_nonce,
989            balance,
990            &mut by_sender,
991            &mut updates,
992        )?;
993
994        self.set_sender(by_sender);
995        self.apply_updates(updates);
996
997        Ok(result)
998    }
999
    /// Computes the effect of adding `cmd` for its sender without mutating the
    /// pool: the sender's queue/reserved currency are edited in `by_sender` and
    /// index changes are appended to `updates`.
    ///
    /// Three cases are handled, depending on the sender's current queue:
    /// - no queue: `cmd` must be applicable at `current_nonce` and affordable
    ///   with `balance`; it becomes the queue head;
    /// - `cmd` is applicable right after the last queued command: it is
    ///   appended, provided the total reserved currency fits in `balance`;
    /// - the queue head is applicable at `current_nonce` and `cmd`'s nonce falls
    ///   inside the queue: `cmd` replaces the queued command at that nonce (and
    ///   possibly later ones), subject to the replace-fee rules below.
    ///
    /// Returns the added command and the commands that were dropped to make
    /// room for it (empty unless a replacement happened).
    ///
    /// # Errors
    /// `AfterSlotTxEnd`, `Overflow`, `UnwantedFeeToken`, `InvalidNonce`,
    /// `InsufficientFunds`, `InsufficientReplaceFee`, plus whatever
    /// `check_expiry`, `my_assert` and the nested calls return.
    fn add_from_gossip_exn_impl(
        &self,
        global_slot_since_genesis: Slot,
        current_global_slot: Slot,
        cmd: &ValidCommandWithHash,
        current_nonce: Nonce,
        balance: Balance,
        by_sender: &mut SenderState,
        updates: &mut Vec<Update>,
    ) -> Result<(ValidCommandWithHash, VecDeque<ValidCommandWithHash>), CommandError> {
        let IndexedPoolConfig { slot_tx_end, .. } = &self.config;

        // When `slot_tx_end` is configured, commands are only accepted while
        // the current slot is strictly before it.
        if !slot_tx_end
            .as_ref()
            .map(|slot_tx_end| current_global_slot < *slot_tx_end)
            .unwrap_or(true)
        {
            return Err(CommandError::AfterSlotTxEnd);
        }

        let unchecked = cmd.data.forget_check();
        let fee = unchecked.fee();
        let fee_per_wu = unchecked.fee_per_wu();
        let cmd_applicable_at_nonce = unchecked.applicable_at_nonce();

        // Up-front validation: expiry, currency overflow, and fee token.
        let consumed = {
            self.check_expiry(global_slot_since_genesis, &unchecked)?;
            let consumed = currency_consumed(&unchecked).map_err(|_| CommandError::Overflow)?;
            if !unchecked.fee_token().is_default() {
                return Err(CommandError::UnwantedFeeToken {
                    token_id: unchecked.fee_token(),
                });
            }
            consumed
        };

        // Work on a clone of the sender state; `by_sender` itself is only
        // overwritten at the points where a new state is assigned below.
        match by_sender.state.clone() {
            None => {
                // First command for this sender.
                if current_nonce != cmd_applicable_at_nonce {
                    return Err(CommandError::InvalidNonce {
                        account_nonce: current_nonce,
                        expected: cmd_applicable_at_nonce,
                    });
                }
                if consumed > balance.to_amount() {
                    return Err(CommandError::InsufficientFunds { balance, consumed });
                }

                let mut queue = Self::make_queue();
                queue.push_back(cmd.clone());
                by_sender.state = Some((queue, consumed));

                // A new queue head is immediately applicable.
                updates.push(Update::Add {
                    command: cmd.clone(),
                    fee_per_wu,
                    add_to_applicable_by_fee: true,
                });

                Ok((cmd.clone(), Self::make_queue()))
            }
            Some((mut queued_cmds, reserved_currency)) => {
                my_assert(!queued_cmds.is_empty())?;
                let queue_applicable_at_nonce = {
                    let first = queued_cmds.front().unwrap();
                    first.data.forget_check().applicable_at_nonce()
                };
                let queue_target_nonce = {
                    let last = queued_cmds.back().unwrap();
                    last.data.forget_check().expected_target_nonce()
                };
                if queue_target_nonce == cmd_applicable_at_nonce {
                    // Append case: `cmd` follows directly after the last queued command.
                    let reserved_currency = consumed
                        .checked_add(&reserved_currency)
                        .ok_or(CommandError::Overflow)?;

                    if reserved_currency > balance.to_amount() {
                        return Err(CommandError::InsufficientFunds {
                            balance,
                            consumed: reserved_currency,
                        });
                    }

                    queued_cmds.push_back(cmd.clone());

                    // Not the queue head, so it is not added to `applicable_by_fee`.
                    updates.push(Update::Add {
                        command: cmd.clone(),
                        fee_per_wu,
                        add_to_applicable_by_fee: false,
                    });

                    by_sender.state = Some((queued_cmds, reserved_currency));

                    Ok((cmd.clone(), Self::make_queue()))
                } else if queue_applicable_at_nonce == current_nonce {
                    // Replacement case: `cmd` targets a nonce already occupied in the queue.
                    if !cmd_applicable_at_nonce
                        .between(&queue_applicable_at_nonce, &queue_target_nonce)
                    {
                        return Err(CommandError::InvalidNonce {
                            account_nonce: cmd_applicable_at_nonce,
                            expected: queue_applicable_at_nonce,
                        });
                    }

                    // First queued command whose nonce is >= the new command's nonce.
                    let replacement_index = queued_cmds
                        .iter()
                        .position(|cmd| {
                            let cmd_applicable_at_nonce_prime =
                                cmd.data.forget_check().applicable_at_nonce();
                            cmd_applicable_at_nonce <= cmd_applicable_at_nonce_prime
                        })
                        .unwrap();

                    // Note: this splits the local clone only; `by_sender` is
                    // updated by `remove_with_dependents_exn_impl` below.
                    let drop_queue = queued_cmds.split_off(replacement_index);

                    let to_drop = drop_queue.front().unwrap().data.forget_check();
                    my_assert(cmd_applicable_at_nonce <= to_drop.applicable_at_nonce())?;

                    // We check the fee increase twice because we need to be sure the
                    // subtraction is safe.
                    {
                        let replace_fee = to_drop.fee();
                        if fee < replace_fee {
                            return Err(CommandError::InsufficientReplaceFee { replace_fee, fee });
                        }
                    }

                    let dropped = self.remove_with_dependents_exn_impl(
                        drop_queue.front().unwrap(),
                        by_sender,
                        updates,
                    )?;
                    my_assert(drop_queue == dropped)?;

                    // Re-run the add now that the conflicting suffix is gone; this
                    // is expected to take the "no queue" or "append" path.
                    let (cmd, _) = {
                        let (v, dropped) = self.add_from_gossip_exn_impl(
                            global_slot_since_genesis,
                            current_global_slot,
                            cmd,
                            current_nonce,
                            balance,
                            by_sender,
                            updates,
                        )?;
                        // We've already removed them, so this should always be empty.
                        my_assert(dropped.is_empty())?;
                        (v, dropped)
                    };

                    // `dropped` here is again the queue returned by
                    // `remove_with_dependents_exn_impl` (the inner binding above
                    // went out of scope).
                    let drop_head = dropped.front().cloned().unwrap();
                    let mut drop_tail = dropped.into_iter().skip(1).peekable();

                    // Fee surplus of the new command over the replaced one; fees of
                    // commands that cannot be re-added are deducted from it.
                    let mut increment = fee.checked_sub(&to_drop.fee()).unwrap();
                    let mut dropped = None::<VecDeque<_>>;
                    let mut current_nonce = current_nonce;
                    let mut this_updates = Vec::with_capacity(128);

                    // Try to re-add the displaced tail one command at a time. After
                    // the first failure (`dropped` becomes `Some`), the loop only
                    // deducts the fee of each remaining command from `increment`.
                    while let Some(cmd) = drop_tail.peek() {
                        if dropped.is_some() {
                            let cmd_unchecked = cmd.data.forget_check();
                            let replace_fee = cmd_unchecked.fee();

                            increment = increment.checked_sub(&replace_fee).ok_or({
                                CommandError::InsufficientReplaceFee {
                                    replace_fee,
                                    fee: increment,
                                }
                            })?;
                        } else {
                            current_nonce = current_nonce.succ();
                            // Snapshot so a failed re-add can be rolled back.
                            let by_sender_pre = by_sender.clone();
                            this_updates.clear();

                            match self.add_from_gossip_exn_impl(
                                global_slot_since_genesis,
                                current_global_slot,
                                cmd,
                                current_nonce,
                                balance,
                                by_sender,
                                &mut this_updates,
                            ) {
                                Ok((_cmd, dropped)) => {
                                    my_assert(dropped.is_empty())?;
                                    updates.append(&mut this_updates);
                                }
                                Err(_) => {
                                    *by_sender = by_sender_pre;
                                    // NOTE(review): the collected tail skips the command
                                    // that just failed; that command is re-visited on the
                                    // next iteration only to deduct its fee.
                                    dropped = Some(drop_tail.clone().skip(1).collect());
                                    continue; // Don't go to next
                                }
                            }
                        }
                        let _ = drop_tail.next();
                    }

                    // The remaining surplus must still cover the minimum replace fee.
                    if increment < REPLACE_FEE {
                        return Err(CommandError::InsufficientReplaceFee {
                            replace_fee: REPLACE_FEE,
                            fee: increment,
                        });
                    }

                    let mut dropped = dropped.unwrap_or_else(Self::make_queue);
                    dropped.push_front(drop_head);

                    Ok((cmd, dropped))
                } else {
                    // Invalid nonce or duplicate transaction got in- either way error
                    Err(CommandError::InvalidNonce {
                        account_nonce: cmd_applicable_at_nonce,
                        expected: queue_target_nonce,
                    })
                }
            }
        }
    }
1216
1217    fn expired_by_global_slot(&self, global_slot_since_genesis: Slot) -> Vec<ValidCommandWithHash> {
1218        self.transactions_with_expiration
1219            .iter()
1220            .filter(|(slot, _cmd)| **slot < global_slot_since_genesis)
1221            .flat_map(|(_slot, cmd)| cmd.iter().cloned())
1222            .collect()
1223    }
1224
    /// Returns the commands considered expired at `global_slot_since_genesis`.
    /// Currently just forwards to `expired_by_global_slot`.
    fn expired(&self, global_slot_since_genesis: Slot) -> Vec<ValidCommandWithHash> {
        self.expired_by_global_slot(global_slot_since_genesis)
    }
1228
1229    fn remove_expired(
1230        &mut self,
1231        global_slot_since_genesis: Slot,
1232    ) -> Result<Vec<ValidCommandWithHash>, CommandError> {
1233        let mut dropped = Vec::with_capacity(128);
1234        for cmd in self.expired(global_slot_since_genesis) {
1235            if self.member(&cmd) {
1236                let removed = self.remove_with_dependents_exn(&cmd)?;
1237                dropped.extend(removed);
1238            }
1239        }
1240        Ok(dropped)
1241    }
1242
1243    fn remove_lowest_fee(&mut self) -> Result<VecDeque<ValidCommandWithHash>, CommandError> {
1244        let Some(set) = self.min_fee().and_then(|fee| self.all_by_fee.get(&fee)) else {
1245            return Ok(VecDeque::new());
1246        };
1247
1248        // TODO: Should `self.all_by_fee` be a `BTreeSet` instead ?
1249        #[allow(clippy::mutable_key_type)]
1250        let bset: BTreeSet<_> = set.iter().collect();
1251        // TODO: Not sure if OCaml compare the same way than we do
1252        let min = bset.first().map(|min| (*min).clone()).unwrap();
1253
1254        self.remove_with_dependents_exn(&min)
1255    }
1256
1257    /// Drop commands from the end of the queue until the total currency consumed is
1258    /// <= the current balance.
1259    fn drop_until_sufficient_balance(
1260        mut queue: VecDeque<ValidCommandWithHash>,
1261        mut currency_reserved: Amount,
1262        current_balance: Amount,
1263    ) -> Result<
1264        (
1265            VecDeque<ValidCommandWithHash>,
1266            Amount,
1267            VecDeque<ValidCommandWithHash>,
1268        ),
1269        CommandError,
1270    > {
1271        let mut dropped_so_far = VecDeque::with_capacity(queue.len());
1272
1273        while currency_reserved > current_balance {
1274            let last = queue.pop_back().unwrap();
1275            let consumed = currency_consumed(&last.data.forget_check())?;
1276            dropped_so_far.push_back(last);
1277            currency_reserved = currency_reserved.checked_sub(&consumed).unwrap();
1278        }
1279
1280        Ok((queue, currency_reserved, dropped_so_far))
1281    }
1282
    /// Re-checks the queued commands of the selected senders against their
    /// current account state and drops the ones that are no longer valid.
    ///
    /// `kind` selects which senders are revalidated (all, or a given subset);
    /// `get_account` supplies the current account for a sender.
    ///
    /// Returns every command that was dropped.
    ///
    /// # Errors
    /// Returns `CommandError::Custom` when `get_account` yields `None` for a
    /// sender, and propagates errors from the removal helpers.
    ///
    /// # Panics
    /// Panics if a sender's queue is empty or if currency subtraction underflows.
    fn revalidate<F>(
        &mut self,
        global_slot_since_genesis: Slot,
        kind: RevalidateKind,
        get_account: F,
    ) -> Result<Vec<ValidCommandWithHash>, CommandError>
    where
        F: Fn(&AccountId) -> Option<Account>,
    {
        let requires_revalidation = |account_id: &AccountId| match kind {
            RevalidateKind::EntirePool => true,
            RevalidateKind::Subset(set) => set.contains(account_id),
        };

        let mut dropped = Vec::new();

        // Iterate over a clone so `self` can be mutated inside the loop.
        for (sender, (mut queue, mut currency_reserved)) in self.all_by_sender.clone() {
            if !requires_revalidation(&sender) {
                continue;
            }
            let account: Account = get_account(&sender)
                .ok_or(CommandError::Custom(Cow::Borrowed("Account not find")))?;
            let current_balance = account
                .liquid_balance_at_slot(global_slot_since_genesis)
                .to_amount();
            let first_cmd = queue.front().cloned().unwrap();
            let first_nonce = first_cmd.data.forget_check().applicable_at_nonce();

            // Missing permissions, or an account nonce below the queue head:
            // drop the whole queue (head plus dependents).
            if !(account.has_permission_to_send() && account.has_permission_to_increment_nonce())
                || account.nonce < first_nonce
            {
                let this_dropped = self.remove_with_dependents_exn(&first_cmd)?;
                dropped.extend(this_dropped);
            } else {
                // current_nonce >= first_nonce
                let first_applicable_nonce_index = queue.iter().position(|cmd| {
                    let nonce = cmd.data.forget_check().applicable_at_nonce();
                    nonce == account.nonce
                });

                // Commands before the one applicable at the account nonce are
                // dropped; if none matches, nothing is retained.
                let retained_for_nonce = match first_applicable_nonce_index {
                    Some(index) => queue.split_off(index),
                    None => Default::default(),
                };
                let dropped_for_nonce = queue;

                // Release the currency reserved by the nonce-dropped prefix.
                for cmd in &dropped_for_nonce {
                    currency_reserved = currency_reserved
                        .checked_sub(&currency_consumed(&cmd.data.forget_check())?)
                        .unwrap();
                }

                // Then drop from the back until the rest fits in the balance.
                let (keep_queue, currency_reserved, dropped_for_balance) =
                    Self::drop_until_sufficient_balance(
                        retained_for_nonce,
                        currency_reserved,
                        current_balance,
                    )?;

                let keeping_prefix = dropped_for_nonce.is_empty();
                let keeping_suffix = dropped_for_balance.is_empty();
                let to_drop: Vec<_> = dropped_for_nonce
                    .into_iter()
                    .chain(dropped_for_balance)
                    .collect();

                // Bring `all_by_sender` and `applicable_by_fee` in line with
                // what is kept; the arm order matters (guards are checked top-down).
                match keep_queue.front().cloned() {
                    _ if keeping_prefix && keeping_suffix => {
                        // Nothing dropped, nothing needs to be updated
                    }
                    None => {
                        // We drop the entire queue, first element needs to be removed from
                        // applicable_by_fee
                        self.remove_applicable_exn(&first_cmd);
                        self.all_by_sender.remove(&sender);
                    }
                    Some(_) if keeping_prefix => {
                        // We drop only transactions from the end of queue, keeping
                        // the head untouched, no need to update applicable_by_fee
                        self.all_by_sender
                            .insert(sender, (keep_queue, currency_reserved));
                    }
                    Some(first_kept) => {
                        // We need to replace old queue head with the new queue head
                        // in applicable_by_fee
                        let first_kept_unchecked = first_kept.data.forget_check();
                        self.all_by_sender
                            .insert(sender, (keep_queue, currency_reserved));
                        self.remove_applicable_exn(&first_cmd);
                        Self::map_set_insert(
                            &mut self.applicable_by_fee,
                            first_kept_unchecked.fee_per_wu(),
                            first_kept,
                        );
                    }
                }
                // Finally remove the dropped commands from the remaining indexes.
                self.update_remove_all_by_fee_and_hash_and_expiration(to_drop.clone());
                dropped.extend(to_drop);
            }
        }

        Ok(dropped)
    }
1386
1387    // TODO(adonagy): clones too expensive? Optimize
1388    /// Same as `transactions`, but does not modify the mempool
1389    fn list_includable_transactions(&self, limit: usize) -> Vec<ValidCommandWithHash> {
1390        let mut txns = Vec::with_capacity(self.applicable_by_fee.len());
1391
1392        // get a copy of the maps as we are just listing the transactions
1393        let mut applicable_by_fee = self.applicable_by_fee.clone();
1394        let mut all_by_sender = self.all_by_sender.clone();
1395
1396        while !applicable_by_fee.is_empty() && txns.len() < limit {
1397            let (fee, mut set) = applicable_by_fee
1398                .iter()
1399                .max_by_key(|(rate, _)| *rate)
1400                .map(|(rate, set)| (rate.clone(), set.clone()))
1401                .unwrap();
1402
1403            // TODO: Check if OCaml compare using `hash` (order)
1404            let txn = set.iter().min_by_key(|b| &b.hash).cloned().unwrap();
1405
1406            {
1407                set.remove(&txn);
1408                if set.is_empty() {
1409                    applicable_by_fee.remove(&fee);
1410                } else {
1411                    applicable_by_fee.insert(fee, set);
1412                }
1413            }
1414
1415            let sender = txn.data.forget_check().fee_payer();
1416
1417            let (sender_queue, _amount) = all_by_sender.get_mut(&sender).unwrap();
1418            let head_txn = sender_queue.pop_front().unwrap();
1419
1420            if txn.hash == head_txn.hash {
1421                match sender_queue.front().cloned() {
1422                    None => {
1423                        all_by_sender.remove(&sender);
1424                    }
1425                    Some(next_txn) => {
1426                        let fee = next_txn.data.forget_check().fee_per_wu();
1427                        applicable_by_fee.entry(fee).or_default().insert(next_txn);
1428                    }
1429                }
1430            } else {
1431                mina_core::warn!(
1432                    mina_core::log::system_time();
1433                    kind = "transaction pool", message = "Sender queue is malformed");
1434                all_by_sender.remove(&sender);
1435            }
1436
1437            txns.push(txn);
1438        }
1439        txns
1440    }
1441
1442    // TODO(adonagy): Is it neede to remove txs from the pool directly here? If the produced block is injected
1443    // a BestTip update action will be dispatched and the pool can reorganize there
1444    /// Returns a sequence of commands in the pool in descending fee order
1445    fn transactions(&mut self, limit: usize) -> Vec<ValidCommandWithHash> {
1446        let mut txns = Vec::with_capacity(self.applicable_by_fee.len());
1447        loop {
1448            if self.applicable_by_fee.is_empty() {
1449                assert!(self.all_by_sender.is_empty());
1450                return txns;
1451            }
1452
1453            if txns.len() >= limit {
1454                return txns;
1455            }
1456
1457            let (fee, mut set) = self
1458                .applicable_by_fee
1459                .iter()
1460                .max_by_key(|(rate, _)| *rate)
1461                .map(|(rate, set)| (rate.clone(), set.clone()))
1462                .unwrap();
1463
1464            // TODO: Check if OCaml compare using `hash` (order)
1465            let txn = set.iter().min_by_key(|b| &b.hash).cloned().unwrap();
1466
1467            {
1468                set.remove(&txn);
1469                if set.is_empty() {
1470                    self.applicable_by_fee.remove(&fee);
1471                } else {
1472                    self.applicable_by_fee.insert(fee, set);
1473                }
1474            }
1475
1476            let sender = txn.data.forget_check().fee_payer();
1477
1478            let (sender_queue, _amount) = self.all_by_sender.get_mut(&sender).unwrap();
1479            let head_txn = sender_queue.pop_front().unwrap();
1480
1481            if txn.hash == head_txn.hash {
1482                match sender_queue.front().cloned() {
1483                    None => {
1484                        self.all_by_sender.remove(&sender);
1485                    }
1486                    Some(next_txn) => {
1487                        let fee = next_txn.data.forget_check().fee_per_wu();
1488                        self.applicable_by_fee
1489                            .entry(fee)
1490                            .or_default()
1491                            .insert(next_txn);
1492                    }
1493                }
1494            } else {
1495                mina_core::warn!(
1496                    mina_core::log::system_time();
1497                    kind = "transaction pool", message = "Sender queue is malformed");
1498                self.all_by_sender.remove(&sender);
1499            }
1500
1501            txns.push(txn);
1502        }
1503    }
1504
1505    /// Returns all the transactions in the pool
1506    fn get_all_transactions(&self) -> Vec<ValidCommandWithHash> {
1507        self.all_by_sender
1508            .values()
1509            .cloned()
1510            .flat_map(|(cmds, _)| cmds.into_iter())
1511            .collect()
1512    }
1513
1514    fn get_pending_amount_and_nonce(&self) -> HashMap<AccountId, (Option<Nonce>, Amount)> {
1515        // TODO(adonagy): clone too expensive here?
1516        self.all_by_sender
1517            .clone()
1518            .into_iter()
1519            .map(|(acc_id, (cmds, amount))| (acc_id, (cmds.back().unwrap().data.nonce(), amount)))
1520            .collect()
1521    }
1522}
1523
1524fn currency_consumed(cmd: &UserCommand) -> Result<Amount, CommandError> {
1525    use crate::scan_state::transaction_logic::signed_command::{Body::*, PaymentPayload};
1526
1527    let fee_amount = Amount::of_fee(&cmd.fee());
1528    let amount = match cmd {
1529        UserCommand::SignedCommand(c) => {
1530            match &c.payload.body {
1531                Payment(PaymentPayload { amount, .. }) => {
1532                    // The fee-payer is also the sender account, include the amount.
1533                    *amount
1534                }
1535                StakeDelegation(_) => Amount::zero(),
1536            }
1537        }
1538        UserCommand::ZkAppCommand(_) => Amount::zero(),
1539    };
1540
1541    fee_amount
1542        .checked_add(&amount)
1543        .ok_or(CommandError::InvalidCurrencyConsumed)
1544}
1545
1546pub mod transaction_hash {
1547    use super::*;
1548
1549    pub fn hash_command(cmd: valid::UserCommand) -> ValidCommandWithHash {
1550        let hash = match &cmd {
1551            valid::UserCommand::SignedCommand(cmd) => {
1552                v2::MinaBaseSignedCommandStableV2::from(&**cmd)
1553                    .hash()
1554                    .unwrap()
1555            }
1556            valid::UserCommand::ZkAppCommand(cmd) => {
1557                let cmd = cmd.clone().forget();
1558                v2::MinaBaseZkappCommandTStableV1WireStableV1::from(&cmd)
1559                    .hash()
1560                    .unwrap()
1561            }
1562        };
1563
1564        WithHash { data: cmd, hash }
1565    }
1566}
1567
/// A two-way accept/reject decision.
// NOTE(review): the code producing and consuming this enum is outside this
// section; confirm what exactly is being accepted or rejected.
#[derive(Debug)]
pub enum ApplyDecision {
    /// The item is accepted.
    Accept,
    /// The item is rejected.
    Reject,
}
1573
// NOTE(review): presumably the maximum number of items allowed per 15-second
// window (see the batch fields of `TransactionPool`) — confirm against the
// code that reads this constant.
const MAX_PER_15_SECONDS: usize = 10;
1575
/// The transaction pool: an `IndexedPool` of commands plus bookkeeping for
/// locally generated commands and verification keys.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TransactionPool {
    /// The indexed collection of pending commands.
    pub pool: IndexedPool,
    /// Locally generated commands, each with a timestamp and batch.
    /// A command is expected to be in at most one of the two
    /// `locally_generated_*` tables (asserted in `on_new_best_tip`).
    locally_generated_uncommitted: HashMap<ValidCommandWithHash, (redux::Timestamp, Batch)>,
    /// Locally generated commands, each with a timestamp and batch; the
    /// "committed" counterpart of `locally_generated_uncommitted`.
    locally_generated_committed: HashMap<ValidCommandWithHash, (redux::Timestamp, Batch)>,
    // NOTE(review): batch counters, presumably tied to `MAX_PER_15_SECONDS` — confirm.
    current_batch: usize,
    remaining_in_batch: usize,
    /// Pool configuration supplied at construction.
    pub config: Config,
    // Unit/empty placeholders; nothing in this section reads them.
    batcher: (),
    best_tip_diff_relay: Option<()>,
    /// Reference-counted table of verification keys.
    verification_key_table: VkRefcountTable,
}
1588
1589impl TransactionPool {
1590    pub fn new(config: Config, consensus_constants: &ConsensusConstants) -> Self {
1591        Self {
1592            pool: IndexedPool::new(consensus_constants),
1593            locally_generated_uncommitted: Default::default(),
1594            locally_generated_committed: Default::default(),
1595            current_batch: 0,
1596            remaining_in_batch: 0,
1597            config,
1598            batcher: (),
1599            best_tip_diff_relay: None,
1600            verification_key_table: Default::default(),
1601        }
1602    }
1603
    /// Returns the size reported by the inner indexed pool.
    pub fn size(&self) -> usize {
        self.pool.size()
    }
1607
    /// Returns all the transactions in the pool (delegates to the inner
    /// indexed pool).
    pub fn get_all_transactions(&self) -> Vec<ValidCommandWithHash> {
        self.pool.get_all_transactions()
    }
1611
    /// Returns, per sender with queued commands, the nonce of the last queued
    /// command and the currency reserved for the queue (delegates to the inner
    /// indexed pool).
    pub fn get_pending_amount_and_nonce(&self) -> HashMap<AccountId, (Option<Nonce>, Amount)> {
        self.pool.get_pending_amount_and_nonce()
    }
1615
    /// Returns up to `limit` commands in descending fee order. This takes
    /// `&mut self`: the inner pool's `transactions` modifies its indexes.
    pub fn transactions(&mut self, limit: usize) -> Vec<ValidCommandWithHash> {
        self.pool.transactions(limit)
    }
1619
    /// Same as `transactions`, but does not modify the pool.
    pub fn list_includable_transactions(&self, limit: usize) -> Vec<ValidCommandWithHash> {
        self.pool.list_includable_transactions(limit)
    }
1623
    /// Returns the ids of all senders that currently have commands queued in
    /// the pool.
    pub fn get_accounts_to_revalidate_on_new_best_tip(&self) -> BTreeSet<AccountId> {
        self.pool.all_by_sender.keys().cloned().collect()
    }
1627
1628    pub fn on_new_best_tip(
1629        &mut self,
1630        global_slot_since_genesis: Slot,
1631        accounts: &BTreeMap<AccountId, Account>,
1632    ) -> Result<Vec<ValidCommandWithHash>, CommandError> {
1633        let dropped = self.pool.revalidate(
1634            global_slot_since_genesis,
1635            RevalidateKind::EntirePool,
1636            |sender_id| {
1637                Some(
1638                    accounts
1639                        .get(sender_id)
1640                        .cloned()
1641                        .unwrap_or_else(Account::empty),
1642                )
1643            },
1644        )?;
1645
1646        let dropped_locally_generated = dropped
1647            .iter()
1648            .filter(|cmd| {
1649                let dropped_commited = self.locally_generated_committed.remove(cmd).is_some();
1650                let dropped_uncommited = self.locally_generated_uncommitted.remove(cmd).is_some();
1651                // Nothing should be in both tables.
1652                assert!(!(dropped_commited && dropped_uncommited));
1653                dropped_commited || dropped_uncommited
1654            })
1655            .collect::<Vec<_>>();
1656
1657        if !dropped_locally_generated.is_empty() {
1658            mina_core::info!(
1659                mina_core::log::system_time();
1660                kind = "transaction pool",
1661                message = "Dropped locally generated commands $cmds from pool when transition frontier was recreated.",
1662                dropped = format!("{dropped_locally_generated:?}")
1663            );
1664        }
1665
1666        Ok(dropped)
1667    }
1668
1669    fn has_sufficient_fee(&self, pool_max_size: usize, cmd: &valid::UserCommand) -> bool {
1670        match self.pool.min_fee() {
1671            None => true,
1672            Some(min_fee) => {
1673                if self.pool.size() >= pool_max_size {
1674                    cmd.forget_check().fee_per_wu() > min_fee
1675                } else {
1676                    true
1677                }
1678            }
1679        }
1680    }
1681
1682    fn drop_until_below_max_size(
1683        &mut self,
1684        pool_max_size: usize,
1685    ) -> Result<Vec<ValidCommandWithHash>, CommandError> {
1686        let mut list = Vec::new();
1687
1688        while self.pool.size() > pool_max_size {
1689            let dropped = self.pool.remove_lowest_fee()?;
1690            my_assert(!dropped.is_empty())?;
1691            list.extend(dropped)
1692        }
1693
1694        Ok(list)
1695    }
1696
1697    pub fn get_accounts_to_handle_transition_diff(
1698        &self,
1699        diff: &diff::BestTipDiff,
1700    ) -> (BTreeSet<AccountId>, BTreeSet<AccountId>) {
1701        let diff::BestTipDiff {
1702            new_commands,
1703            removed_commands,
1704            reorg_best_tip: _,
1705        } = diff;
1706
1707        let in_cmds = new_commands
1708            .iter()
1709            .chain(removed_commands)
1710            .flat_map(|cmd| cmd.data.forget_check().accounts_referenced())
1711            .collect::<BTreeSet<_>>();
1712
1713        let uncommitted = self
1714            .locally_generated_uncommitted
1715            .keys()
1716            .map(|cmd| cmd.data.fee_payer())
1717            .collect::<BTreeSet<_>>();
1718
1719        (in_cmds, uncommitted)
1720    }
1721
1722    pub fn handle_transition_frontier_diff(
1723        &mut self,
1724        global_slot_since_genesis: Slot,
1725        current_global_slot: Slot,
1726        diff: &diff::BestTipDiff,
1727        account_ids: &BTreeSet<AccountId>,
1728        accounts: &BTreeMap<AccountId, Account>,
1729        uncommited: &BTreeMap<AccountId, Account>,
1730    ) -> Result<(), String> {
1731        let diff::BestTipDiff {
1732            new_commands,
1733            removed_commands,
1734            reorg_best_tip: _,
1735        } = diff;
1736
1737        // Remove duplicates
1738        let (new_commands, removed_commands) = {
1739            let collect_hashed = |cmds: &[WithStatus<valid::UserCommand>]| {
1740                cmds.iter()
1741                    .map(|cmd| transaction_hash::hash_command(cmd.data.clone()))
1742                    .collect::<Vec<_>>()
1743            };
1744
1745            let mut new_commands = collect_hashed(new_commands);
1746            let mut removed_commands = collect_hashed(removed_commands);
1747
1748            #[allow(clippy::mutable_key_type)]
1749            let new_commands_set = new_commands.iter().collect::<HashSet<_>>();
1750            #[allow(clippy::mutable_key_type)]
1751            let removed_commands_set = removed_commands.iter().collect::<HashSet<_>>();
1752
1753            #[allow(clippy::mutable_key_type)]
1754            let duplicates = new_commands_set
1755                .intersection(&removed_commands_set)
1756                .map(|cmd| (*cmd).clone())
1757                .collect::<HashSet<_>>();
1758
1759            new_commands.retain(|cmd| !duplicates.contains(cmd));
1760            removed_commands.retain(|cmd| !duplicates.contains(cmd));
1761            (new_commands, removed_commands)
1762        };
1763
1764        let pool_max_size = self.config.pool_max_size;
1765
1766        self.verification_key_table.increment_list(&new_commands);
1767        self.verification_key_table
1768            .decrement_list(&removed_commands);
1769
1770        let mut dropped_backtrack = Vec::with_capacity(256);
1771        for cmd in removed_commands {
1772            if let Some(time_added) = self.locally_generated_committed.remove(&cmd) {
1773                self.locally_generated_uncommitted
1774                    .insert(cmd.clone(), time_added);
1775            }
1776
1777            let dropped_seq = match self.pool.add_from_backtrack(
1778                global_slot_since_genesis,
1779                current_global_slot,
1780                cmd,
1781            ) {
1782                Ok(_) => self.drop_until_below_max_size(pool_max_size)?,
1783                Err(e) => return Err(format!("{:?}", e)),
1784            };
1785            dropped_backtrack.extend(dropped_seq);
1786        }
1787
1788        self.verification_key_table
1789            .decrement_hashed(&dropped_backtrack);
1790
1791        let locally_generated_dropped = dropped_backtrack
1792            .iter()
1793            .filter(|t| self.locally_generated_uncommitted.contains_key(t))
1794            .collect::<Vec<_>>();
1795
1796        let dropped_commands = {
1797            let accounts_to_check = account_ids;
1798            let existing_account_states_by_id = accounts;
1799
1800            let get_account = |id: &AccountId| {
1801                match existing_account_states_by_id.get(id) {
1802                    Some(account) => Some(account.clone()),
1803                    None => {
1804                        if accounts_to_check.contains(id) {
1805                            Some(Account::empty())
1806                        } else {
1807                            None
1808                            // OCaml panic too, with same message
1809                            // panic!(
1810                            //     "did not expect Indexed_pool.revalidate to call \
1811                            //         get_account on account not in accounts_to_check"
1812                            // )
1813                        }
1814                    }
1815                }
1816            };
1817
1818            self.pool.revalidate(
1819                global_slot_since_genesis,
1820                RevalidateKind::Subset(accounts_to_check),
1821                get_account,
1822            )?
1823        };
1824
1825        let (committed_commands, dropped_commit_conflicts): (Vec<_>, Vec<_>) = {
1826            let command_hashes: HashSet<v2::TransactionHash> =
1827                new_commands.iter().map(|cmd| cmd.hash.clone()).collect();
1828
1829            dropped_commands
1830                .iter()
1831                .partition(|cmd| command_hashes.contains(&cmd.hash))
1832        };
1833
1834        for cmd in &committed_commands {
1835            self.verification_key_table.decrement_hashed([&**cmd]);
1836            if let Some(data) = self.locally_generated_uncommitted.remove(cmd) {
1837                let old = self
1838                    .locally_generated_committed
1839                    .insert((*cmd).clone(), data);
1840                my_assert(old.is_none())?;
1841            };
1842        }
1843
1844        let _commit_conflicts_locally_generated = dropped_commit_conflicts
1845            .iter()
1846            .filter(|cmd| self.locally_generated_uncommitted.remove(cmd).is_some());
1847
1848        for cmd in locally_generated_dropped {
1849            // If the dropped transaction was included in the winning chain, it'll
1850            // be in locally_generated_committed. If it wasn't, try re-adding to
1851            // the pool.
1852            let remove_cmd = |this: &mut Self| {
1853                this.verification_key_table.decrement_hashed([cmd]);
1854                assert!(this.locally_generated_uncommitted.remove(cmd).is_some());
1855            };
1856
1857            if !self.locally_generated_committed.contains_key(cmd) {
1858                if !self.has_sufficient_fee(pool_max_size, &cmd.data) {
1859                    remove_cmd(self)
1860                } else {
1861                    let unchecked = &cmd.data;
1862                    match uncommited.get(&unchecked.fee_payer()) {
1863                        Some(account) => {
1864                            match self.pool.add_from_gossip_exn(
1865                                global_slot_since_genesis,
1866                                current_global_slot,
1867                                cmd,
1868                                account.nonce,
1869                                account.liquid_balance_at_slot(global_slot_since_genesis),
1870                            ) {
1871                                Err(_) => {
1872                                    remove_cmd(self);
1873                                }
1874                                Ok(_) => {
1875                                    self.verification_key_table.increment_hashed([cmd]);
1876                                }
1877                            }
1878                        }
1879                        None => {
1880                            remove_cmd(self);
1881                        }
1882                    }
1883                }
1884            }
1885        }
1886
1887        let expired_commands = self.pool.remove_expired(global_slot_since_genesis)?;
1888        for cmd in &expired_commands {
1889            self.verification_key_table.decrement_hashed([cmd]);
1890            self.locally_generated_uncommitted.remove(cmd);
1891        }
1892
1893        Ok(())
1894    }
1895
1896    pub fn get_accounts_to_apply_diff(&self, diff: &diff::DiffVerified) -> BTreeSet<AccountId> {
1897        let fee_payer = |cmd: &ValidCommandWithHash| cmd.data.fee_payer();
1898        diff.list.iter().map(fee_payer).collect()
1899    }
1900
1901    fn apply(
1902        &mut self,
1903        time: redux::Timestamp,
1904        global_slot_since_genesis: Slot,
1905        current_global_slot: Slot,
1906        diff: &diff::DiffVerified,
1907        accounts: &BTreeMap<AccountId, Account>,
1908        is_sender_local: bool,
1909    ) -> Result<
1910        (
1911            ApplyDecision,
1912            Vec<ValidCommandWithHash>,
1913            Vec<(ValidCommandWithHash, diff::Error)>,
1914            HashSet<v2::TransactionHash>,
1915        ),
1916        String,
1917    > {
1918        let fee_payer = |cmd: &ValidCommandWithHash| cmd.data.fee_payer();
1919        let fee_payer_accounts = accounts;
1920
1921        let check_command = |pool: &IndexedPool, cmd: &ValidCommandWithHash| {
1922            if pool.member(cmd) {
1923                Err(diff::Error::Duplicate)
1924            } else {
1925                match fee_payer_accounts.get(&fee_payer(cmd)) {
1926                    None => Err(diff::Error::FeePayerAccountNotFound),
1927                    Some(account) => {
1928                        if account.has_permission_to_send()
1929                            && account.has_permission_to_increment_nonce()
1930                        {
1931                            Ok(())
1932                        } else {
1933                            Err(diff::Error::FeePayerNotPermittedToSend)
1934                        }
1935                    }
1936                }
1937            }
1938        };
1939
1940        let add_results = diff
1941            .list
1942            .iter()
1943            .map(|cmd| {
1944                let account = fee_payer_accounts
1945                    .get(&fee_payer(cmd))
1946                    .ok_or_else(|| "Fee payer not found".to_string())?;
1947
1948                let result: Result<_, diff::Error> = (|| {
1949                    check_command(&self.pool, cmd)?;
1950
1951                    match self.pool.add_from_gossip_exn(
1952                        global_slot_since_genesis,
1953                        current_global_slot,
1954                        cmd,
1955                        account.nonce,
1956                        account.liquid_balance_at_slot(global_slot_since_genesis),
1957                    ) {
1958                        Ok(x) => Ok(x),
1959                        Err(e) => Err(e.into()),
1960                    }
1961                })();
1962
1963                match result {
1964                    Ok((cmd, dropped)) => Ok(Ok((cmd, dropped))),
1965                    Err(err) => Ok(Err((cmd, err))),
1966                }
1967            })
1968            .collect::<Result<Vec<Result<_, _>>, String>>()?;
1969
1970        let added_cmds = add_results
1971            .iter()
1972            .filter_map(|cmd| match cmd {
1973                Ok((cmd, _)) => Some(cmd),
1974                Err(_) => None,
1975            })
1976            .collect::<Vec<_>>();
1977
1978        let dropped_for_add = add_results
1979            .iter()
1980            .filter_map(|cmd| match cmd {
1981                Ok((_, dropped)) => Some(dropped),
1982                Err(_) => None,
1983            })
1984            .flatten()
1985            .collect::<Vec<_>>();
1986
1987        let dropped_for_size = self.drop_until_below_max_size(self.config.pool_max_size)?;
1988
1989        let all_dropped_cmds = dropped_for_add
1990            .iter()
1991            .copied()
1992            .chain(dropped_for_size.iter())
1993            .collect::<Vec<_>>();
1994
1995        {
1996            self.verification_key_table.increment_hashed(added_cmds);
1997            self.verification_key_table
1998                .decrement_hashed(all_dropped_cmds.iter().copied());
1999        };
2000
2001        let dropped_for_add_hashes: HashSet<&v2::TransactionHash> =
2002            dropped_for_add.iter().map(|cmd| &cmd.hash).collect();
2003        let dropped_for_size_hashes: HashSet<&v2::TransactionHash> =
2004            dropped_for_size.iter().map(|cmd| &cmd.hash).collect();
2005        let all_dropped_cmd_hashes: HashSet<v2::TransactionHash> = dropped_for_add_hashes
2006            .union(&dropped_for_size_hashes)
2007            .map(|hash| (*hash).clone())
2008            .collect();
2009
2010        // let locally_generated_dropped = all_dropped_cmds
2011        //     .iter()
2012        //     .filter(|cmd| self.locally_generated_uncommitted.remove(cmd).is_some());
2013
2014        if is_sender_local {
2015            for result in add_results.iter() {
2016                let Ok((cmd, _dropped)) = result else {
2017                    continue;
2018                };
2019                if !all_dropped_cmd_hashes.contains(&cmd.hash) {
2020                    self.register_locally_generated(time, cmd);
2021                }
2022            }
2023        }
2024
2025        let mut accepted = Vec::with_capacity(128);
2026        let mut rejected = Vec::with_capacity(128);
2027
2028        // TODO: Re-work this to avoid cloning ?
2029        for result in &add_results {
2030            match result {
2031                Ok((cmd, _dropped)) => {
2032                    if all_dropped_cmd_hashes.contains(&cmd.hash) {
2033                        // ignored (dropped)
2034                    } else {
2035                        accepted.push(cmd.clone());
2036                    }
2037                }
2038                Err((cmd, error)) => {
2039                    rejected.push(((*cmd).clone(), error.clone()));
2040                }
2041            }
2042        }
2043
2044        let decision = if rejected
2045            .iter()
2046            .any(|(_, error)| error.grounds_for_diff_rejection())
2047        {
2048            ApplyDecision::Reject
2049        } else {
2050            ApplyDecision::Accept
2051        };
2052
2053        Ok((decision, accepted, rejected, all_dropped_cmd_hashes))
2054    }
2055
2056    pub fn unsafe_apply(
2057        &mut self,
2058        time: redux::Timestamp,
2059        global_slot_since_genesis: Slot,
2060        current_global_slot: Slot,
2061        diff: &diff::DiffVerified,
2062        accounts: &BTreeMap<AccountId, Account>,
2063        is_sender_local: bool,
2064    ) -> Result<
2065        (
2066            ApplyDecision,
2067            Vec<ValidCommandWithHash>,
2068            Vec<(ValidCommandWithHash, diff::Error)>,
2069            HashSet<v2::TransactionHash>,
2070        ),
2071        String,
2072    > {
2073        let (decision, accepted, rejected, dropped) = self.apply(
2074            time,
2075            global_slot_since_genesis,
2076            current_global_slot,
2077            diff,
2078            accounts,
2079            is_sender_local,
2080        )?;
2081        Ok((decision, accepted, rejected, dropped))
2082    }
2083
2084    fn register_locally_generated(&mut self, time: redux::Timestamp, cmd: &ValidCommandWithHash) {
2085        match self.locally_generated_uncommitted.entry(cmd.clone()) {
2086            Entry::Occupied(mut entry) => {
2087                let (entry_time, _batch_num) = entry.get_mut();
2088                *entry_time = redux::Timestamp::global_now();
2089            }
2090            Entry::Vacant(entry) => {
2091                let batch_num = if self.remaining_in_batch > 0 {
2092                    self.remaining_in_batch -= 1;
2093                    self.current_batch
2094                } else {
2095                    self.remaining_in_batch = MAX_PER_15_SECONDS - 1;
2096                    self.current_batch += 1;
2097                    self.current_batch
2098                };
2099                entry.insert((time, Batch::Of(batch_num)));
2100            }
2101        }
2102    }
2103
2104    pub fn prevalidate(&self, diff: diff::Diff) -> Result<diff::Diff, TransactionPoolErrors> {
2105        let well_formedness_errors: HashSet<_> = diff
2106            .list
2107            .iter()
2108            .flat_map(|cmd| match cmd.check_well_formedness() {
2109                Ok(()) => Vec::new(),
2110                Err(errors) => errors,
2111            })
2112            .collect();
2113
2114        if !well_formedness_errors.is_empty() {
2115            return Err(TransactionPoolErrors::BatchedErrors(
2116                well_formedness_errors
2117                    .into_iter()
2118                    .map(TransactionError::WellFormedness)
2119                    .collect_vec(),
2120            ));
2121        }
2122
2123        Ok(diff)
2124    }
2125
2126    pub fn convert_diff_to_verifiable(
2127        &self,
2128        diff: diff::Diff,
2129        accounts: &BTreeMap<AccountId, Account>,
2130    ) -> Result<Vec<WithStatus<verifiable::UserCommand>>, TransactionPoolErrors> {
2131        let cs = diff
2132            .list
2133            .iter()
2134            .cloned()
2135            .map(|cmd| MaybeWithStatus { cmd, status: None })
2136            .collect::<Vec<_>>();
2137
2138        let diff = UserCommand::to_all_verifiable::<FromUnappliedSequence, _>(cs, |account_ids| {
2139            let mempool_vks: HashMap<_, _> = account_ids
2140                .iter()
2141                .map(|id| {
2142                    let vks = self.verification_key_table.find_vks_by_account_id(id);
2143                    let vks: HashMap<_, _> =
2144                        vks.iter().map(|vk| (vk.hash(), (*vk).clone())).collect();
2145                    (id.clone(), vks)
2146                })
2147                .collect();
2148
2149            let ledger_vks = UserCommand::load_vks_from_ledger_accounts(accounts);
2150            let ledger_vks: HashMap<_, _> = ledger_vks
2151                .into_iter()
2152                .map(|(id, vk)| {
2153                    let mut map = HashMap::new();
2154                    map.insert(vk.hash(), vk);
2155                    (id, map)
2156                })
2157                .collect();
2158
2159            let new_map: HashMap<AccountId, HashMap<Fp, VerificationKeyWire>> = HashMap::new();
2160            let merged =
2161                mempool_vks
2162                    .into_iter()
2163                    .chain(ledger_vks)
2164                    .fold(new_map, |mut accum, (id, map)| {
2165                        let entry = accum.entry(id).or_default();
2166                        for (hash, vk) in map {
2167                            entry.insert(hash, vk);
2168                        }
2169                        accum
2170                    });
2171
2172            from_unapplied_sequence::Cache::new(merged)
2173        })
2174        .map_err(TransactionPoolErrors::LoadingVK)?;
2175
2176        let diff = diff
2177            .into_iter()
2178            .map(|MaybeWithStatus { cmd, status: _ }| WithStatus {
2179                data: cmd,
2180                // TODO: is this correct?
2181                status: Applied,
2182            })
2183            .collect::<Vec<_>>();
2184
2185        Ok(diff)
2186    }
2187
2188    pub fn verify_proofs(
2189        &self,
2190        diff: Vec<WithStatus<verifiable::UserCommand>>,
2191    ) -> Result<Vec<valid::UserCommand>, TransactionPoolErrors> {
2192        let (verified, invalid): (Vec<_>, Vec<_>) = Verifier
2193            .verify_commands(diff, None)
2194            .into_iter()
2195            .partition(Result::is_ok);
2196
2197        let verified: Vec<_> = verified.into_iter().map(Result::unwrap).collect();
2198        let invalid: Vec<_> = invalid.into_iter().map(Result::unwrap_err).collect();
2199
2200        if !invalid.is_empty() {
2201            let transaction_pool_errors = invalid
2202                .into_iter()
2203                .map(TransactionError::Verifier)
2204                .collect();
2205            Err(TransactionPoolErrors::BatchedErrors(
2206                transaction_pool_errors,
2207            ))
2208        } else {
2209            Ok(verified)
2210        }
2211    }
2212
2213    fn get_rebroadcastable<F>(&mut self, has_timed_out: F) -> Vec<Vec<UserCommand>>
2214    where
2215        F: Fn(&redux::Timestamp) -> bool,
2216    {
2217        let log = |has_timed_out: bool, s: &str, cmd: &ValidCommandWithHash| -> bool {
2218            if has_timed_out {
2219                eprintln!("{}: {:?}", s, cmd);
2220                false
2221            } else {
2222                true
2223            }
2224        };
2225
2226        self.locally_generated_uncommitted
2227            .retain(|key, (time, _batch)| {
2228                log(
2229                    has_timed_out(time),
2230                    "No longer rebroadcasting uncommitted expired command",
2231                    key,
2232                )
2233            });
2234        self.locally_generated_committed
2235            .retain(|key, (time, _batch)| {
2236                log(
2237                    has_timed_out(time),
2238                    "Removing committed locally generated expired command",
2239                    key,
2240                )
2241            });
2242
2243        let mut rebroadcastable_txs = self
2244            .locally_generated_uncommitted
2245            .iter()
2246            .collect::<Vec<_>>();
2247
2248        rebroadcastable_txs.sort_by(|(txn1, (_, batch1)), (txn2, (_, batch2))| {
2249            use std::cmp::Ordering::Equal;
2250
2251            let get_nonce =
2252                |txn: &ValidCommandWithHash| txn.data.forget_check().applicable_at_nonce();
2253
2254            match batch1.cmp(batch2) {
2255                Equal => (),
2256                x => return x,
2257            }
2258            match get_nonce(txn1).cmp(&get_nonce(txn2)) {
2259                Equal => (),
2260                x => return x,
2261            }
2262            txn1.hash.cmp(&txn2.hash)
2263        });
2264
2265        rebroadcastable_txs
2266            .into_iter()
2267            .group_by(|(_txn, (_time, batch))| batch)
2268            .into_iter()
2269            .map(|(_batch, group_txns)| {
2270                group_txns
2271                    .map(|(txn, _)| txn.data.forget_check())
2272                    .collect::<Vec<_>>()
2273            })
2274            .collect::<Vec<_>>()
2275    }
2276}
2277
#[cfg(test)]
mod tests {
    use super::*;

    /// Make sure that the merge of nested maps, written the same way as the
    /// verification key merge in `convert_diff_to_verifiable`, is correct:
    /// inner maps that share an outer key must be unioned, not replaced.
    #[test]
    fn test_map_merge() {
        let mut a = HashMap::new();
        a.insert(1, {
            let mut map = HashMap::new();
            map.insert(1, 10);
            map.insert(2, 12);
            map
        });
        let mut b = HashMap::new();
        b.insert(1, {
            let mut map = HashMap::new();
            map.insert(3, 20);
            map
        });

        let new_map: HashMap<_, HashMap<_, _>> = HashMap::new();
        let merged = a
            .into_iter()
            .chain(b)
            .fold(new_map, |mut accum, (id, map)| {
                let entry = accum.entry(id).or_default();
                for (hash, vk) in map {
                    entry.insert(hash, vk);
                }
                accum
            });

        // A single outer key holding the union of both inner maps, with the
        // values left untouched.
        assert_eq!(merged.len(), 1);
        let one = merged.get(&1).unwrap();
        assert_eq!(one.len(), 3);
        assert_eq!(one.get(&1), Some(&10));
        assert_eq!(one.get(&2), Some(&12));
        assert_eq!(one.get(&3), Some(&20));
    }
}