node/watched_accounts/watched_accounts_reducer.rs
1use super::{
2 account_relevant_transactions_in_diff_iter, WatchedAccountBlockInfo, WatchedAccountBlockState,
3 WatchedAccountLedgerInitialState, WatchedAccountState, WatchedAccountsAction,
4 WatchedAccountsActionWithMetaRef, WatchedAccountsState,
5};
6
7impl WatchedAccountsState {
8 pub fn reducer(
9 mut state_context: crate::Substate<Self>,
10 action: WatchedAccountsActionWithMetaRef<'_>,
11 ) {
12 let Ok(state) = state_context.get_substate_mut() else {
13 // TODO: log or propagate
14 return;
15 };
16 let (action, meta) = action.split();
17
18 match action {
19 WatchedAccountsAction::Add { pub_key } => {
20 state.insert(
21 pub_key.clone(),
22 WatchedAccountState {
23 initial_state: WatchedAccountLedgerInitialState::Idle { time: meta.time() },
24 blocks: Default::default(),
25 },
26 );
27
28 // Dispatch
29 let pub_key = pub_key.clone();
30 let dispatcher = state_context.into_dispatcher();
31 dispatcher.push(WatchedAccountsAction::LedgerInitialStateGetInit { pub_key });
32 }
33 WatchedAccountsAction::LedgerInitialStateGetInit { pub_key: _ }
34 | WatchedAccountsAction::LedgerInitialStateGetRetry { pub_key: _ } => {
35 // TODO(binier)
36 // let Some((peer_id, p2p_rpc_id)) = store.state().p2p.get_free_peer_id_for_rpc() else { return };
37 // let block = {
38 // let Some(block) = store.state().consensus.best_tip() else { return };
39 // WatchedAccountBlockInfo {
40 // level: block.height() as u32,
41 // hash: block.hash.clone(),
42 // pred_hash: block.header.protocol_state.previous_state_hash.clone(),
43 // staged_ledger_hash: block
44 // .header
45 // .protocol_state
46 // .body
47 // .blockchain_state
48 // .staged_ledger_hash
49 // .non_snark
50 // .ledger_hash
51 // .clone(),
52 // }
53 // };
54
55 // let token_id = MinaBaseAccountIdDigestStableV1(BigInt::one());
56
57 // dispatcher.push(P2pRpcOutgoingInitAction {
58 // peer_id: peer_id.clone(),
59 // rpc_id: p2p_rpc_id,
60 // request: P2pRpcRequest::LedgerQuery((
61 // block.staged_ledger_hash.0.clone(),
62 // MinaLedgerSyncLedgerQueryStableV1::WhatAccountWithPath(
63 // pub_key.clone(),
64 // token_id.into(),
65 // ),
66 // )),
67 // requestor: P2pRpcRequestor::WatchedAccount(
68 // P2pRpcRequestorWatchedAccount::LedgerInitialGet(pub_key.clone()),
69 // ),
70 // });
71 // dispatcher.push(WatchedAccountsLedgerInitialStateGetPendingAction {
72 // pub_key,
73 // block,
74 // peer_id,
75 // p2p_rpc_id,
76 // });
77 }
78 WatchedAccountsAction::LedgerInitialStateGetPending {
79 pub_key,
80 block,
81 peer_id,
82 } => {
83 let Some(account) = state.get_mut(pub_key) else {
84 return;
85 };
86 account.blocks.clear();
87
88 account.initial_state = WatchedAccountLedgerInitialState::Pending {
89 time: meta.time(),
90 block: block.clone(),
91 peer_id: *peer_id,
92 };
93 }
94 WatchedAccountsAction::LedgerInitialStateGetError { pub_key, error } => {
95 let Some(account) = state.get_mut(pub_key) else {
96 return;
97 };
98 let peer_id = match &account.initial_state {
99 WatchedAccountLedgerInitialState::Pending { peer_id, .. } => *peer_id,
100 _ => return,
101 };
102 account.initial_state = WatchedAccountLedgerInitialState::Error {
103 time: meta.time(),
104 error: error.clone(),
105 peer_id,
106 };
107 }
108 WatchedAccountsAction::LedgerInitialStateGetSuccess { pub_key, data } => {
109 let Some(account) = state.get_mut(pub_key) else {
110 return;
111 };
112 let Some(block) = account.initial_state.block() else {
113 return;
114 };
115 account.initial_state = WatchedAccountLedgerInitialState::Success {
116 time: meta.time(),
117 block: block.clone(),
118 data: data.clone(),
119 };
120 }
121 WatchedAccountsAction::TransactionsIncludedInBlock { pub_key, block } => {
122 let transactions = account_relevant_transactions_in_diff_iter(
123 pub_key,
124 &block.body().staged_ledger_diff.diff,
125 )
126 .collect();
127
128 let Some(account) = state.get_mut(pub_key) else {
129 return;
130 };
131 account
132 .blocks
133 .push_back(WatchedAccountBlockState::TransactionsInBlockBody {
134 block: WatchedAccountBlockInfo {
135 level: block
136 .block
137 .header
138 .protocol_state
139 .body
140 .consensus_state
141 .blockchain_length
142 .0
143 .0,
144 hash: block.hash.clone(),
145 pred_hash: block
146 .block
147 .header
148 .protocol_state
149 .previous_state_hash
150 .clone(),
151 staged_ledger_hash: block
152 .block
153 .header
154 .protocol_state
155 .body
156 .blockchain_state
157 .staged_ledger_hash
158 .non_snark
159 .ledger_hash
160 .clone(),
161 },
162 transactions,
163 });
164
165 let pub_key = pub_key.clone();
166 let block_hash = block.hash.clone();
167 let dispatcher = state_context.into_dispatcher();
168 dispatcher.push(WatchedAccountsAction::BlockLedgerQueryInit {
169 pub_key,
170 block_hash,
171 });
172 }
173 WatchedAccountsAction::BlockLedgerQueryInit { .. } => {
174 // TODO(binier)
175 // let Some((peer_id, p2p_rpc_id)) = store.state().p2p.get_free_peer_id_for_rpc() else { return };
176 // let ledger_hash = {
177 // let Some(acc) = store.state().watched_accounts.get(&action.pub_key) else { return };
178 // let Some(block) = acc.block_find_by_hash(&action.block_hash) else { return };
179 // block.block().staged_ledger_hash.0.clone()
180 // };
181 // let token_id = MinaBaseAccountIdDigestStableV1(BigInt::one());
182
183 // store.dispatch(P2pRpcOutgoingInitAction {
184 // peer_id: peer_id.clone(),
185 // rpc_id: p2p_rpc_id,
186 // request: P2pRpcRequest::LedgerQuery((
187 // ledger_hash,
188 // MinaLedgerSyncLedgerQueryStableV1::WhatAccountWithPath(
189 // action.pub_key.clone(),
190 // token_id.into(),
191 // ),
192 // )),
193 // requestor: P2pRpcRequestor::WatchedAccount(
194 // P2pRpcRequestorWatchedAccount::BlockLedgerGet(
195 // action.pub_key.clone(),
196 // action.block_hash.clone(),
197 // ),
198 // ),
199 // });
200 // store.dispatch(WatchedAccountsBlockLedgerQueryPendingAction {
201 // pub_key: action.pub_key,
202 // block_hash: action.block_hash,
203 // peer_id,
204 // p2p_rpc_id,
205 // });
206 }
207 WatchedAccountsAction::BlockLedgerQueryPending {
208 pub_key,
209 block_hash,
210 ..
211 } => {
212 let Some(account) = state.get_mut(pub_key) else {
213 return;
214 };
215 let Some(block_state) = account.block_find_by_hash_mut(block_hash) else {
216 return;
217 };
218 *block_state = match block_state {
219 WatchedAccountBlockState::TransactionsInBlockBody {
220 block,
221 transactions,
222 } => WatchedAccountBlockState::LedgerAccountGetPending {
223 block: block.clone(),
224 transactions: std::mem::take(transactions),
225 },
226 _ => return,
227 };
228 }
229 WatchedAccountsAction::BlockLedgerQuerySuccess {
230 pub_key,
231 block_hash,
232 ledger_account,
233 } => {
234 let Some(account) = state.get_mut(pub_key) else {
235 return;
236 };
237 let Some(block_state) = account.block_find_by_hash_mut(block_hash) else {
238 return;
239 };
240 *block_state = match block_state {
241 WatchedAccountBlockState::LedgerAccountGetPending {
242 block,
243 transactions,
244 ..
245 } => WatchedAccountBlockState::LedgerAccountGetSuccess {
246 block: block.clone(),
247 transactions: std::mem::take(transactions),
248 ledger_account: ledger_account.clone(),
249 },
250 _ => return,
251 };
252 }
253 }
254 }
255}