node/ledger/write/
ledger_write_reducer.rs

1use openmina_core::bug_condition;
2use redux::Dispatcher;
3
4use crate::{
5    ledger_effectful::LedgerEffectfulAction,
6    transition_frontier::sync::{
7        ledger::staged::TransitionFrontierSyncLedgerStagedAction, TransitionFrontierSyncAction,
8    },
9    Action, BlockProducerAction, State, Substate,
10};
11
12use super::{
13    LedgerWriteAction, LedgerWriteActionWithMetaRef, LedgerWriteResponse, LedgerWriteState,
14};
15
16impl LedgerWriteState {
17    pub fn reducer(mut state_context: Substate<Self>, action: LedgerWriteActionWithMetaRef<'_>) {
18        let (action, meta) = action.split();
19        let Ok(state) = state_context.get_substate_mut() else {
20            return;
21        };
22
23        match action {
24            LedgerWriteAction::Init { request, on_init } => {
25                *state = Self::Init {
26                    time: meta.time(),
27                    request: request.clone(),
28                };
29
30                let dispatcher = state_context.into_dispatcher();
31                dispatcher.push(LedgerEffectfulAction::WriteInit {
32                    request: request.clone(),
33                    on_init: on_init.clone(),
34                });
35            }
36            LedgerWriteAction::Pending => {
37                if let Self::Init { request, .. } = state {
38                    *state = Self::Pending {
39                        time: meta.time(),
40                        request: request.clone(),
41                    };
42                }
43            }
44            LedgerWriteAction::Success { response } => {
45                if let Self::Pending { request, .. } = state {
46                    *state = Self::Success {
47                        time: meta.time(),
48                        request: request.clone(),
49                        response: response.clone(),
50                    };
51                }
52
53                let (dispatcher, state) = state_context.into_dispatcher_and_state();
54                Self::propagate_write_response(dispatcher, state, response.clone());
55                dispatcher.push(BlockProducerAction::StagedLedgerDiffCreateInit);
56                dispatcher.push(TransitionFrontierSyncAction::BlocksNextApplyInit);
57                dispatcher.push(TransitionFrontierSyncAction::CommitInit);
58                dispatcher.push(TransitionFrontierSyncLedgerStagedAction::ReconstructInit);
59            }
60        }
61    }
62
63    fn propagate_write_response(
64        dispatcher: &mut Dispatcher<Action, State>,
65        state: &State,
66        response: LedgerWriteResponse,
67    ) {
68        let Some(request) = &state.ledger.write.request() else {
69            return;
70        };
71        match (request, response) {
72            (
73                _,
74                LedgerWriteResponse::StagedLedgerReconstruct {
75                    staged_ledger_hash,
76                    result,
77                },
78            ) => {
79                let sync = &state.transition_frontier.sync;
80                let expected_ledger = sync
81                    .ledger_target()
82                    .and_then(|target| target.staged)
83                    .map(|v| v.hashes.non_snark.ledger_hash);
84                if expected_ledger.as_ref() == Some(&staged_ledger_hash) {
85                    match result {
86                        Err(error) => {
87                            dispatcher.push(
88                                TransitionFrontierSyncLedgerStagedAction::ReconstructError {
89                                    error,
90                                },
91                            );
92                        }
93                        Ok(()) => {
94                            dispatcher.push(
95                                TransitionFrontierSyncLedgerStagedAction::ReconstructSuccess {
96                                    ledger_hash: staged_ledger_hash,
97                                },
98                            );
99                        }
100                    }
101                }
102            }
103            (
104                _,
105                LedgerWriteResponse::StagedLedgerDiffCreate {
106                    pred_block_hash,
107                    global_slot_since_genesis,
108                    result,
109                },
110            ) => {
111                let Some((expected_pred_block_hash, expected_global_slot)) = None.or_else(|| {
112                    let pred_block = state.block_producer.current_parent_chain()?.last()?;
113                    let won_slot = state.block_producer.current_won_slot()?;
114                    let slot = won_slot.global_slot_since_genesis(pred_block.global_slot_diff());
115                    Some((pred_block.hash(), slot))
116                }) else {
117                    return;
118                };
119
120                if &pred_block_hash == expected_pred_block_hash
121                    && global_slot_since_genesis == expected_global_slot
122                {
123                    match result {
124                        Err(err) => {
125                            bug_condition!("StagedLedgerDiffCreate error: {err}");
126                        }
127                        Ok(output) => {
128                            dispatcher.push(BlockProducerAction::StagedLedgerDiffCreateSuccess {
129                                output,
130                            });
131                        }
132                    }
133                }
134            }
135            (
136                _,
137                LedgerWriteResponse::BlockApply {
138                    block_hash: hash,
139                    result,
140                },
141            ) => match result {
142                Err(error) => {
143                    dispatcher
144                        .push(TransitionFrontierSyncAction::BlocksNextApplyError { hash, error });
145                }
146                Ok(result) => {
147                    dispatcher.push(TransitionFrontierSyncAction::BlocksSendToArchive {
148                        hash: hash.clone(),
149                        data: result.clone(),
150                    });
151                    dispatcher.push(TransitionFrontierSyncAction::BlocksNextApplySuccess {
152                        hash,
153                        just_emitted_a_proof: result.just_emitted_a_proof,
154                    });
155                }
156            },
157            (
158                _,
159                LedgerWriteResponse::Commit {
160                    best_tip_hash,
161                    result,
162                },
163            ) => {
164                let best_tip = state.transition_frontier.sync.best_tip();
165                if best_tip.is_some_and(|tip| tip.hash() == &best_tip_hash) {
166                    dispatcher.push(TransitionFrontierSyncAction::CommitSuccess { result });
167                }
168            }
169        }
170    }
171}