node/transaction_pool/candidate/
transaction_pool_candidate_reducer.rs1#![allow(clippy::unit_arg)]
2
3use crate::{p2p_ready, TransactionPoolAction};
4use p2p::{
5 channels::rpc::{P2pChannelsRpcAction, P2pRpcId, P2pRpcRequest},
6 PeerId,
7};
8
9use super::{
10 TransactionPoolCandidateAction, TransactionPoolCandidateActionWithMetaRef,
11 TransactionPoolCandidatesState,
12};
13
14impl TransactionPoolCandidatesState {
15 pub fn reducer(
16 mut state_context: crate::Substate<Self>,
17 action: TransactionPoolCandidateActionWithMetaRef<'_>,
18 ) {
19 let Ok(state) = state_context.get_substate_mut() else {
20 return;
22 };
23 let (action, meta) = action.split();
24
25 match action {
26 TransactionPoolCandidateAction::InfoReceived { peer_id, info } => {
27 state.info_received(meta.time(), *peer_id, info.clone());
28 }
29 TransactionPoolCandidateAction::FetchAll => {
30 let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
31 let p2p = p2p_ready!(global_state.p2p, meta.time());
32 let peers = p2p.ready_peers_iter().map(|(id, _)| *id);
33 let get_order = |_hash: &_| {
34 0
36 };
37 let list = global_state
38 .transaction_pool
39 .candidates
40 .peers_next_transactions_to_fetch(peers, get_order);
41
42 for (peer_id, hash) in list {
43 dispatcher.push(TransactionPoolCandidateAction::FetchInit { peer_id, hash });
44 }
45 }
46 TransactionPoolCandidateAction::FetchInit { peer_id, hash } => {
47 let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
48 let peer_id = *peer_id;
49 let hash = hash.clone();
50 let p2p = p2p_ready!(global_state.p2p, meta.time());
51 let Some(peer) = p2p.get_ready_peer(&peer_id) else {
52 return;
53 };
54 let rpc_id = peer.channels.next_local_rpc_id();
55
56 dispatcher.push(P2pChannelsRpcAction::RequestSend {
57 peer_id,
58 id: rpc_id,
59 request: Box::new(P2pRpcRequest::Transaction(hash.clone())),
60 on_init: Some(redux::callback!(
61 on_send_p2p_snark_rpc_request(
62 (peer_id: PeerId, rpc_id: P2pRpcId, request: P2pRpcRequest)
63 ) -> crate::Action {
64 let P2pRpcRequest::Transaction(hash) = request else {
65 unreachable!()
66 };
67 TransactionPoolCandidateAction::FetchPending {
68 hash,
69 peer_id,
70 rpc_id,
71 }
72 }
73 )),
74 });
75 }
76 TransactionPoolCandidateAction::FetchPending {
77 peer_id,
78 hash,
79 rpc_id,
80 } => {
81 state.fetch_pending(meta.time(), peer_id, hash, *rpc_id);
82 }
83 TransactionPoolCandidateAction::FetchError { peer_id, hash } => {
84 state.peer_transaction_remove(*peer_id, hash);
85 }
86 TransactionPoolCandidateAction::FetchSuccess {
87 peer_id,
88 transaction,
89 } => {
90 state.transaction_received(meta.time(), *peer_id, transaction.clone());
91 }
92 TransactionPoolCandidateAction::Libp2pTransactionsReceived {
93 peer_id,
94 transactions,
95 message_id,
96 } => {
97 state.transactions_received(
98 meta.time(),
99 *peer_id,
100 transactions.clone(),
101 *message_id,
102 );
103 }
104 TransactionPoolCandidateAction::VerifyNext => {
105 let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
106
107 let batch = global_state
108 .transaction_pool
109 .candidates
110 .get_batch_to_verify();
111 let Some((peer_id, batch, from_source)) = batch else {
112 return;
113 };
114
115 let transaction_hashes = batch.iter().map(|tx| tx.hash().clone()).collect();
116 dispatcher.push(TransactionPoolAction::StartVerify {
117 commands: batch.into_iter().collect(),
118 from_source,
119 });
120 dispatcher.push(TransactionPoolCandidateAction::VerifyPending {
121 peer_id,
122 transaction_hashes,
123 verify_id: (),
124 from_source,
125 });
126 }
127 TransactionPoolCandidateAction::VerifyPending {
128 peer_id,
129 transaction_hashes,
130 verify_id,
131 from_source,
132 } => {
133 state.verify_pending(meta.time(), peer_id, *verify_id, transaction_hashes);
134 let dispatcher = state_context.into_dispatcher();
135 dispatcher.push(TransactionPoolCandidateAction::VerifySuccess {
136 peer_id: *peer_id,
137 verify_id: *verify_id,
138 from_source: *from_source,
139 });
140 }
141 TransactionPoolCandidateAction::VerifyError {
142 peer_id: _,
143 verify_id: _,
144 } => {
145 unreachable!("TODO(binier)");
146 }
156 TransactionPoolCandidateAction::VerifySuccess {
157 peer_id,
158 verify_id,
159 from_source,
160 } => {
161 state.verify_result(meta.time(), peer_id, *verify_id, from_source, Ok(()));
162 }
163 TransactionPoolCandidateAction::PeerPrune { peer_id } => {
164 state.peer_remove(*peer_id);
165 }
166 }
167 }
168}