mina_node/transaction_pool/candidate/
transaction_pool_candidate_reducer.rs

1#![allow(clippy::unit_arg)]
2
3use crate::{
4    p2p::{
5        channels::rpc::{P2pChannelsRpcAction, P2pRpcId, P2pRpcRequest},
6        PeerId,
7    },
8    p2p_ready, TransactionPoolAction,
9};
10
11use super::{
12    TransactionPoolCandidateAction, TransactionPoolCandidateActionWithMetaRef,
13    TransactionPoolCandidatesState,
14};
15
16impl TransactionPoolCandidatesState {
17    pub fn reducer(
18        mut state_context: crate::Substate<Self>,
19        action: TransactionPoolCandidateActionWithMetaRef<'_>,
20    ) {
21        let Ok(state) = state_context.get_substate_mut() else {
22            // TODO: log or propagate
23            return;
24        };
25        let (action, meta) = action.split();
26
27        match action {
28            TransactionPoolCandidateAction::InfoReceived { peer_id, info } => {
29                state.info_received(meta.time(), *peer_id, info.clone());
30            }
31            TransactionPoolCandidateAction::FetchAll => {
32                let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
33                let p2p = p2p_ready!(global_state.p2p, meta.time());
34                let peers = p2p.ready_peers_iter().map(|(id, _)| *id);
35                let get_order = |_hash: &_| {
36                    // TODO(binier)
37                    0
38                };
39                let list = global_state
40                    .transaction_pool
41                    .candidates
42                    .peers_next_transactions_to_fetch(peers, get_order);
43
44                for (peer_id, hash) in list {
45                    dispatcher.push(TransactionPoolCandidateAction::FetchInit { peer_id, hash });
46                }
47            }
48            TransactionPoolCandidateAction::FetchInit { peer_id, hash } => {
49                let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
50                let peer_id = *peer_id;
51                let hash = hash.clone();
52                let p2p = p2p_ready!(global_state.p2p, meta.time());
53                let Some(peer) = p2p.get_ready_peer(&peer_id) else {
54                    return;
55                };
56                let rpc_id = peer.channels.next_local_rpc_id();
57
58                dispatcher.push(P2pChannelsRpcAction::RequestSend {
59                    peer_id,
60                    id: rpc_id,
61                    request: Box::new(P2pRpcRequest::Transaction(hash.clone())),
62                    on_init: Some(redux::callback!(
63                        on_send_p2p_snark_rpc_request(
64                            (peer_id: PeerId, rpc_id: P2pRpcId, request: P2pRpcRequest)
65                        ) -> crate::Action {
66                            let P2pRpcRequest::Transaction(hash) = request else {
67                                unreachable!()
68                            };
69                            TransactionPoolCandidateAction::FetchPending {
70                                hash,
71                                peer_id,
72                                rpc_id,
73                            }
74                        }
75                    )),
76                });
77            }
78            TransactionPoolCandidateAction::FetchPending {
79                peer_id,
80                hash,
81                rpc_id,
82            } => {
83                state.fetch_pending(meta.time(), peer_id, hash, *rpc_id);
84            }
85            TransactionPoolCandidateAction::FetchError { peer_id, hash } => {
86                state.peer_transaction_remove(*peer_id, hash);
87            }
88            TransactionPoolCandidateAction::FetchSuccess {
89                peer_id,
90                transaction,
91            } => {
92                state.transaction_received(meta.time(), *peer_id, transaction.clone());
93            }
94            TransactionPoolCandidateAction::Libp2pTransactionsReceived {
95                peer_id,
96                transactions,
97                message_id,
98            } => {
99                state.transactions_received(
100                    meta.time(),
101                    *peer_id,
102                    transactions.clone(),
103                    *message_id,
104                );
105            }
106            TransactionPoolCandidateAction::VerifyNext => {
107                let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
108
109                let batch = global_state
110                    .transaction_pool
111                    .candidates
112                    .get_batch_to_verify();
113                let Some((peer_id, batch, from_source)) = batch else {
114                    return;
115                };
116
117                let transaction_hashes = batch.iter().map(|tx| tx.hash().clone()).collect();
118                dispatcher.push(TransactionPoolAction::StartVerify {
119                    commands: batch.into_iter().collect(),
120                    from_source,
121                });
122                dispatcher.push(TransactionPoolCandidateAction::VerifyPending {
123                    peer_id,
124                    transaction_hashes,
125                    verify_id: (),
126                    from_source,
127                });
128            }
129            TransactionPoolCandidateAction::VerifyPending {
130                peer_id,
131                transaction_hashes,
132                verify_id,
133                from_source,
134            } => {
135                state.verify_pending(meta.time(), peer_id, *verify_id, transaction_hashes);
136                let dispatcher = state_context.into_dispatcher();
137                dispatcher.push(TransactionPoolCandidateAction::VerifySuccess {
138                    peer_id: *peer_id,
139                    verify_id: *verify_id,
140                    from_source: *from_source,
141                });
142            }
143            TransactionPoolCandidateAction::VerifyError {
144                peer_id: _,
145                verify_id: _,
146            } => {
147                unreachable!("TODO(binier)");
148                // state.verify_result(meta.time(), peer_id, *verify_id, Err(()));
149
150                // // TODO(binier): blacklist peer
151                // let dispatcher = state_context.into_dispatcher();
152                // let peer_id = *peer_id;
153                // dispatcher.push(P2pDisconnectionAction::Init {
154                //     peer_id,
155                //     reason: P2pDisconnectionReason::TransactionPoolVerifyError,
156                // });
157            }
158            TransactionPoolCandidateAction::VerifySuccess {
159                peer_id,
160                verify_id,
161                from_source,
162            } => {
163                state.verify_result(meta.time(), peer_id, *verify_id, from_source, Ok(()));
164            }
165            TransactionPoolCandidateAction::PeerPrune { peer_id } => {
166                state.peer_remove(*peer_id);
167            }
168        }
169    }
170}