node/transaction_pool/candidate/
transaction_pool_candidate_reducer.rs

1#![allow(clippy::unit_arg)]
2
3use crate::{p2p_ready, TransactionPoolAction};
4use p2p::{
5    channels::rpc::{P2pChannelsRpcAction, P2pRpcId, P2pRpcRequest},
6    PeerId,
7};
8
9use super::{
10    TransactionPoolCandidateAction, TransactionPoolCandidateActionWithMetaRef,
11    TransactionPoolCandidatesState,
12};
13
14impl TransactionPoolCandidatesState {
15    pub fn reducer(
16        mut state_context: crate::Substate<Self>,
17        action: TransactionPoolCandidateActionWithMetaRef<'_>,
18    ) {
19        let Ok(state) = state_context.get_substate_mut() else {
20            // TODO: log or propagate
21            return;
22        };
23        let (action, meta) = action.split();
24
25        match action {
26            TransactionPoolCandidateAction::InfoReceived { peer_id, info } => {
27                state.info_received(meta.time(), *peer_id, info.clone());
28            }
29            TransactionPoolCandidateAction::FetchAll => {
30                let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
31                let p2p = p2p_ready!(global_state.p2p, meta.time());
32                let peers = p2p.ready_peers_iter().map(|(id, _)| *id);
33                let get_order = |_hash: &_| {
34                    // TODO(binier)
35                    0
36                };
37                let list = global_state
38                    .transaction_pool
39                    .candidates
40                    .peers_next_transactions_to_fetch(peers, get_order);
41
42                for (peer_id, hash) in list {
43                    dispatcher.push(TransactionPoolCandidateAction::FetchInit { peer_id, hash });
44                }
45            }
46            TransactionPoolCandidateAction::FetchInit { peer_id, hash } => {
47                let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
48                let peer_id = *peer_id;
49                let hash = hash.clone();
50                let p2p = p2p_ready!(global_state.p2p, meta.time());
51                let Some(peer) = p2p.get_ready_peer(&peer_id) else {
52                    return;
53                };
54                let rpc_id = peer.channels.next_local_rpc_id();
55
56                dispatcher.push(P2pChannelsRpcAction::RequestSend {
57                    peer_id,
58                    id: rpc_id,
59                    request: Box::new(P2pRpcRequest::Transaction(hash.clone())),
60                    on_init: Some(redux::callback!(
61                        on_send_p2p_snark_rpc_request(
62                            (peer_id: PeerId, rpc_id: P2pRpcId, request: P2pRpcRequest)
63                        ) -> crate::Action {
64                            let P2pRpcRequest::Transaction(hash) = request else {
65                                unreachable!()
66                            };
67                            TransactionPoolCandidateAction::FetchPending {
68                                hash,
69                                peer_id,
70                                rpc_id,
71                            }
72                        }
73                    )),
74                });
75            }
76            TransactionPoolCandidateAction::FetchPending {
77                peer_id,
78                hash,
79                rpc_id,
80            } => {
81                state.fetch_pending(meta.time(), peer_id, hash, *rpc_id);
82            }
83            TransactionPoolCandidateAction::FetchError { peer_id, hash } => {
84                state.peer_transaction_remove(*peer_id, hash);
85            }
86            TransactionPoolCandidateAction::FetchSuccess {
87                peer_id,
88                transaction,
89            } => {
90                state.transaction_received(meta.time(), *peer_id, transaction.clone());
91            }
92            TransactionPoolCandidateAction::Libp2pTransactionsReceived {
93                peer_id,
94                transactions,
95                message_id,
96            } => {
97                state.transactions_received(
98                    meta.time(),
99                    *peer_id,
100                    transactions.clone(),
101                    *message_id,
102                );
103            }
104            TransactionPoolCandidateAction::VerifyNext => {
105                let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
106
107                let batch = global_state
108                    .transaction_pool
109                    .candidates
110                    .get_batch_to_verify();
111                let Some((peer_id, batch, from_source)) = batch else {
112                    return;
113                };
114
115                let transaction_hashes = batch.iter().map(|tx| tx.hash().clone()).collect();
116                dispatcher.push(TransactionPoolAction::StartVerify {
117                    commands: batch.into_iter().collect(),
118                    from_source,
119                });
120                dispatcher.push(TransactionPoolCandidateAction::VerifyPending {
121                    peer_id,
122                    transaction_hashes,
123                    verify_id: (),
124                    from_source,
125                });
126            }
127            TransactionPoolCandidateAction::VerifyPending {
128                peer_id,
129                transaction_hashes,
130                verify_id,
131                from_source,
132            } => {
133                state.verify_pending(meta.time(), peer_id, *verify_id, transaction_hashes);
134                let dispatcher = state_context.into_dispatcher();
135                dispatcher.push(TransactionPoolCandidateAction::VerifySuccess {
136                    peer_id: *peer_id,
137                    verify_id: *verify_id,
138                    from_source: *from_source,
139                });
140            }
141            TransactionPoolCandidateAction::VerifyError {
142                peer_id: _,
143                verify_id: _,
144            } => {
145                unreachable!("TODO(binier)");
146                // state.verify_result(meta.time(), peer_id, *verify_id, Err(()));
147
148                // // TODO(binier): blacklist peer
149                // let dispatcher = state_context.into_dispatcher();
150                // let peer_id = *peer_id;
151                // dispatcher.push(P2pDisconnectionAction::Init {
152                //     peer_id,
153                //     reason: P2pDisconnectionReason::TransactionPoolVerifyError,
154                // });
155            }
156            TransactionPoolCandidateAction::VerifySuccess {
157                peer_id,
158                verify_id,
159                from_source,
160            } => {
161                state.verify_result(meta.time(), peer_id, *verify_id, from_source, Ok(()));
162            }
163            TransactionPoolCandidateAction::PeerPrune { peer_id } => {
164                state.peer_remove(*peer_id);
165            }
166        }
167    }
168}