node/recorder/
recorder.rs1use std::{
2 borrow::Cow,
3 fs,
4 io::{BufWriter, Write},
5 path::{Path, PathBuf},
6 sync::{Mutex, TryLockError},
7};
8
9use crate::{
10 p2p::identity::SecretKey as P2pSecretKey, Action, ActionWithMeta, EventSourceAction, State,
11};
12
13use super::{RecordedActionWithMeta, RecordedInitialState};
14
15static ACTIONS_F: Mutex<Vec<Option<fs::File>>> = Mutex::new(Vec::new());
16
17pub enum Recorder {
19 None,
20 OnlyInputActions {
21 recorder_i: usize,
22 recorder_path: PathBuf,
23 actions_f_bytes_written: u64,
24 actions_f_index: usize,
25 },
26}
27
28impl Recorder {
29 pub fn only_input_actions<P: AsRef<Path>>(work_dir: P) -> Self {
30 let path = work_dir.as_ref().join("recorder");
31
32 let _ = fs::remove_dir_all(&path);
33 fs::create_dir_all(&path).expect("creating dir for openmina recorder failed!");
34
35 let actions_f_index = 1;
36 let actions_path = super::actions_path(&path, actions_f_index);
37
38 let file = fs::File::create(actions_path)
39 .expect("creating file for openmina recorder initial state failed!");
40 let mut actions_files = ACTIONS_F.try_lock().unwrap();
41 actions_files.push(Some(file));
42
43 Self::OnlyInputActions {
44 recorder_i: actions_files.len().saturating_sub(1),
45 recorder_path: path,
46 actions_f_bytes_written: 0,
47 actions_f_index,
48 }
49 }
50
51 pub fn initial_state(&mut self, rng_seed: [u8; 32], p2p_sec_key: P2pSecretKey, state: &State) {
52 match self {
53 Self::None => {}
54 Self::OnlyInputActions { recorder_path, .. } => {
55 let initial_state = RecordedInitialState {
56 rng_seed,
57 p2p_sec_key,
58 state: Cow::Borrowed(state),
59 };
60 let initial_state_path = super::initial_state_path(recorder_path);
61 let mut initial_state_f = fs::File::create(initial_state_path)
62 .expect("creating file for openmina recorder initial state failed!");
63 initial_state.write_to(&mut initial_state_f).unwrap();
64 initial_state_f.sync_all().unwrap();
65 }
66 }
67 }
68
69 pub fn action(&mut self, action: &ActionWithMeta) {
70 match self {
71 Self::None => {}
72 Self::OnlyInputActions {
73 recorder_i,
74 recorder_path,
75 actions_f_bytes_written,
76 actions_f_index,
77 ..
78 } => {
79 let is_input = match action.action() {
80 Action::CheckTimeouts(_) => true,
81 Action::EventSource(e) => match e {
82 EventSourceAction::NewEvent { .. } => true,
83 _ => return,
84 },
85 _ => false,
86 };
87
88 let data = if !is_input {
89 let kind = action.action().kind();
90 RecordedActionWithMeta::from((kind, action.meta().clone()))
91 } else {
92 RecordedActionWithMeta::from(action)
93 };
94
95 let mut files = ACTIONS_F.try_lock().unwrap();
96 let cur_f = files.get_mut(*recorder_i).unwrap(); let file = if *actions_f_bytes_written > 64 * 1024 * 1024 {
99 cur_f.take().unwrap().sync_all().unwrap();
100 *actions_f_bytes_written = 0;
101 *actions_f_index = actions_f_index
102 .checked_add(1)
103 .expect("overflow in actions_f_index");
104 cur_f.insert(
105 fs::File::create(super::actions_path(recorder_path, *actions_f_index))
106 .unwrap(),
107 )
108 } else {
109 cur_f.as_mut().unwrap()
110 };
111
112 let mut writer = BufWriter::new(file);
113
114 let encoded = data.encode().unwrap();
115 writer
118 .write_all(&(encoded.len() as u64).to_be_bytes())
119 .unwrap();
120 writer.write_all(&encoded).unwrap();
121 writer.flush().unwrap();
122
123 *actions_f_bytes_written = actions_f_bytes_written
124 .checked_add(
125 8u64.checked_add(encoded.len() as u64)
126 .expect("overflow in encoded len"),
127 )
128 .expect("overflow in actions_f_bytes_written");
129 }
130 }
131 }
132
133 pub fn graceful_shutdown() {
134 graceful_shutdown(None)
135 }
136}
137
138impl Default for Recorder {
139 fn default() -> Self {
140 Self::None
141 }
142}
143
144impl Drop for Recorder {
145 fn drop(&mut self) {
146 match self {
147 Self::None => {}
148 Self::OnlyInputActions { recorder_i, .. } => graceful_shutdown(Some(*recorder_i)),
149 }
150 }
151}
152
153fn graceful_shutdown(only_i: Option<usize>) {
154 let Some(mut files) = ACTIONS_F.try_lock().map_or_else(
155 |err| match err {
156 TryLockError::WouldBlock => None,
157 TryLockError::Poisoned(v) => Some(v.into_inner()),
158 },
159 Some,
160 ) else {
161 return;
162 };
163 let files_iter = files
164 .iter_mut()
165 .enumerate()
166 .filter(|(i, _)| only_i.is_none_or(|only_i| only_i == *i))
167 .filter_map(|(i, v)| Some((i, v.take()?)));
168
169 for (i, file) in files_iter {
170 eprintln!("Flushing recorded actions to disk before shutdown. i={i}");
171 let _ = file.sync_all();
172 }
173}