// mina_node/transaction_pool/candidate/transaction_pool_candidate_reducer.rs
#![allow(clippy::unit_arg)]
2
3use crate::{
4 p2p::{
5 channels::rpc::{P2pChannelsRpcAction, P2pRpcId, P2pRpcRequest},
6 PeerId,
7 },
8 p2p_ready, TransactionPoolAction,
9};
10
11use super::{
12 TransactionPoolCandidateAction, TransactionPoolCandidateActionWithMetaRef,
13 TransactionPoolCandidatesState,
14};
15
16impl TransactionPoolCandidatesState {
17 pub fn reducer(
18 mut state_context: crate::Substate<Self>,
19 action: TransactionPoolCandidateActionWithMetaRef<'_>,
20 ) {
21 let Ok(state) = state_context.get_substate_mut() else {
22 return;
24 };
25 let (action, meta) = action.split();
26
27 match action {
28 TransactionPoolCandidateAction::InfoReceived { peer_id, info } => {
29 state.info_received(meta.time(), *peer_id, info.clone());
30 }
31 TransactionPoolCandidateAction::FetchAll => {
32 let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
33 let p2p = p2p_ready!(global_state.p2p, meta.time());
34 let peers = p2p.ready_peers_iter().map(|(id, _)| *id);
35 let get_order = |_hash: &_| {
36 0
38 };
39 let list = global_state
40 .transaction_pool
41 .candidates
42 .peers_next_transactions_to_fetch(peers, get_order);
43
44 for (peer_id, hash) in list {
45 dispatcher.push(TransactionPoolCandidateAction::FetchInit { peer_id, hash });
46 }
47 }
48 TransactionPoolCandidateAction::FetchInit { peer_id, hash } => {
49 let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
50 let peer_id = *peer_id;
51 let hash = hash.clone();
52 let p2p = p2p_ready!(global_state.p2p, meta.time());
53 let Some(peer) = p2p.get_ready_peer(&peer_id) else {
54 return;
55 };
56 let rpc_id = peer.channels.next_local_rpc_id();
57
58 dispatcher.push(P2pChannelsRpcAction::RequestSend {
59 peer_id,
60 id: rpc_id,
61 request: Box::new(P2pRpcRequest::Transaction(hash.clone())),
62 on_init: Some(redux::callback!(
63 on_send_p2p_snark_rpc_request(
64 (peer_id: PeerId, rpc_id: P2pRpcId, request: P2pRpcRequest)
65 ) -> crate::Action {
66 let P2pRpcRequest::Transaction(hash) = request else {
67 unreachable!()
68 };
69 TransactionPoolCandidateAction::FetchPending {
70 hash,
71 peer_id,
72 rpc_id,
73 }
74 }
75 )),
76 });
77 }
78 TransactionPoolCandidateAction::FetchPending {
79 peer_id,
80 hash,
81 rpc_id,
82 } => {
83 state.fetch_pending(meta.time(), peer_id, hash, *rpc_id);
84 }
85 TransactionPoolCandidateAction::FetchError { peer_id, hash } => {
86 state.peer_transaction_remove(*peer_id, hash);
87 }
88 TransactionPoolCandidateAction::FetchSuccess {
89 peer_id,
90 transaction,
91 } => {
92 state.transaction_received(meta.time(), *peer_id, transaction.clone());
93 }
94 TransactionPoolCandidateAction::Libp2pTransactionsReceived {
95 peer_id,
96 transactions,
97 message_id,
98 } => {
99 state.transactions_received(
100 meta.time(),
101 *peer_id,
102 transactions.clone(),
103 *message_id,
104 );
105 }
106 TransactionPoolCandidateAction::VerifyNext => {
107 let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
108
109 let batch = global_state
110 .transaction_pool
111 .candidates
112 .get_batch_to_verify();
113 let Some((peer_id, batch, from_source)) = batch else {
114 return;
115 };
116
117 let transaction_hashes = batch.iter().map(|tx| tx.hash().clone()).collect();
118 dispatcher.push(TransactionPoolAction::StartVerify {
119 commands: batch.into_iter().collect(),
120 from_source,
121 });
122 dispatcher.push(TransactionPoolCandidateAction::VerifyPending {
123 peer_id,
124 transaction_hashes,
125 verify_id: (),
126 from_source,
127 });
128 }
129 TransactionPoolCandidateAction::VerifyPending {
130 peer_id,
131 transaction_hashes,
132 verify_id,
133 from_source,
134 } => {
135 state.verify_pending(meta.time(), peer_id, *verify_id, transaction_hashes);
136 let dispatcher = state_context.into_dispatcher();
137 dispatcher.push(TransactionPoolCandidateAction::VerifySuccess {
138 peer_id: *peer_id,
139 verify_id: *verify_id,
140 from_source: *from_source,
141 });
142 }
143 TransactionPoolCandidateAction::VerifyError {
144 peer_id: _,
145 verify_id: _,
146 } => {
147 unreachable!("TODO(binier)");
148 }
158 TransactionPoolCandidateAction::VerifySuccess {
159 peer_id,
160 verify_id,
161 from_source,
162 } => {
163 state.verify_result(meta.time(), peer_id, *verify_id, from_source, Ok(()));
164 }
165 TransactionPoolCandidateAction::PeerPrune { peer_id } => {
166 state.peer_remove(*peer_id);
167 }
168 }
169 }
170}