node/watched_accounts/
watched_accounts_reducer.rs

1use super::{
2    account_relevant_transactions_in_diff_iter, WatchedAccountBlockInfo, WatchedAccountBlockState,
3    WatchedAccountLedgerInitialState, WatchedAccountState, WatchedAccountsAction,
4    WatchedAccountsActionWithMetaRef, WatchedAccountsState,
5};
6
7impl WatchedAccountsState {
8    pub fn reducer(
9        mut state_context: crate::Substate<Self>,
10        action: WatchedAccountsActionWithMetaRef<'_>,
11    ) {
12        let Ok(state) = state_context.get_substate_mut() else {
13            // TODO: log or propagate
14            return;
15        };
16        let (action, meta) = action.split();
17
18        match action {
19            WatchedAccountsAction::Add { pub_key } => {
20                state.insert(
21                    pub_key.clone(),
22                    WatchedAccountState {
23                        initial_state: WatchedAccountLedgerInitialState::Idle { time: meta.time() },
24                        blocks: Default::default(),
25                    },
26                );
27
28                // Dispatch
29                let pub_key = pub_key.clone();
30                let dispatcher = state_context.into_dispatcher();
31                dispatcher.push(WatchedAccountsAction::LedgerInitialStateGetInit { pub_key });
32            }
33            WatchedAccountsAction::LedgerInitialStateGetInit { pub_key: _ }
34            | WatchedAccountsAction::LedgerInitialStateGetRetry { pub_key: _ } => {
35                // TODO(binier)
36                // let Some((peer_id, p2p_rpc_id)) = store.state().p2p.get_free_peer_id_for_rpc() else { return };
37                // let block = {
38                //     let Some(block) = store.state().consensus.best_tip() else { return };
39                //     WatchedAccountBlockInfo {
40                //         level: block.height() as u32,
41                //         hash: block.hash.clone(),
42                //         pred_hash: block.header.protocol_state.previous_state_hash.clone(),
43                //         staged_ledger_hash: block
44                //             .header
45                //             .protocol_state
46                //             .body
47                //             .blockchain_state
48                //             .staged_ledger_hash
49                //             .non_snark
50                //             .ledger_hash
51                //             .clone(),
52                //     }
53                // };
54
55                // let token_id = MinaBaseAccountIdDigestStableV1(BigInt::one());
56
57                // dispatcher.push(P2pRpcOutgoingInitAction {
58                //     peer_id: peer_id.clone(),
59                //     rpc_id: p2p_rpc_id,
60                //     request: P2pRpcRequest::LedgerQuery((
61                //         block.staged_ledger_hash.0.clone(),
62                //         MinaLedgerSyncLedgerQueryStableV1::WhatAccountWithPath(
63                //             pub_key.clone(),
64                //             token_id.into(),
65                //         ),
66                //     )),
67                //     requestor: P2pRpcRequestor::WatchedAccount(
68                //         P2pRpcRequestorWatchedAccount::LedgerInitialGet(pub_key.clone()),
69                //     ),
70                // });
71                // dispatcher.push(WatchedAccountsLedgerInitialStateGetPendingAction {
72                //     pub_key,
73                //     block,
74                //     peer_id,
75                //     p2p_rpc_id,
76                // });
77            }
78            WatchedAccountsAction::LedgerInitialStateGetPending {
79                pub_key,
80                block,
81                peer_id,
82            } => {
83                let Some(account) = state.get_mut(pub_key) else {
84                    return;
85                };
86                account.blocks.clear();
87
88                account.initial_state = WatchedAccountLedgerInitialState::Pending {
89                    time: meta.time(),
90                    block: block.clone(),
91                    peer_id: *peer_id,
92                };
93            }
94            WatchedAccountsAction::LedgerInitialStateGetError { pub_key, error } => {
95                let Some(account) = state.get_mut(pub_key) else {
96                    return;
97                };
98                let peer_id = match &account.initial_state {
99                    WatchedAccountLedgerInitialState::Pending { peer_id, .. } => *peer_id,
100                    _ => return,
101                };
102                account.initial_state = WatchedAccountLedgerInitialState::Error {
103                    time: meta.time(),
104                    error: error.clone(),
105                    peer_id,
106                };
107            }
108            WatchedAccountsAction::LedgerInitialStateGetSuccess { pub_key, data } => {
109                let Some(account) = state.get_mut(pub_key) else {
110                    return;
111                };
112                let Some(block) = account.initial_state.block() else {
113                    return;
114                };
115                account.initial_state = WatchedAccountLedgerInitialState::Success {
116                    time: meta.time(),
117                    block: block.clone(),
118                    data: data.clone(),
119                };
120            }
121            WatchedAccountsAction::TransactionsIncludedInBlock { pub_key, block } => {
122                let transactions = account_relevant_transactions_in_diff_iter(
123                    pub_key,
124                    &block.body().staged_ledger_diff.diff,
125                )
126                .collect();
127
128                let Some(account) = state.get_mut(pub_key) else {
129                    return;
130                };
131                account
132                    .blocks
133                    .push_back(WatchedAccountBlockState::TransactionsInBlockBody {
134                        block: WatchedAccountBlockInfo {
135                            level: block
136                                .block
137                                .header
138                                .protocol_state
139                                .body
140                                .consensus_state
141                                .blockchain_length
142                                .0
143                                 .0,
144                            hash: block.hash.clone(),
145                            pred_hash: block
146                                .block
147                                .header
148                                .protocol_state
149                                .previous_state_hash
150                                .clone(),
151                            staged_ledger_hash: block
152                                .block
153                                .header
154                                .protocol_state
155                                .body
156                                .blockchain_state
157                                .staged_ledger_hash
158                                .non_snark
159                                .ledger_hash
160                                .clone(),
161                        },
162                        transactions,
163                    });
164
165                let pub_key = pub_key.clone();
166                let block_hash = block.hash.clone();
167                let dispatcher = state_context.into_dispatcher();
168                dispatcher.push(WatchedAccountsAction::BlockLedgerQueryInit {
169                    pub_key,
170                    block_hash,
171                });
172            }
173            WatchedAccountsAction::BlockLedgerQueryInit { .. } => {
174                // TODO(binier)
175                // let Some((peer_id, p2p_rpc_id)) = store.state().p2p.get_free_peer_id_for_rpc() else { return };
176                // let ledger_hash = {
177                //     let Some(acc) = store.state().watched_accounts.get(&action.pub_key) else { return };
178                //     let Some(block) = acc.block_find_by_hash(&action.block_hash) else { return };
179                //     block.block().staged_ledger_hash.0.clone()
180                // };
181                // let token_id = MinaBaseAccountIdDigestStableV1(BigInt::one());
182
183                // store.dispatch(P2pRpcOutgoingInitAction {
184                //     peer_id: peer_id.clone(),
185                //     rpc_id: p2p_rpc_id,
186                //     request: P2pRpcRequest::LedgerQuery((
187                //         ledger_hash,
188                //         MinaLedgerSyncLedgerQueryStableV1::WhatAccountWithPath(
189                //             action.pub_key.clone(),
190                //             token_id.into(),
191                //         ),
192                //     )),
193                //     requestor: P2pRpcRequestor::WatchedAccount(
194                //         P2pRpcRequestorWatchedAccount::BlockLedgerGet(
195                //             action.pub_key.clone(),
196                //             action.block_hash.clone(),
197                //         ),
198                //     ),
199                // });
200                // store.dispatch(WatchedAccountsBlockLedgerQueryPendingAction {
201                //     pub_key: action.pub_key,
202                //     block_hash: action.block_hash,
203                //     peer_id,
204                //     p2p_rpc_id,
205                // });
206            }
207            WatchedAccountsAction::BlockLedgerQueryPending {
208                pub_key,
209                block_hash,
210                ..
211            } => {
212                let Some(account) = state.get_mut(pub_key) else {
213                    return;
214                };
215                let Some(block_state) = account.block_find_by_hash_mut(block_hash) else {
216                    return;
217                };
218                *block_state = match block_state {
219                    WatchedAccountBlockState::TransactionsInBlockBody {
220                        block,
221                        transactions,
222                    } => WatchedAccountBlockState::LedgerAccountGetPending {
223                        block: block.clone(),
224                        transactions: std::mem::take(transactions),
225                    },
226                    _ => return,
227                };
228            }
229            WatchedAccountsAction::BlockLedgerQuerySuccess {
230                pub_key,
231                block_hash,
232                ledger_account,
233            } => {
234                let Some(account) = state.get_mut(pub_key) else {
235                    return;
236                };
237                let Some(block_state) = account.block_find_by_hash_mut(block_hash) else {
238                    return;
239                };
240                *block_state = match block_state {
241                    WatchedAccountBlockState::LedgerAccountGetPending {
242                        block,
243                        transactions,
244                        ..
245                    } => WatchedAccountBlockState::LedgerAccountGetSuccess {
246                        block: block.clone(),
247                        transactions: std::mem::take(transactions),
248                        ledger_account: ledger_account.clone(),
249                    },
250                    _ => return,
251                };
252            }
253        }
254    }
255}