1#![allow(clippy::unit_arg)]
2
3use std::collections::{BTreeMap, BTreeSet};
4
5use mina_p2p_messages::v2;
6use openmina_core::transaction::TransactionPoolMessageSource;
7use p2p::P2pNetworkPubsubMessageCacheId;
8use redux::Timestamp;
9use serde::{Deserialize, Serialize};
10
11use crate::{
12 core::transaction::{Transaction, TransactionHash, TransactionInfo, TransactionWithHash},
13 p2p::{channels::rpc::P2pRpcId, PeerId},
14};
15
16static EMPTY_PEER_TX_CANDIDATES: BTreeMap<TransactionHash, TransactionPoolCandidateState> =
17 BTreeMap::new();
18
19type NextBatch = (
20 PeerId,
21 Vec<TransactionWithHash>,
22 TransactionPoolMessageSource,
23);
24
25#[derive(Serialize, Deserialize, Debug, Clone, Default)]
26pub struct TransactionPoolCandidatesState {
27 by_peer: BTreeMap<PeerId, BTreeMap<TransactionHash, TransactionPoolCandidateState>>,
28 by_hash: BTreeMap<TransactionHash, BTreeSet<PeerId>>,
29 by_message_id: BTreeMap<P2pNetworkPubsubMessageCacheId, (PeerId, Vec<TransactionHash>)>,
30}
31
32#[derive(Serialize, Deserialize, Debug, Clone)]
33pub enum TransactionPoolCandidateState {
34 InfoReceived {
35 time: Timestamp,
36 info: TransactionInfo,
37 },
38 FetchPending {
39 time: Timestamp,
40 info: TransactionInfo,
41 rpc_id: P2pRpcId,
42 },
43 Received {
44 time: Timestamp,
45 transaction: TransactionWithHash,
46 },
47 VerifyPending {
48 time: Timestamp,
49 transaction: TransactionWithHash,
50 verify_id: (),
51 },
52 VerifyError {
53 time: Timestamp,
54 transaction: TransactionWithHash,
55 },
56 VerifySuccess {
57 time: Timestamp,
58 transaction: TransactionWithHash,
59 },
60}
61
62impl TransactionPoolCandidatesState {
63 pub fn new() -> Self {
64 Self::default()
65 }
66
67 pub fn transactions_count(&self) -> usize {
68 self.by_hash.len()
69 }
70
71 pub fn peer_transaction_count(&self, peer_id: &PeerId) -> usize {
72 self.by_peer.get(peer_id).map(|v| v.len()).unwrap_or(0)
73 }
74
75 pub fn contains(&self, hash: &TransactionHash) -> bool {
76 self.by_hash.contains_key(hash)
77 }
78
79 pub fn message_id_contains(&self, message_id: &P2pNetworkPubsubMessageCacheId) -> bool {
80 self.by_message_id.contains_key(message_id)
81 }
82
83 pub fn peer_contains(&self, peer_id: PeerId, hash: &TransactionHash) -> bool {
84 self.by_peer
85 .get(&peer_id)
86 .is_some_and(|txs| txs.contains_key(hash))
87 }
88
89 pub fn get(
90 &self,
91 peer_id: PeerId,
92 hash: &TransactionHash,
93 ) -> Option<&TransactionPoolCandidateState> {
94 self.by_peer.get(&peer_id)?.get(hash)
95 }
96
97 fn transactions_from_peer_or_empty(
98 &self,
99 peer_id: PeerId,
100 ) -> &BTreeMap<TransactionHash, TransactionPoolCandidateState> {
101 self.by_peer
102 .get(&peer_id)
103 .unwrap_or(&EMPTY_PEER_TX_CANDIDATES)
104 }
105
106 pub fn candidates_from_peer_iter(
107 &self,
108 peer_id: PeerId,
109 ) -> impl Iterator<Item = (&TransactionHash, &TransactionPoolCandidateState)> {
110 self.transactions_from_peer_or_empty(peer_id).iter()
111 }
112
113 pub fn candidates_from_peer_with_hashes<'a, I>(
114 &'a self,
115 peer_id: PeerId,
116 transaction_hashes: I,
117 ) -> impl Iterator<
118 Item = (
119 &'a TransactionHash,
120 Option<&'a TransactionPoolCandidateState>,
121 ),
122 >
123 where
124 I: IntoIterator<Item = &'a TransactionHash>,
125 {
126 let transactions = self.transactions_from_peer_or_empty(peer_id);
127 transaction_hashes
128 .into_iter()
129 .map(|hash| (hash, transactions.get(hash)))
130 }
131
132 pub fn info_received(&mut self, time: Timestamp, peer_id: PeerId, info: TransactionInfo) {
133 self.by_hash
134 .entry(info.hash.clone())
135 .or_default()
136 .insert(peer_id);
137
138 let hash = info.hash.clone();
139 let state = TransactionPoolCandidateState::InfoReceived { time, info };
140 self.by_peer.entry(peer_id).or_default().insert(hash, state);
141 }
142
143 pub fn peers_next_transactions_to_fetch<I, F>(
144 &self,
145 peers: I,
146 get_order: F,
147 ) -> Vec<(PeerId, TransactionHash)>
148 where
149 I: IntoIterator<Item = PeerId>,
150 F: Copy + Fn(&TransactionHash) -> usize,
151 {
152 let mut needs_fetching = peers
153 .into_iter()
154 .filter_map(|peer_id| Some((peer_id, self.by_peer.get(&peer_id)?)))
155 .flat_map(|(peer_id, transactions)| {
156 transactions
157 .iter()
158 .filter(|(_, state)| {
159 matches!(state, TransactionPoolCandidateState::InfoReceived { .. })
160 })
161 .map(move |(hash, state)| (get_order(hash), state.fee(), peer_id, hash))
162 })
163 .collect::<Vec<_>>();
164 needs_fetching
165 .sort_by(|(ord1, fee1, ..), (ord2, fee2, ..)| ord1.cmp(ord2).then(fee1.cmp(fee2)));
166
167 needs_fetching
168 .into_iter()
169 .scan(None, |last_ord, (ord, _, peer_id, hash)| {
170 if *last_ord == Some(ord) {
171 return Some(None);
172 }
173 *last_ord = Some(ord);
174 Some(Some((peer_id, hash.clone())))
175 })
176 .flatten()
177 .collect()
178 }
179
180 pub fn fetch_pending(
181 &mut self,
182 time: Timestamp,
183 peer_id: &PeerId,
184 hash: &TransactionHash,
185 rpc_id: P2pRpcId,
186 ) {
187 if let Some(state) = self
188 .by_peer
189 .get_mut(peer_id)
190 .and_then(|transactions| transactions.get_mut(hash))
191 {
192 if let TransactionPoolCandidateState::InfoReceived { info, .. } = state {
193 *state = TransactionPoolCandidateState::FetchPending {
194 time,
195 info: info.clone(),
196 rpc_id,
197 };
198 }
199 }
200 }
201
202 pub fn transaction_received(
203 &mut self,
204 time: Timestamp,
205 peer_id: PeerId,
206 transaction: TransactionWithHash,
207 ) {
208 let hash = transaction.hash().clone();
209 self.by_hash
210 .entry(hash.clone())
211 .or_default()
212 .insert(peer_id);
213
214 let state = TransactionPoolCandidateState::Received { time, transaction };
215 self.by_peer.entry(peer_id).or_default().insert(hash, state);
216 }
217
218 pub fn transactions_received(
219 &mut self,
220 time: Timestamp,
221 peer_id: PeerId,
222 transactions: Vec<TransactionWithHash>,
223 message_id: P2pNetworkPubsubMessageCacheId,
224 ) {
225 let transaction_hashes = transactions
226 .iter()
227 .map(TransactionWithHash::hash)
228 .cloned()
229 .collect::<Vec<_>>();
230
231 self.by_message_id
232 .insert(message_id, (peer_id, transaction_hashes));
233
234 transactions.into_iter().for_each(|transaction| {
235 self.transaction_received(time, peer_id, transaction);
236 })
237 }
238
239 pub fn get_batch_to_verify(&self) -> Option<NextBatch> {
243 self.next_batch_from_pubsub()
244 .or_else(|| self.next_batch_from_peers())
245 }
246
247 fn next_batch_from_peers(&self) -> Option<NextBatch> {
248 for (hash, peers) in self.by_hash.iter() {
249 if let Some(res) = None.or_else(|| {
250 for peer_id in peers {
251 let peer_transactions = self.by_peer.get(peer_id)?;
252 if peer_transactions.get(hash)?.transaction().is_some() {
253 let transactions = peer_transactions
254 .iter()
255 .filter_map(|(_, v)| match v {
256 TransactionPoolCandidateState::Received { transaction, .. } => {
257 Some(transaction)
258 }
259 _ => None,
260 })
261 .cloned()
262 .collect();
263 return Some((*peer_id, transactions, TransactionPoolMessageSource::None));
264 }
265 }
266 None
267 }) {
268 return Some(res);
269 }
270 }
271
272 None
273 }
274
275 fn next_batch_from_pubsub(&self) -> Option<NextBatch> {
276 let (message_id, (peer_id, transaction_hashes)) = self.by_message_id.iter().next()?;
277 let transactions = self
278 .by_peer
279 .get(peer_id)?
280 .iter()
281 .filter_map(|(hash, state)| {
282 let TransactionPoolCandidateState::Received { transaction, .. } = state else {
283 return None;
284 };
285 if transaction_hashes.contains(hash) {
286 Some(transaction)
287 } else {
288 None
289 }
290 })
291 .cloned()
292 .collect();
293
294 Some((
295 *peer_id,
296 transactions,
297 TransactionPoolMessageSource::pubsub(*message_id),
298 ))
299 }
300
301 pub fn verify_pending(
302 &mut self,
303 time: Timestamp,
304 peer_id: &PeerId,
305 verify_id: (),
306 transaction_hashes: &[TransactionHash],
307 ) {
308 let Some(peer_transactions) = self.by_peer.get_mut(peer_id) else {
309 return;
310 };
311
312 for hash in transaction_hashes {
313 if let Some(job_state) = peer_transactions.get_mut(hash) {
314 if let TransactionPoolCandidateState::Received { transaction, .. } = job_state {
315 *job_state = TransactionPoolCandidateState::VerifyPending {
316 time,
317 transaction: transaction.clone(),
318 verify_id,
319 };
320 }
321 }
322 }
323 }
324
325 pub fn verify_result(
326 &mut self,
327 _time: Timestamp,
328 peer_id: &PeerId,
329 verify_id: (),
330 from_source: &TransactionPoolMessageSource,
331 _result: Result<(), ()>,
332 ) {
333 match from_source {
334 TransactionPoolMessageSource::Pubsub { id } => {
335 let Some((_, transactions)) = self.by_message_id.remove(id) else {
336 return;
337 };
338
339 for hash in transactions {
340 self.transaction_remove(&hash);
341 }
342 }
343 _ => {
344 if let Some(peer_transactions) = self.by_peer.get_mut(peer_id) {
345 let txs_to_remove = peer_transactions
346 .iter()
347 .filter(|(_, job_state)| job_state.pending_verify_id() == Some(verify_id))
348 .map(|(hash, _)| hash.clone())
349 .collect::<Vec<_>>();
350
351 for hash in txs_to_remove {
352 self.transaction_remove(&hash);
353 }
354 }
355 }
356 }
357 }
358
359 pub fn peer_remove(&mut self, peer_id: PeerId) {
360 if let Some(txs) = self.by_peer.remove(&peer_id) {
361 for hash in txs.into_keys() {
362 if let Some(peers) = self.by_hash.get_mut(&hash) {
363 peers.remove(&peer_id);
364 if peers.is_empty() {
365 self.by_hash.remove(&hash);
366 }
367 }
368 }
369 }
370 }
371
372 pub fn peer_transaction_remove(&mut self, peer_id: PeerId, hash: &TransactionHash) {
373 if let Some(txs) = self.by_peer.get_mut(&peer_id) {
374 txs.remove(hash);
375 if let Some(peers) = self.by_hash.get_mut(hash) {
376 peers.remove(&peer_id);
377 if peers.is_empty() {
378 self.by_hash.remove(hash);
379 }
380 }
381 }
382 }
383
384 fn transaction_remove(&mut self, hash: &TransactionHash) {
385 if let Some(peers) = self.by_hash.remove(hash) {
386 for peer_id in peers {
387 if let Some(txs) = self.by_peer.get_mut(&peer_id) {
388 txs.remove(hash);
389 }
390 }
391 }
392 }
393
394 pub fn remove_inferior_transactions(&mut self, transaction: &Transaction) {
395 match transaction.hash() {
397 Err(err) => {
398 openmina_core::bug_condition!("tx hashing failed: {err}");
399 }
400 Ok(hash) => self.transaction_remove(&hash),
401 };
402 }
403
404 pub fn retain<F1, F2>(&mut self, mut predicate: F1)
405 where
406 F1: FnMut(&TransactionHash) -> F2,
407 F2: FnMut(&TransactionPoolCandidateState) -> bool,
408 {
409 let by_peer = &mut self.by_peer;
410 self.by_hash.retain(|hash, peers| {
411 let mut predicate = predicate(hash);
412 peers.retain(|peer_id| {
413 if let Some(peer_txs) = by_peer.get_mut(peer_id) {
414 match peer_txs.get(hash) {
415 Some(s) if predicate(s) => true,
416 Some(_) => {
417 peer_txs.remove(hash);
418 false
419 }
420 None => false,
421 }
422 } else {
423 false
424 }
425 });
426 !peers.is_empty()
427 })
428 }
429}
430
431impl TransactionPoolCandidateState {
432 pub fn fee(&self) -> u64 {
433 match self {
434 Self::InfoReceived { info, .. } | Self::FetchPending { info, .. } => info.fee,
435 Self::Received { transaction, .. }
436 | Self::VerifyPending { transaction, .. }
437 | Self::VerifyError { transaction, .. }
438 | Self::VerifySuccess { transaction, .. } => match transaction.body() {
439 v2::MinaBaseUserCommandStableV2::SignedCommand(v) => v.payload.common.fee.as_u64(),
440 v2::MinaBaseUserCommandStableV2::ZkappCommand(v) => v.fee_payer.body.fee.as_u64(),
441 },
442 }
443 }
444
445 pub fn transaction(&self) -> Option<&TransactionWithHash> {
446 match self {
447 Self::InfoReceived { .. } => None,
448 Self::FetchPending { .. } => None,
449 Self::Received { transaction, .. } => Some(transaction),
450 Self::VerifyPending { transaction, .. } => Some(transaction),
451 Self::VerifyError { transaction, .. } => Some(transaction),
452 Self::VerifySuccess { transaction, .. } => Some(transaction),
453 }
454 }
455
456 pub fn pending_verify_id(&self) -> Option<()> {
457 match self {
458 Self::VerifyPending { verify_id, .. } => Some(*verify_id),
459 _ => None,
460 }
461 }
462}