mina_node_native/
replay.rs

1use crate::NodeService;
2use node::{
3    core::thread,
4    recorder::StateWithInputActionsReader,
5    snark::{BlockVerifier, TransactionVerifier},
6    ActionWithMeta, BuildEnv, Store,
7};
8use std::cell::RefCell;
9
10pub fn replay_state_with_input_actions(
11    dir: &str,
12    dynamic_effects_lib: Option<String>,
13    ignore_mismatch: bool,
14    mut check_build_env: impl FnMut(&BuildEnv, &BuildEnv, bool) -> anyhow::Result<()>,
15) -> anyhow::Result<crate::Node> {
16    eprintln!("replaying node based on initial state and actions from the dir: {dir}");
17    let reader = StateWithInputActionsReader::new(dir);
18
19    eprintln!(
20        "reading initial state from file: {}",
21        reader.initial_state_path().as_path().to_str().unwrap()
22    );
23    let initial_state = match reader.read_initial_state() {
24        Err(err) => anyhow::bail!("failed to read initial state. err: {err}"),
25        Ok(v) => v,
26    };
27
28    let rng_seed = initial_state.rng_seed;
29    let state = {
30        let mut state = initial_state.state.into_owned();
31        // TODO(binier): we shouldn't have to do this, but serialized
32        // index/srs doesn't match deserialized one.
33        state.snark.block_verify.verifier_index = BlockVerifier::make();
34        state.snark.block_verify.verifier_srs = node::snark::get_srs();
35        state.snark.user_command_verify.verifier_index = TransactionVerifier::make();
36        state.snark.user_command_verify.verifier_srs = node::snark::get_srs();
37        state
38    };
39
40    let effects: node::Effects<NodeService> = dynamic_effects_lib
41        .as_ref()
42        .map_or(replayer_effects, |_| replayer_effects_with_dyn_effects);
43    let p2p_sec_key = initial_state.p2p_sec_key;
44
45    let service = NodeService::for_replay(rng_seed, state.time(), p2p_sec_key, dynamic_effects_lib);
46
47    let mut node = crate::Node::new(rng_seed, state, service, Some(effects));
48
49    let store = node.store_mut();
50
51    let replay_env = BuildEnv::get();
52    check_build_env(&store.state().config.build, &replay_env, ignore_mismatch)?;
53
54    eprintln!("reading actions from dir: {dir}");
55
56    let mut input_action = None;
57    let mut actions = reader
58        .read_actions()
59        .flat_map(|(path, actions)| {
60            let file_path = path.as_path().to_str().unwrap();
61            eprintln!("processing actions from file: {file_path}");
62            actions
63        })
64        .peekable();
65
66    while let Some(action) = actions.peek() {
67        let replayer = store.service.replayer().unwrap();
68        let expected_actions = &mut replayer.expected_actions;
69
70        let action = if input_action.is_none() {
71            assert_eq!(
72                expected_actions.len(),
73                0,
74                "not all expected effects of the input action were dispatched! Ones left: {expected_actions:?}"
75            );
76            let (action, meta) = actions
77                .next()
78                .unwrap()
79                .as_action_with_meta()
80                .expect("expected input action, got effect action")
81                .split();
82            let kind = action.kind();
83            let _ = input_action.insert(action);
84            expected_actions.push_back((kind, meta));
85            actions.peek()
86        } else {
87            Some(action)
88        };
89
90        let is_done = if let Some(action) = action {
91            if action.action.is_none() {
92                let action = actions.next().unwrap();
93                expected_actions.push_back((action.kind, action.meta));
94                false
95            } else {
96                true
97            }
98        } else {
99            false
100        };
101
102        if is_done || actions.peek().is_none() {
103            if !is_done {
104                eprintln!("Warning! Executing last action for which we might not have all effect actions recorded.");
105            }
106            let action = input_action.take().unwrap();
107            assert!(store.dispatch(action));
108        }
109    }
110    Ok(node)
111}
112
113fn replayer_effects_with_dyn_effects(store: &mut Store<NodeService>, action: ActionWithMeta) {
114    dyn_effects(store, &action);
115    replayer_effects(store, action);
116}
117
118fn replayer_effects(store: &mut Store<NodeService>, action: ActionWithMeta) {
119    let replayer = store.service.replayer().unwrap();
120    let (kind, meta) = match replayer.expected_actions.pop_front() {
121        Some(v) => v,
122        None => panic!("unexpected action: {:?}", action),
123    };
124
125    assert_eq!(kind, action.action().kind());
126    assert_eq!(meta.time(), action.meta().time());
127
128    node::effects(store, action)
129}
130
131fn dyn_effects(store: &mut Store<NodeService>, action: &ActionWithMeta) {
132    DYN_EFFECTS_LIB.with(move |cell| loop {
133        let mut opt = cell.borrow_mut();
134        let fun = match opt.as_ref() {
135            None => {
136                let lib_path = &store.service.replayer().unwrap().replay_dynamic_effects_lib;
137                opt.insert(DynEffectsLib::load(lib_path)).fun
138            }
139            Some(lib) => lib.fun,
140        };
141
142        match fun(store, action) {
143            0 => return,
144            1 => {
145                opt.take();
146                let lib_path = &store.service.replayer().unwrap().replay_dynamic_effects_lib;
147                let query_modified = || match std::fs::metadata(lib_path).and_then(|v| v.modified())
148                {
149                    Err(err) => {
150                        eprintln!("Error querying replay_dynamic_effects_lib modified time: {err}");
151                        redux::SystemTime::UNIX_EPOCH
152                    }
153                    Ok(v) => v,
154                };
155
156                let initial_time = query_modified();
157                let sleep_dur = std::time::Duration::from_millis(100);
158                eprintln!("Waiting for {lib_path} to be modified.");
159                while initial_time >= query_modified() {
160                    thread::sleep(sleep_dur);
161                }
162            }
163            ret => panic!("unknown `replay_dynamic_effects` return number: {ret}"),
164        }
165    });
166}
167
168thread_local! {
169    static DYN_EFFECTS_LIB: RefCell<Option<DynEffectsLib>> = const { RefCell::new(None)};
170}
171
172struct DynEffectsLib {
173    handle: *mut nix::libc::c_void,
174    fun: fn(&mut Store<NodeService>, &ActionWithMeta) -> u8,
175}
176
177impl DynEffectsLib {
178    fn load(lib_path: &str) -> Self {
179        use nix::libc::{c_void, dlopen, dlsym, RTLD_NOW};
180        use std::ffi::CString;
181
182        let filename = CString::new(lib_path).unwrap();
183
184        let handle = unsafe { dlopen(filename.as_ptr(), RTLD_NOW) };
185        if handle.is_null() {
186            panic!("Failed to resolve dlopen {lib_path}")
187        }
188
189        let fun_name = CString::new("replay_dynamic_effects").unwrap();
190        let fun = unsafe { dlsym(handle, fun_name.as_ptr()) };
191        if fun.is_null() {
192            panic!("Failed to resolve '{}'", &fun_name.to_str().unwrap());
193        }
194
195        Self {
196            handle,
197            #[allow(clippy::missing_transmute_annotations)]
198            fun: unsafe { std::mem::transmute::<*mut c_void, _>(fun) },
199        }
200    }
201}
202
203impl Drop for DynEffectsLib {
204    fn drop(&mut self) {
205        let ret = unsafe { nix::libc::dlclose(self.handle) };
206        if ret != 0 {
207            panic!("Error while closing lib");
208        }
209    }
210}