openmina_node_native/
replay.rs1use std::cell::RefCell;
2
3use node::{
4 core::thread,
5 recorder::StateWithInputActionsReader,
6 snark::{BlockVerifier, TransactionVerifier},
7 ActionWithMeta, BuildEnv, Store,
8};
9
10use crate::NodeService;
11
12pub fn replay_state_with_input_actions(
13 dir: &str,
14 dynamic_effects_lib: Option<String>,
15 ignore_mismatch: bool,
16 mut check_build_env: impl FnMut(&BuildEnv, &BuildEnv, bool) -> anyhow::Result<()>,
17) -> anyhow::Result<crate::Node> {
18 eprintln!("replaying node based on initial state and actions from the dir: {dir}");
19 let reader = StateWithInputActionsReader::new(dir);
20
21 eprintln!(
22 "reading initial state from file: {}",
23 reader.initial_state_path().as_path().to_str().unwrap()
24 );
25 let initial_state = match reader.read_initial_state() {
26 Err(err) => anyhow::bail!("failed to read initial state. err: {err}"),
27 Ok(v) => v,
28 };
29
30 let rng_seed = initial_state.rng_seed;
31 let state = {
32 let mut state = initial_state.state.into_owned();
33 state.snark.block_verify.verifier_index = BlockVerifier::make();
36 state.snark.block_verify.verifier_srs = node::snark::get_srs();
37 state.snark.user_command_verify.verifier_index = TransactionVerifier::make();
38 state.snark.user_command_verify.verifier_srs = node::snark::get_srs();
39 state
40 };
41
42 let effects: node::Effects<NodeService> = dynamic_effects_lib
43 .as_ref()
44 .map_or(replayer_effects, |_| replayer_effects_with_dyn_effects);
45 let p2p_sec_key = initial_state.p2p_sec_key;
46
47 let service = NodeService::for_replay(rng_seed, state.time(), p2p_sec_key, dynamic_effects_lib);
48
49 let mut node = crate::Node::new(rng_seed, state, service, Some(effects));
50
51 let store = node.store_mut();
52
53 let replay_env = BuildEnv::get();
54 check_build_env(&store.state().config.build, &replay_env, ignore_mismatch)?;
55
56 eprintln!("reading actions from dir: {dir}");
57
58 let mut input_action = None;
59 let mut actions = reader
60 .read_actions()
61 .flat_map(|(path, actions)| {
62 let file_path = path.as_path().to_str().unwrap();
63 eprintln!("processing actions from file: {file_path}");
64 actions
65 })
66 .peekable();
67
68 while let Some(action) = actions.peek() {
69 let replayer = store.service.replayer().unwrap();
70 let expected_actions = &mut replayer.expected_actions;
71
72 let action = if input_action.is_none() {
73 assert_eq!(
74 expected_actions.len(),
75 0,
76 "not all expected effects of the input action were dispatched! Ones left: {expected_actions:?}"
77 );
78 let (action, meta) = actions
79 .next()
80 .unwrap()
81 .as_action_with_meta()
82 .expect("expected input action, got effect action")
83 .split();
84 let kind = action.kind();
85 let _ = input_action.insert(action);
86 expected_actions.push_back((kind, meta));
87 actions.peek()
88 } else {
89 Some(action)
90 };
91
92 let is_done = if let Some(action) = action {
93 if action.action.is_none() {
94 let action = actions.next().unwrap();
95 expected_actions.push_back((action.kind, action.meta));
96 false
97 } else {
98 true
99 }
100 } else {
101 false
102 };
103
104 if is_done || actions.peek().is_none() {
105 if !is_done {
106 eprintln!("Warning! Executing last action for which we might not have all effect actions recorded.");
107 }
108 let action = input_action.take().unwrap();
109 assert!(store.dispatch(action));
110 }
111 }
112 Ok(node)
113}
114
115fn replayer_effects_with_dyn_effects(store: &mut Store<NodeService>, action: ActionWithMeta) {
116 dyn_effects(store, &action);
117 replayer_effects(store, action);
118}
119
120fn replayer_effects(store: &mut Store<NodeService>, action: ActionWithMeta) {
121 let replayer = store.service.replayer().unwrap();
122 let (kind, meta) = match replayer.expected_actions.pop_front() {
123 Some(v) => v,
124 None => panic!("unexpected action: {:?}", action),
125 };
126
127 assert_eq!(kind, action.action().kind());
128 assert_eq!(meta.time(), action.meta().time());
129
130 node::effects(store, action)
131}
132
133fn dyn_effects(store: &mut Store<NodeService>, action: &ActionWithMeta) {
134 DYN_EFFECTS_LIB.with(move |cell| loop {
135 let mut opt = cell.borrow_mut();
136 let fun = match opt.as_ref() {
137 None => {
138 let lib_path = &store.service.replayer().unwrap().replay_dynamic_effects_lib;
139 opt.insert(DynEffectsLib::load(lib_path)).fun
140 }
141 Some(lib) => lib.fun,
142 };
143
144 match fun(store, action) {
145 0 => return,
146 1 => {
147 opt.take();
148 let lib_path = &store.service.replayer().unwrap().replay_dynamic_effects_lib;
149 let query_modified = || match std::fs::metadata(lib_path).and_then(|v| v.modified())
150 {
151 Err(err) => {
152 eprintln!("Error querying replay_dynamic_effects_lib modified time: {err}");
153 redux::SystemTime::UNIX_EPOCH
154 }
155 Ok(v) => v,
156 };
157
158 let initial_time = query_modified();
159 let sleep_dur = std::time::Duration::from_millis(100);
160 eprintln!("Waiting for {lib_path} to be modified.");
161 while initial_time >= query_modified() {
162 thread::sleep(sleep_dur);
163 }
164 }
165 ret => panic!("unknown `replay_dynamic_effects` return number: {ret}"),
166 }
167 });
168}
169
170thread_local! {
171 static DYN_EFFECTS_LIB: RefCell<Option<DynEffectsLib>> = const { RefCell::new(None)};
172}
173
174struct DynEffectsLib {
175 handle: *mut nix::libc::c_void,
176 fun: fn(&mut Store<NodeService>, &ActionWithMeta) -> u8,
177}
178
179impl DynEffectsLib {
180 fn load(lib_path: &str) -> Self {
181 use nix::libc::{c_void, dlopen, dlsym, RTLD_NOW};
182 use std::ffi::CString;
183
184 let filename = CString::new(lib_path).unwrap();
185
186 let handle = unsafe { dlopen(filename.as_ptr(), RTLD_NOW) };
187 if handle.is_null() {
188 panic!("Failed to resolve dlopen {lib_path}")
189 }
190
191 let fun_name = CString::new("replay_dynamic_effects").unwrap();
192 let fun = unsafe { dlsym(handle, fun_name.as_ptr()) };
193 if fun.is_null() {
194 panic!("Failed to resolve '{}'", &fun_name.to_str().unwrap());
195 }
196
197 Self {
198 handle,
199 #[allow(clippy::missing_transmute_annotations)]
200 fun: unsafe { std::mem::transmute::<*mut c_void, _>(fun) },
201 }
202 }
203}
204
205impl Drop for DynEffectsLib {
206 fn drop(&mut self) {
207 let ret = unsafe { nix::libc::dlclose(self.handle) };
208 if ret != 0 {
209 panic!("Error while closing lib");
210 }
211 }
212}