mina_node_native/
replay.rs1use crate::NodeService;
2use node::{
3 core::thread,
4 recorder::StateWithInputActionsReader,
5 snark::{BlockVerifier, TransactionVerifier},
6 ActionWithMeta, BuildEnv, Store,
7};
8use std::cell::RefCell;
9
10pub fn replay_state_with_input_actions(
11 dir: &str,
12 dynamic_effects_lib: Option<String>,
13 ignore_mismatch: bool,
14 mut check_build_env: impl FnMut(&BuildEnv, &BuildEnv, bool) -> anyhow::Result<()>,
15) -> anyhow::Result<crate::Node> {
16 eprintln!("replaying node based on initial state and actions from the dir: {dir}");
17 let reader = StateWithInputActionsReader::new(dir);
18
19 eprintln!(
20 "reading initial state from file: {}",
21 reader.initial_state_path().as_path().to_str().unwrap()
22 );
23 let initial_state = match reader.read_initial_state() {
24 Err(err) => anyhow::bail!("failed to read initial state. err: {err}"),
25 Ok(v) => v,
26 };
27
28 let rng_seed = initial_state.rng_seed;
29 let state = {
30 let mut state = initial_state.state.into_owned();
31 state.snark.block_verify.verifier_index = BlockVerifier::make();
34 state.snark.block_verify.verifier_srs = node::snark::get_srs();
35 state.snark.user_command_verify.verifier_index = TransactionVerifier::make();
36 state.snark.user_command_verify.verifier_srs = node::snark::get_srs();
37 state
38 };
39
40 let effects: node::Effects<NodeService> = dynamic_effects_lib
41 .as_ref()
42 .map_or(replayer_effects, |_| replayer_effects_with_dyn_effects);
43 let p2p_sec_key = initial_state.p2p_sec_key;
44
45 let service = NodeService::for_replay(rng_seed, state.time(), p2p_sec_key, dynamic_effects_lib);
46
47 let mut node = crate::Node::new(rng_seed, state, service, Some(effects));
48
49 let store = node.store_mut();
50
51 let replay_env = BuildEnv::get();
52 check_build_env(&store.state().config.build, &replay_env, ignore_mismatch)?;
53
54 eprintln!("reading actions from dir: {dir}");
55
56 let mut input_action = None;
57 let mut actions = reader
58 .read_actions()
59 .flat_map(|(path, actions)| {
60 let file_path = path.as_path().to_str().unwrap();
61 eprintln!("processing actions from file: {file_path}");
62 actions
63 })
64 .peekable();
65
66 while let Some(action) = actions.peek() {
67 let replayer = store.service.replayer().unwrap();
68 let expected_actions = &mut replayer.expected_actions;
69
70 let action = if input_action.is_none() {
71 assert_eq!(
72 expected_actions.len(),
73 0,
74 "not all expected effects of the input action were dispatched! Ones left: {expected_actions:?}"
75 );
76 let (action, meta) = actions
77 .next()
78 .unwrap()
79 .as_action_with_meta()
80 .expect("expected input action, got effect action")
81 .split();
82 let kind = action.kind();
83 let _ = input_action.insert(action);
84 expected_actions.push_back((kind, meta));
85 actions.peek()
86 } else {
87 Some(action)
88 };
89
90 let is_done = if let Some(action) = action {
91 if action.action.is_none() {
92 let action = actions.next().unwrap();
93 expected_actions.push_back((action.kind, action.meta));
94 false
95 } else {
96 true
97 }
98 } else {
99 false
100 };
101
102 if is_done || actions.peek().is_none() {
103 if !is_done {
104 eprintln!("Warning! Executing last action for which we might not have all effect actions recorded.");
105 }
106 let action = input_action.take().unwrap();
107 assert!(store.dispatch(action));
108 }
109 }
110 Ok(node)
111}
112
113fn replayer_effects_with_dyn_effects(store: &mut Store<NodeService>, action: ActionWithMeta) {
114 dyn_effects(store, &action);
115 replayer_effects(store, action);
116}
117
118fn replayer_effects(store: &mut Store<NodeService>, action: ActionWithMeta) {
119 let replayer = store.service.replayer().unwrap();
120 let (kind, meta) = match replayer.expected_actions.pop_front() {
121 Some(v) => v,
122 None => panic!("unexpected action: {:?}", action),
123 };
124
125 assert_eq!(kind, action.action().kind());
126 assert_eq!(meta.time(), action.meta().time());
127
128 node::effects(store, action)
129}
130
131fn dyn_effects(store: &mut Store<NodeService>, action: &ActionWithMeta) {
132 DYN_EFFECTS_LIB.with(move |cell| loop {
133 let mut opt = cell.borrow_mut();
134 let fun = match opt.as_ref() {
135 None => {
136 let lib_path = &store.service.replayer().unwrap().replay_dynamic_effects_lib;
137 opt.insert(DynEffectsLib::load(lib_path)).fun
138 }
139 Some(lib) => lib.fun,
140 };
141
142 match fun(store, action) {
143 0 => return,
144 1 => {
145 opt.take();
146 let lib_path = &store.service.replayer().unwrap().replay_dynamic_effects_lib;
147 let query_modified = || match std::fs::metadata(lib_path).and_then(|v| v.modified())
148 {
149 Err(err) => {
150 eprintln!("Error querying replay_dynamic_effects_lib modified time: {err}");
151 redux::SystemTime::UNIX_EPOCH
152 }
153 Ok(v) => v,
154 };
155
156 let initial_time = query_modified();
157 let sleep_dur = std::time::Duration::from_millis(100);
158 eprintln!("Waiting for {lib_path} to be modified.");
159 while initial_time >= query_modified() {
160 thread::sleep(sleep_dur);
161 }
162 }
163 ret => panic!("unknown `replay_dynamic_effects` return number: {ret}"),
164 }
165 });
166}
167
168thread_local! {
169 static DYN_EFFECTS_LIB: RefCell<Option<DynEffectsLib>> = const { RefCell::new(None)};
170}
171
172struct DynEffectsLib {
173 handle: *mut nix::libc::c_void,
174 fun: fn(&mut Store<NodeService>, &ActionWithMeta) -> u8,
175}
176
177impl DynEffectsLib {
178 fn load(lib_path: &str) -> Self {
179 use nix::libc::{c_void, dlopen, dlsym, RTLD_NOW};
180 use std::ffi::CString;
181
182 let filename = CString::new(lib_path).unwrap();
183
184 let handle = unsafe { dlopen(filename.as_ptr(), RTLD_NOW) };
185 if handle.is_null() {
186 panic!("Failed to resolve dlopen {lib_path}")
187 }
188
189 let fun_name = CString::new("replay_dynamic_effects").unwrap();
190 let fun = unsafe { dlsym(handle, fun_name.as_ptr()) };
191 if fun.is_null() {
192 panic!("Failed to resolve '{}'", &fun_name.to_str().unwrap());
193 }
194
195 Self {
196 handle,
197 #[allow(clippy::missing_transmute_annotations)]
198 fun: unsafe { std::mem::transmute::<*mut c_void, _>(fun) },
199 }
200 }
201}
202
203impl Drop for DynEffectsLib {
204 fn drop(&mut self) {
205 let ret = unsafe { nix::libc::dlclose(self.handle) };
206 if ret != 0 {
207 panic!("Error while closing lib");
208 }
209 }
210}