node/transaction_pool/candidate/
transaction_pool_candidate_state.rs

1#![allow(clippy::unit_arg)]
2
3use std::collections::{BTreeMap, BTreeSet};
4
5use mina_p2p_messages::v2;
6use openmina_core::transaction::TransactionPoolMessageSource;
7use p2p::P2pNetworkPubsubMessageCacheId;
8use redux::Timestamp;
9use serde::{Deserialize, Serialize};
10
11use crate::{
12    core::transaction::{Transaction, TransactionHash, TransactionInfo, TransactionWithHash},
13    p2p::{channels::rpc::P2pRpcId, PeerId},
14};
15
/// Shared, always-empty candidate map.
///
/// `transactions_from_peer_or_empty` returns a reference to this map when a
/// peer has no entry in `by_peer`, so callers always get a map reference
/// without an allocation or an `Option`.
static EMPTY_PEER_TX_CANDIDATES: BTreeMap<TransactionHash, TransactionPoolCandidateState> =
    BTreeMap::new();
18
/// A batch of transactions selected for verification.
///
/// Tuple fields, in order: the peer the transactions were taken from, the
/// transactions themselves, and the source tag describing where the batch
/// came from (`TransactionPoolMessageSource::None` for per-peer batches, a
/// pubsub source carrying the message id for pubsub batches).
type NextBatch = (
    PeerId,
    Vec<TransactionWithHash>,
    TransactionPoolMessageSource,
);
24
/// Transaction pool candidates, indexed by peer, by transaction hash and by
/// pubsub message id.
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct TransactionPoolCandidatesState {
    /// Per-peer candidate state, keyed by transaction hash.
    by_peer: BTreeMap<PeerId, BTreeMap<TransactionHash, TransactionPoolCandidateState>>,
    /// Reverse index: for every tracked transaction hash, the peers that have
    /// an entry for it in `by_peer`.
    by_hash: BTreeMap<TransactionHash, BTreeSet<PeerId>>,
    /// For every pubsub message recorded by `transactions_received`: the peer
    /// it came from and the hashes of the transactions it carried.
    by_message_id: BTreeMap<P2pNetworkPubsubMessageCacheId, (PeerId, Vec<TransactionHash>)>,
}
31
/// State of a single transaction candidate as seen from one peer.
///
/// Every variant carries the `time` that was passed in when the state was
/// entered.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum TransactionPoolCandidateState {
    /// Only the transaction info is known (entered via `info_received`).
    InfoReceived {
        time: Timestamp,
        info: TransactionInfo,
    },
    /// Entered from `InfoReceived` via `fetch_pending`; `rpc_id` is the id
    /// that was passed to that call.
    FetchPending {
        time: Timestamp,
        info: TransactionInfo,
        rpc_id: P2pRpcId,
    },
    /// The full transaction is available (entered via `transaction_received`).
    Received {
        time: Timestamp,
        transaction: TransactionWithHash,
    },
    /// Entered from `Received` via `verify_pending`.
    VerifyPending {
        time: Timestamp,
        transaction: TransactionWithHash,
        /// Unit type: carries no information in the visible code.
        verify_id: (),
    },
    /// NOTE(review): not constructed anywhere in the visible part of this
    /// file; `verify_result` removes candidates instead of moving them here.
    VerifyError {
        time: Timestamp,
        transaction: TransactionWithHash,
    },
    /// NOTE(review): not constructed anywhere in the visible part of this
    /// file; `verify_result` removes candidates instead of moving them here.
    VerifySuccess {
        time: Timestamp,
        transaction: TransactionWithHash,
    },
}
61
62impl TransactionPoolCandidatesState {
63    pub fn new() -> Self {
64        Self::default()
65    }
66
67    pub fn transactions_count(&self) -> usize {
68        self.by_hash.len()
69    }
70
71    pub fn peer_transaction_count(&self, peer_id: &PeerId) -> usize {
72        self.by_peer.get(peer_id).map(|v| v.len()).unwrap_or(0)
73    }
74
75    pub fn contains(&self, hash: &TransactionHash) -> bool {
76        self.by_hash.contains_key(hash)
77    }
78
79    pub fn message_id_contains(&self, message_id: &P2pNetworkPubsubMessageCacheId) -> bool {
80        self.by_message_id.contains_key(message_id)
81    }
82
83    pub fn peer_contains(&self, peer_id: PeerId, hash: &TransactionHash) -> bool {
84        self.by_peer
85            .get(&peer_id)
86            .is_some_and(|txs| txs.contains_key(hash))
87    }
88
89    pub fn get(
90        &self,
91        peer_id: PeerId,
92        hash: &TransactionHash,
93    ) -> Option<&TransactionPoolCandidateState> {
94        self.by_peer.get(&peer_id)?.get(hash)
95    }
96
97    fn transactions_from_peer_or_empty(
98        &self,
99        peer_id: PeerId,
100    ) -> &BTreeMap<TransactionHash, TransactionPoolCandidateState> {
101        self.by_peer
102            .get(&peer_id)
103            .unwrap_or(&EMPTY_PEER_TX_CANDIDATES)
104    }
105
106    pub fn candidates_from_peer_iter(
107        &self,
108        peer_id: PeerId,
109    ) -> impl Iterator<Item = (&TransactionHash, &TransactionPoolCandidateState)> {
110        self.transactions_from_peer_or_empty(peer_id).iter()
111    }
112
113    pub fn candidates_from_peer_with_hashes<'a, I>(
114        &'a self,
115        peer_id: PeerId,
116        transaction_hashes: I,
117    ) -> impl Iterator<
118        Item = (
119            &'a TransactionHash,
120            Option<&'a TransactionPoolCandidateState>,
121        ),
122    >
123    where
124        I: IntoIterator<Item = &'a TransactionHash>,
125    {
126        let transactions = self.transactions_from_peer_or_empty(peer_id);
127        transaction_hashes
128            .into_iter()
129            .map(|hash| (hash, transactions.get(hash)))
130    }
131
132    pub fn info_received(&mut self, time: Timestamp, peer_id: PeerId, info: TransactionInfo) {
133        self.by_hash
134            .entry(info.hash.clone())
135            .or_default()
136            .insert(peer_id);
137
138        let hash = info.hash.clone();
139        let state = TransactionPoolCandidateState::InfoReceived { time, info };
140        self.by_peer.entry(peer_id).or_default().insert(hash, state);
141    }
142
143    pub fn peers_next_transactions_to_fetch<I, F>(
144        &self,
145        peers: I,
146        get_order: F,
147    ) -> Vec<(PeerId, TransactionHash)>
148    where
149        I: IntoIterator<Item = PeerId>,
150        F: Copy + Fn(&TransactionHash) -> usize,
151    {
152        let mut needs_fetching = peers
153            .into_iter()
154            .filter_map(|peer_id| Some((peer_id, self.by_peer.get(&peer_id)?)))
155            .flat_map(|(peer_id, transactions)| {
156                transactions
157                    .iter()
158                    .filter(|(_, state)| {
159                        matches!(state, TransactionPoolCandidateState::InfoReceived { .. })
160                    })
161                    .map(move |(hash, state)| (get_order(hash), state.fee(), peer_id, hash))
162            })
163            .collect::<Vec<_>>();
164        needs_fetching
165            .sort_by(|(ord1, fee1, ..), (ord2, fee2, ..)| ord1.cmp(ord2).then(fee1.cmp(fee2)));
166
167        needs_fetching
168            .into_iter()
169            .scan(None, |last_ord, (ord, _, peer_id, hash)| {
170                if *last_ord == Some(ord) {
171                    return Some(None);
172                }
173                *last_ord = Some(ord);
174                Some(Some((peer_id, hash.clone())))
175            })
176            .flatten()
177            .collect()
178    }
179
180    pub fn fetch_pending(
181        &mut self,
182        time: Timestamp,
183        peer_id: &PeerId,
184        hash: &TransactionHash,
185        rpc_id: P2pRpcId,
186    ) {
187        if let Some(state) = self
188            .by_peer
189            .get_mut(peer_id)
190            .and_then(|transactions| transactions.get_mut(hash))
191        {
192            if let TransactionPoolCandidateState::InfoReceived { info, .. } = state {
193                *state = TransactionPoolCandidateState::FetchPending {
194                    time,
195                    info: info.clone(),
196                    rpc_id,
197                };
198            }
199        }
200    }
201
202    pub fn transaction_received(
203        &mut self,
204        time: Timestamp,
205        peer_id: PeerId,
206        transaction: TransactionWithHash,
207    ) {
208        let hash = transaction.hash().clone();
209        self.by_hash
210            .entry(hash.clone())
211            .or_default()
212            .insert(peer_id);
213
214        let state = TransactionPoolCandidateState::Received { time, transaction };
215        self.by_peer.entry(peer_id).or_default().insert(hash, state);
216    }
217
218    pub fn transactions_received(
219        &mut self,
220        time: Timestamp,
221        peer_id: PeerId,
222        transactions: Vec<TransactionWithHash>,
223        message_id: P2pNetworkPubsubMessageCacheId,
224    ) {
225        let transaction_hashes = transactions
226            .iter()
227            .map(TransactionWithHash::hash)
228            .cloned()
229            .collect::<Vec<_>>();
230
231        self.by_message_id
232            .insert(message_id, (peer_id, transaction_hashes));
233
234        transactions.into_iter().for_each(|transaction| {
235            self.transaction_received(time, peer_id, transaction);
236        })
237    }
238
239    /// Get next batch of transactions to verify,
240    /// first checks if there are any transactions to verify from pubsub
241    /// after that checks for transactions from peers
242    pub fn get_batch_to_verify(&self) -> Option<NextBatch> {
243        self.next_batch_from_pubsub()
244            .or_else(|| self.next_batch_from_peers())
245    }
246
247    fn next_batch_from_peers(&self) -> Option<NextBatch> {
248        for (hash, peers) in self.by_hash.iter() {
249            if let Some(res) = None.or_else(|| {
250                for peer_id in peers {
251                    let peer_transactions = self.by_peer.get(peer_id)?;
252                    if peer_transactions.get(hash)?.transaction().is_some() {
253                        let transactions = peer_transactions
254                            .iter()
255                            .filter_map(|(_, v)| match v {
256                                TransactionPoolCandidateState::Received { transaction, .. } => {
257                                    Some(transaction)
258                                }
259                                _ => None,
260                            })
261                            .cloned()
262                            .collect();
263                        return Some((*peer_id, transactions, TransactionPoolMessageSource::None));
264                    }
265                }
266                None
267            }) {
268                return Some(res);
269            }
270        }
271
272        None
273    }
274
275    fn next_batch_from_pubsub(&self) -> Option<NextBatch> {
276        let (message_id, (peer_id, transaction_hashes)) = self.by_message_id.iter().next()?;
277        let transactions = self
278            .by_peer
279            .get(peer_id)?
280            .iter()
281            .filter_map(|(hash, state)| {
282                let TransactionPoolCandidateState::Received { transaction, .. } = state else {
283                    return None;
284                };
285                if transaction_hashes.contains(hash) {
286                    Some(transaction)
287                } else {
288                    None
289                }
290            })
291            .cloned()
292            .collect();
293
294        Some((
295            *peer_id,
296            transactions,
297            TransactionPoolMessageSource::pubsub(*message_id),
298        ))
299    }
300
301    pub fn verify_pending(
302        &mut self,
303        time: Timestamp,
304        peer_id: &PeerId,
305        verify_id: (),
306        transaction_hashes: &[TransactionHash],
307    ) {
308        let Some(peer_transactions) = self.by_peer.get_mut(peer_id) else {
309            return;
310        };
311
312        for hash in transaction_hashes {
313            if let Some(job_state) = peer_transactions.get_mut(hash) {
314                if let TransactionPoolCandidateState::Received { transaction, .. } = job_state {
315                    *job_state = TransactionPoolCandidateState::VerifyPending {
316                        time,
317                        transaction: transaction.clone(),
318                        verify_id,
319                    };
320                }
321            }
322        }
323    }
324
325    pub fn verify_result(
326        &mut self,
327        _time: Timestamp,
328        peer_id: &PeerId,
329        verify_id: (),
330        from_source: &TransactionPoolMessageSource,
331        _result: Result<(), ()>,
332    ) {
333        match from_source {
334            TransactionPoolMessageSource::Pubsub { id } => {
335                let Some((_, transactions)) = self.by_message_id.remove(id) else {
336                    return;
337                };
338
339                for hash in transactions {
340                    self.transaction_remove(&hash);
341                }
342            }
343            _ => {
344                if let Some(peer_transactions) = self.by_peer.get_mut(peer_id) {
345                    let txs_to_remove = peer_transactions
346                        .iter()
347                        .filter(|(_, job_state)| job_state.pending_verify_id() == Some(verify_id))
348                        .map(|(hash, _)| hash.clone())
349                        .collect::<Vec<_>>();
350
351                    for hash in txs_to_remove {
352                        self.transaction_remove(&hash);
353                    }
354                }
355            }
356        }
357    }
358
359    pub fn peer_remove(&mut self, peer_id: PeerId) {
360        if let Some(txs) = self.by_peer.remove(&peer_id) {
361            for hash in txs.into_keys() {
362                if let Some(peers) = self.by_hash.get_mut(&hash) {
363                    peers.remove(&peer_id);
364                    if peers.is_empty() {
365                        self.by_hash.remove(&hash);
366                    }
367                }
368            }
369        }
370    }
371
372    pub fn peer_transaction_remove(&mut self, peer_id: PeerId, hash: &TransactionHash) {
373        if let Some(txs) = self.by_peer.get_mut(&peer_id) {
374            txs.remove(hash);
375            if let Some(peers) = self.by_hash.get_mut(hash) {
376                peers.remove(&peer_id);
377                if peers.is_empty() {
378                    self.by_hash.remove(hash);
379                }
380            }
381        }
382    }
383
384    fn transaction_remove(&mut self, hash: &TransactionHash) {
385        if let Some(peers) = self.by_hash.remove(hash) {
386            for peer_id in peers {
387                if let Some(txs) = self.by_peer.get_mut(&peer_id) {
388                    txs.remove(hash);
389                }
390            }
391        }
392    }
393
394    pub fn remove_inferior_transactions(&mut self, transaction: &Transaction) {
395        // TODO(binier)
396        match transaction.hash() {
397            Err(err) => {
398                openmina_core::bug_condition!("tx hashing failed: {err}");
399            }
400            Ok(hash) => self.transaction_remove(&hash),
401        };
402    }
403
404    pub fn retain<F1, F2>(&mut self, mut predicate: F1)
405    where
406        F1: FnMut(&TransactionHash) -> F2,
407        F2: FnMut(&TransactionPoolCandidateState) -> bool,
408    {
409        let by_peer = &mut self.by_peer;
410        self.by_hash.retain(|hash, peers| {
411            let mut predicate = predicate(hash);
412            peers.retain(|peer_id| {
413                if let Some(peer_txs) = by_peer.get_mut(peer_id) {
414                    match peer_txs.get(hash) {
415                        Some(s) if predicate(s) => true,
416                        Some(_) => {
417                            peer_txs.remove(hash);
418                            false
419                        }
420                        None => false,
421                    }
422                } else {
423                    false
424                }
425            });
426            !peers.is_empty()
427        })
428    }
429}
430
431impl TransactionPoolCandidateState {
432    pub fn fee(&self) -> u64 {
433        match self {
434            Self::InfoReceived { info, .. } | Self::FetchPending { info, .. } => info.fee,
435            Self::Received { transaction, .. }
436            | Self::VerifyPending { transaction, .. }
437            | Self::VerifyError { transaction, .. }
438            | Self::VerifySuccess { transaction, .. } => match transaction.body() {
439                v2::MinaBaseUserCommandStableV2::SignedCommand(v) => v.payload.common.fee.as_u64(),
440                v2::MinaBaseUserCommandStableV2::ZkappCommand(v) => v.fee_payer.body.fee.as_u64(),
441            },
442        }
443    }
444
445    pub fn transaction(&self) -> Option<&TransactionWithHash> {
446        match self {
447            Self::InfoReceived { .. } => None,
448            Self::FetchPending { .. } => None,
449            Self::Received { transaction, .. } => Some(transaction),
450            Self::VerifyPending { transaction, .. } => Some(transaction),
451            Self::VerifyError { transaction, .. } => Some(transaction),
452            Self::VerifySuccess { transaction, .. } => Some(transaction),
453        }
454    }
455
456    pub fn pending_verify_id(&self) -> Option<()> {
457        match self {
458            Self::VerifyPending { verify_id, .. } => Some(*verify_id),
459            _ => None,
460        }
461    }
462}