openmina_node_native/
replay.rs

1use std::cell::RefCell;
2
3use node::{
4    core::thread,
5    recorder::StateWithInputActionsReader,
6    snark::{BlockVerifier, TransactionVerifier},
7    ActionWithMeta, BuildEnv, Store,
8};
9
10use crate::NodeService;
11
12pub fn replay_state_with_input_actions(
13    dir: &str,
14    dynamic_effects_lib: Option<String>,
15    ignore_mismatch: bool,
16    mut check_build_env: impl FnMut(&BuildEnv, &BuildEnv, bool) -> anyhow::Result<()>,
17) -> anyhow::Result<crate::Node> {
18    eprintln!("replaying node based on initial state and actions from the dir: {dir}");
19    let reader = StateWithInputActionsReader::new(dir);
20
21    eprintln!(
22        "reading initial state from file: {}",
23        reader.initial_state_path().as_path().to_str().unwrap()
24    );
25    let initial_state = match reader.read_initial_state() {
26        Err(err) => anyhow::bail!("failed to read initial state. err: {err}"),
27        Ok(v) => v,
28    };
29
30    let rng_seed = initial_state.rng_seed;
31    let state = {
32        let mut state = initial_state.state.into_owned();
33        // TODO(binier): we shouldn't have to do this, but serialized
34        // index/srs doesn't match deserialized one.
35        state.snark.block_verify.verifier_index = BlockVerifier::make();
36        state.snark.block_verify.verifier_srs = node::snark::get_srs();
37        state.snark.user_command_verify.verifier_index = TransactionVerifier::make();
38        state.snark.user_command_verify.verifier_srs = node::snark::get_srs();
39        state
40    };
41
42    let effects: node::Effects<NodeService> = dynamic_effects_lib
43        .as_ref()
44        .map_or(replayer_effects, |_| replayer_effects_with_dyn_effects);
45    let p2p_sec_key = initial_state.p2p_sec_key;
46
47    let service = NodeService::for_replay(rng_seed, state.time(), p2p_sec_key, dynamic_effects_lib);
48
49    let mut node = crate::Node::new(rng_seed, state, service, Some(effects));
50
51    let store = node.store_mut();
52
53    let replay_env = BuildEnv::get();
54    check_build_env(&store.state().config.build, &replay_env, ignore_mismatch)?;
55
56    eprintln!("reading actions from dir: {dir}");
57
58    let mut input_action = None;
59    let mut actions = reader
60        .read_actions()
61        .flat_map(|(path, actions)| {
62            let file_path = path.as_path().to_str().unwrap();
63            eprintln!("processing actions from file: {file_path}");
64            actions
65        })
66        .peekable();
67
68    while let Some(action) = actions.peek() {
69        let replayer = store.service.replayer().unwrap();
70        let expected_actions = &mut replayer.expected_actions;
71
72        let action = if input_action.is_none() {
73            assert_eq!(
74                expected_actions.len(),
75                0,
76                "not all expected effects of the input action were dispatched! Ones left: {expected_actions:?}"
77            );
78            let (action, meta) = actions
79                .next()
80                .unwrap()
81                .as_action_with_meta()
82                .expect("expected input action, got effect action")
83                .split();
84            let kind = action.kind();
85            let _ = input_action.insert(action);
86            expected_actions.push_back((kind, meta));
87            actions.peek()
88        } else {
89            Some(action)
90        };
91
92        let is_done = if let Some(action) = action {
93            if action.action.is_none() {
94                let action = actions.next().unwrap();
95                expected_actions.push_back((action.kind, action.meta));
96                false
97            } else {
98                true
99            }
100        } else {
101            false
102        };
103
104        if is_done || actions.peek().is_none() {
105            if !is_done {
106                eprintln!("Warning! Executing last action for which we might not have all effect actions recorded.");
107            }
108            let action = input_action.take().unwrap();
109            assert!(store.dispatch(action));
110        }
111    }
112    Ok(node)
113}
114
/// Effects handler used when a dynamic effects library was configured for the replay.
///
/// Runs `dyn_effects` on the action first (by reference), then hands the action over to
/// `replayer_effects`, which consumes it.
fn replayer_effects_with_dyn_effects(store: &mut Store<NodeService>, action: ActionWithMeta) {
    dyn_effects(store, &action);
    replayer_effects(store, action);
}
119
120fn replayer_effects(store: &mut Store<NodeService>, action: ActionWithMeta) {
121    let replayer = store.service.replayer().unwrap();
122    let (kind, meta) = match replayer.expected_actions.pop_front() {
123        Some(v) => v,
124        None => panic!("unexpected action: {:?}", action),
125    };
126
127    assert_eq!(kind, action.action().kind());
128    assert_eq!(meta.time(), action.meta().time());
129
130    node::effects(store, action)
131}
132
133fn dyn_effects(store: &mut Store<NodeService>, action: &ActionWithMeta) {
134    DYN_EFFECTS_LIB.with(move |cell| loop {
135        let mut opt = cell.borrow_mut();
136        let fun = match opt.as_ref() {
137            None => {
138                let lib_path = &store.service.replayer().unwrap().replay_dynamic_effects_lib;
139                opt.insert(DynEffectsLib::load(lib_path)).fun
140            }
141            Some(lib) => lib.fun,
142        };
143
144        match fun(store, action) {
145            0 => return,
146            1 => {
147                opt.take();
148                let lib_path = &store.service.replayer().unwrap().replay_dynamic_effects_lib;
149                let query_modified = || match std::fs::metadata(lib_path).and_then(|v| v.modified())
150                {
151                    Err(err) => {
152                        eprintln!("Error querying replay_dynamic_effects_lib modified time: {err}");
153                        redux::SystemTime::UNIX_EPOCH
154                    }
155                    Ok(v) => v,
156                };
157
158                let initial_time = query_modified();
159                let sleep_dur = std::time::Duration::from_millis(100);
160                eprintln!("Waiting for {lib_path} to be modified.");
161                while initial_time >= query_modified() {
162                    thread::sleep(sleep_dur);
163                }
164            }
165            ret => panic!("unknown `replay_dynamic_effects` return number: {ret}"),
166        }
167    });
168}
169
thread_local! {
    // Per-thread slot for the currently loaded dynamic effects library. Starts out as
    // `None`; `dyn_effects` fills it on first use and empties it again when the library
    // asks to be reloaded.
    static DYN_EFFECTS_LIB: RefCell<Option<DynEffectsLib>> = const { RefCell::new(None)};
}
173
/// A dynamically loaded effects library together with its resolved entry point.
///
/// Created by `DynEffectsLib::load`; the library handle is closed when the value is dropped.
struct DynEffectsLib {
    // Handle returned by `dlopen`; passed to `dlclose` on drop.
    handle: *mut nix::libc::c_void,
    // The `replay_dynamic_effects` symbol resolved from the library via `dlsym`.
    fun: fn(&mut Store<NodeService>, &ActionWithMeta) -> u8,
}
178
179impl DynEffectsLib {
180    fn load(lib_path: &str) -> Self {
181        use nix::libc::{c_void, dlopen, dlsym, RTLD_NOW};
182        use std::ffi::CString;
183
184        let filename = CString::new(lib_path).unwrap();
185
186        let handle = unsafe { dlopen(filename.as_ptr(), RTLD_NOW) };
187        if handle.is_null() {
188            panic!("Failed to resolve dlopen {lib_path}")
189        }
190
191        let fun_name = CString::new("replay_dynamic_effects").unwrap();
192        let fun = unsafe { dlsym(handle, fun_name.as_ptr()) };
193        if fun.is_null() {
194            panic!("Failed to resolve '{}'", &fun_name.to_str().unwrap());
195        }
196
197        Self {
198            handle,
199            #[allow(clippy::missing_transmute_annotations)]
200            fun: unsafe { std::mem::transmute::<*mut c_void, _>(fun) },
201        }
202    }
203}
204
205impl Drop for DynEffectsLib {
206    fn drop(&mut self) {
207        let ret = unsafe { nix::libc::dlclose(self.handle) };
208        if ret != 0 {
209            panic!("Error while closing lib");
210        }
211    }
212}