node/transaction_pool/candidate/
transaction_pool_candidate_actions.rs

1use openmina_core::{
2    transaction::{
3        TransactionHash, TransactionInfo, TransactionPoolMessageSource, TransactionWithHash,
4    },
5    ActionEvent,
6};
7use p2p::P2pNetworkPubsubMessageCacheId;
8use serde::{Deserialize, Serialize};
9
10use crate::p2p::{channels::rpc::P2pRpcId, PeerId};
11
12use super::TransactionPoolCandidateState;
13
14pub type TransactionPoolCandidateActionWithMeta =
15    redux::ActionWithMeta<TransactionPoolCandidateAction>;
16pub type TransactionPoolCandidateActionWithMetaRef<'a> =
17    redux::ActionWithMeta<&'a TransactionPoolCandidateAction>;
18
19#[derive(Serialize, Deserialize, Debug, Clone, ActionEvent)]
20pub enum TransactionPoolCandidateAction {
21    InfoReceived {
22        peer_id: PeerId,
23        info: TransactionInfo,
24    },
25    #[action_event(level = trace)]
26    FetchAll,
27    FetchInit {
28        peer_id: PeerId,
29        hash: TransactionHash,
30    },
31    FetchPending {
32        peer_id: PeerId,
33        hash: TransactionHash,
34        rpc_id: P2pRpcId,
35    },
36    FetchError {
37        peer_id: PeerId,
38        hash: TransactionHash,
39    },
40    FetchSuccess {
41        peer_id: PeerId,
42        transaction: TransactionWithHash,
43    },
44    /// Callback for transactions received over pubsub
45    Libp2pTransactionsReceived {
46        peer_id: PeerId,
47        transactions: Vec<TransactionWithHash>,
48        message_id: P2pNetworkPubsubMessageCacheId,
49    },
50    #[action_event(level = trace)]
51    VerifyNext,
52    VerifyPending {
53        peer_id: PeerId,
54        transaction_hashes: Vec<TransactionHash>,
55        verify_id: (),
56        from_source: TransactionPoolMessageSource,
57    },
58    VerifyError {
59        peer_id: PeerId,
60        verify_id: (),
61    },
62    VerifySuccess {
63        peer_id: PeerId,
64        verify_id: (),
65        from_source: TransactionPoolMessageSource,
66    },
67    PeerPrune {
68        peer_id: PeerId,
69    },
70}
71
72impl redux::EnablingCondition<crate::State> for TransactionPoolCandidateAction {
73    fn is_enabled(&self, state: &crate::State, _time: redux::Timestamp) -> bool {
74        match self {
75            TransactionPoolCandidateAction::InfoReceived { peer_id, info } => {
76                !state.transaction_pool.contains(&info.hash)
77                    && !state
78                        .transaction_pool
79                        .candidates
80                        .peer_contains(*peer_id, &info.hash)
81            }
82            TransactionPoolCandidateAction::FetchAll => state.p2p.ready().is_some(),
83            TransactionPoolCandidateAction::FetchInit { peer_id, hash } => {
84                let is_peer_available = state
85                    .p2p
86                    .get_ready_peer(peer_id)
87                    .is_some_and(|peer| peer.channels.rpc.can_send_request());
88                is_peer_available
89                    && state
90                        .transaction_pool
91                        .candidates
92                        .get(*peer_id, hash)
93                        .is_some_and(|s| {
94                            matches!(s, TransactionPoolCandidateState::InfoReceived { .. })
95                        })
96            }
97            TransactionPoolCandidateAction::FetchPending { peer_id, hash, .. } => state
98                .transaction_pool
99                .candidates
100                .get(*peer_id, hash)
101                .is_some_and(|s| matches!(s, TransactionPoolCandidateState::InfoReceived { .. })),
102            TransactionPoolCandidateAction::FetchError { peer_id, hash } => state
103                .transaction_pool
104                .candidates
105                .get(*peer_id, hash)
106                .is_some(),
107            TransactionPoolCandidateAction::FetchSuccess {
108                peer_id,
109                transaction,
110            } => state
111                .transaction_pool
112                .candidates
113                .get(*peer_id, transaction.hash())
114                .is_some(),
115            TransactionPoolCandidateAction::Libp2pTransactionsReceived { .. } => true,
116            TransactionPoolCandidateAction::VerifyNext => {
117                // Don't continue if we are producing a block, or we never synced yet
118                // or if the ledger service is busy.
119                !state.block_producer.is_producing()
120                    && state
121                        .transition_frontier
122                        .best_tip()
123                        .is_some_and(|b| !b.is_genesis())
124                    && !state.ledger.write.is_busy()
125            }
126            TransactionPoolCandidateAction::VerifyPending {
127                peer_id,
128                transaction_hashes,
129                ..
130            } => {
131                !transaction_hashes.is_empty()
132                    && state
133                        .transaction_pool
134                        .candidates
135                        .candidates_from_peer_with_hashes(*peer_id, transaction_hashes)
136                        .all(|(_, state)| {
137                            matches!(state, Some(TransactionPoolCandidateState::Received { .. }))
138                        })
139            }
140            TransactionPoolCandidateAction::VerifyError { .. } => {
141                // TODO(binier)
142                true
143            }
144            TransactionPoolCandidateAction::VerifySuccess { .. } => {
145                // TODO(binier)
146                true
147            }
148            TransactionPoolCandidateAction::PeerPrune { peer_id } => {
149                state
150                    .transaction_pool
151                    .candidates
152                    .peer_transaction_count(peer_id)
153                    > 0
154            }
155        }
156    }
157}
158
159use crate::transaction_pool::TransactionPoolAction;
160
161impl From<TransactionPoolCandidateAction> for crate::Action {
162    fn from(value: TransactionPoolCandidateAction) -> Self {
163        Self::TransactionPool(TransactionPoolAction::Candidate(value))
164    }
165}