node/ledger/write/
ledger_write_reducer.rs1use openmina_core::bug_condition;
2use redux::Dispatcher;
3
4use crate::{
5 ledger_effectful::LedgerEffectfulAction,
6 transition_frontier::sync::{
7 ledger::staged::TransitionFrontierSyncLedgerStagedAction, TransitionFrontierSyncAction,
8 },
9 Action, BlockProducerAction, State, Substate,
10};
11
12use super::{
13 LedgerWriteAction, LedgerWriteActionWithMetaRef, LedgerWriteResponse, LedgerWriteState,
14};
15
16impl LedgerWriteState {
17 pub fn reducer(mut state_context: Substate<Self>, action: LedgerWriteActionWithMetaRef<'_>) {
18 let (action, meta) = action.split();
19 let Ok(state) = state_context.get_substate_mut() else {
20 return;
21 };
22
23 match action {
24 LedgerWriteAction::Init { request, on_init } => {
25 *state = Self::Init {
26 time: meta.time(),
27 request: request.clone(),
28 };
29
30 let dispatcher = state_context.into_dispatcher();
31 dispatcher.push(LedgerEffectfulAction::WriteInit {
32 request: request.clone(),
33 on_init: on_init.clone(),
34 });
35 }
36 LedgerWriteAction::Pending => {
37 if let Self::Init { request, .. } = state {
38 *state = Self::Pending {
39 time: meta.time(),
40 request: request.clone(),
41 };
42 }
43 }
44 LedgerWriteAction::Success { response } => {
45 if let Self::Pending { request, .. } = state {
46 *state = Self::Success {
47 time: meta.time(),
48 request: request.clone(),
49 response: response.clone(),
50 };
51 }
52
53 let (dispatcher, state) = state_context.into_dispatcher_and_state();
54 Self::propagate_write_response(dispatcher, state, response.clone());
55 dispatcher.push(BlockProducerAction::StagedLedgerDiffCreateInit);
56 dispatcher.push(TransitionFrontierSyncAction::BlocksNextApplyInit);
57 dispatcher.push(TransitionFrontierSyncAction::CommitInit);
58 dispatcher.push(TransitionFrontierSyncLedgerStagedAction::ReconstructInit);
59 }
60 }
61 }
62
63 fn propagate_write_response(
64 dispatcher: &mut Dispatcher<Action, State>,
65 state: &State,
66 response: LedgerWriteResponse,
67 ) {
68 let Some(request) = &state.ledger.write.request() else {
69 return;
70 };
71 match (request, response) {
72 (
73 _,
74 LedgerWriteResponse::StagedLedgerReconstruct {
75 staged_ledger_hash,
76 result,
77 },
78 ) => {
79 let sync = &state.transition_frontier.sync;
80 let expected_ledger = sync
81 .ledger_target()
82 .and_then(|target| target.staged)
83 .map(|v| v.hashes.non_snark.ledger_hash);
84 if expected_ledger.as_ref() == Some(&staged_ledger_hash) {
85 match result {
86 Err(error) => {
87 dispatcher.push(
88 TransitionFrontierSyncLedgerStagedAction::ReconstructError {
89 error,
90 },
91 );
92 }
93 Ok(()) => {
94 dispatcher.push(
95 TransitionFrontierSyncLedgerStagedAction::ReconstructSuccess {
96 ledger_hash: staged_ledger_hash,
97 },
98 );
99 }
100 }
101 }
102 }
103 (
104 _,
105 LedgerWriteResponse::StagedLedgerDiffCreate {
106 pred_block_hash,
107 global_slot_since_genesis,
108 result,
109 },
110 ) => {
111 let Some((expected_pred_block_hash, expected_global_slot)) = None.or_else(|| {
112 let pred_block = state.block_producer.current_parent_chain()?.last()?;
113 let won_slot = state.block_producer.current_won_slot()?;
114 let slot = won_slot.global_slot_since_genesis(pred_block.global_slot_diff());
115 Some((pred_block.hash(), slot))
116 }) else {
117 return;
118 };
119
120 if &pred_block_hash == expected_pred_block_hash
121 && global_slot_since_genesis == expected_global_slot
122 {
123 match result {
124 Err(err) => {
125 bug_condition!("StagedLedgerDiffCreate error: {err}");
126 }
127 Ok(output) => {
128 dispatcher.push(BlockProducerAction::StagedLedgerDiffCreateSuccess {
129 output,
130 });
131 }
132 }
133 }
134 }
135 (
136 _,
137 LedgerWriteResponse::BlockApply {
138 block_hash: hash,
139 result,
140 },
141 ) => match result {
142 Err(error) => {
143 dispatcher
144 .push(TransitionFrontierSyncAction::BlocksNextApplyError { hash, error });
145 }
146 Ok(result) => {
147 dispatcher.push(TransitionFrontierSyncAction::BlocksSendToArchive {
148 hash: hash.clone(),
149 data: result.clone(),
150 });
151 dispatcher.push(TransitionFrontierSyncAction::BlocksNextApplySuccess {
152 hash,
153 just_emitted_a_proof: result.just_emitted_a_proof,
154 });
155 }
156 },
157 (
158 _,
159 LedgerWriteResponse::Commit {
160 best_tip_hash,
161 result,
162 },
163 ) => {
164 let best_tip = state.transition_frontier.sync.best_tip();
165 if best_tip.is_some_and(|tip| tip.hash() == &best_tip_hash) {
166 dispatcher.push(TransitionFrontierSyncAction::CommitSuccess { result });
167 }
168 }
169 }
170 }
171}