node/recorder/
recorder.rs1use std::{
2 borrow::Cow,
3 fs,
4 io::{BufWriter, Write},
5 path::{Path, PathBuf},
6 sync::{Mutex, TryLockError},
7};
8
9use crate::{
10 p2p::identity::SecretKey as P2pSecretKey, Action, ActionWithMeta, EventSourceAction, State,
11};
12
13use super::{RecordedActionWithMeta, RecordedInitialState};
14
15static ACTIONS_F: Mutex<Vec<Option<fs::File>>> = Mutex::new(Vec::new());
16
17#[derive(Default)]
19pub enum Recorder {
20 #[default]
21 None,
22 OnlyInputActions {
23 recorder_i: usize,
24 recorder_path: PathBuf,
25 actions_f_bytes_written: u64,
26 actions_f_index: usize,
27 },
28}
29
30impl Recorder {
31 pub fn only_input_actions<P: AsRef<Path>>(work_dir: P) -> Self {
32 let path = work_dir.as_ref().join("recorder");
33
34 let _ = fs::remove_dir_all(&path);
35 fs::create_dir_all(&path).expect("creating dir for mina recorder failed!");
36
37 let actions_f_index = 1;
38 let actions_path = super::actions_path(&path, actions_f_index);
39
40 let file = fs::File::create(actions_path)
41 .expect("creating file for mina recorder initial state failed!");
42 let mut actions_files = ACTIONS_F.try_lock().unwrap();
43 actions_files.push(Some(file));
44
45 Self::OnlyInputActions {
46 recorder_i: actions_files.len().saturating_sub(1),
47 recorder_path: path,
48 actions_f_bytes_written: 0,
49 actions_f_index,
50 }
51 }
52
53 pub fn initial_state(&mut self, rng_seed: [u8; 32], p2p_sec_key: P2pSecretKey, state: &State) {
54 match self {
55 Self::None => {}
56 Self::OnlyInputActions { recorder_path, .. } => {
57 let initial_state = RecordedInitialState {
58 rng_seed,
59 p2p_sec_key,
60 state: Cow::Borrowed(state),
61 };
62 let initial_state_path = super::initial_state_path(recorder_path);
63 let mut initial_state_f = fs::File::create(initial_state_path)
64 .expect("creating file for mina recorder initial state failed!");
65 initial_state.write_to(&mut initial_state_f).unwrap();
66 initial_state_f.sync_all().unwrap();
67 }
68 }
69 }
70
71 pub fn action(&mut self, action: &ActionWithMeta) {
72 match self {
73 Self::None => {}
74 Self::OnlyInputActions {
75 recorder_i,
76 recorder_path,
77 actions_f_bytes_written,
78 actions_f_index,
79 ..
80 } => {
81 let is_input = match action.action() {
82 Action::CheckTimeouts(_) => true,
83 Action::EventSource(e) => match e {
84 EventSourceAction::NewEvent { .. } => true,
85 _ => return,
86 },
87 _ => false,
88 };
89
90 let data = if !is_input {
91 let kind = action.action().kind();
92 RecordedActionWithMeta::from((kind, action.meta().clone()))
93 } else {
94 RecordedActionWithMeta::from(action)
95 };
96
97 let mut files = ACTIONS_F.try_lock().unwrap();
98 let cur_f = files.get_mut(*recorder_i).unwrap(); let file = if *actions_f_bytes_written > 64 * 1024 * 1024 {
101 cur_f.take().unwrap().sync_all().unwrap();
102 *actions_f_bytes_written = 0;
103 *actions_f_index = actions_f_index
104 .checked_add(1)
105 .expect("overflow in actions_f_index");
106 cur_f.insert(
107 fs::File::create(super::actions_path(recorder_path, *actions_f_index))
108 .unwrap(),
109 )
110 } else {
111 cur_f.as_mut().unwrap()
112 };
113
114 let mut writer = BufWriter::new(file);
115
116 let encoded = data.encode().unwrap();
117 writer
120 .write_all(&(encoded.len() as u64).to_be_bytes())
121 .unwrap();
122 writer.write_all(&encoded).unwrap();
123 writer.flush().unwrap();
124
125 *actions_f_bytes_written = actions_f_bytes_written
126 .checked_add(
127 8u64.checked_add(encoded.len() as u64)
128 .expect("overflow in encoded len"),
129 )
130 .expect("overflow in actions_f_bytes_written");
131 }
132 }
133 }
134
135 pub fn graceful_shutdown() {
136 graceful_shutdown(None)
137 }
138}
139
140impl Drop for Recorder {
141 fn drop(&mut self) {
142 match self {
143 Self::None => {}
144 Self::OnlyInputActions { recorder_i, .. } => graceful_shutdown(Some(*recorder_i)),
145 }
146 }
147}
148
149fn graceful_shutdown(only_i: Option<usize>) {
150 let Some(mut files) = ACTIONS_F.try_lock().map_or_else(
151 |err| match err {
152 TryLockError::WouldBlock => None,
153 TryLockError::Poisoned(v) => Some(v.into_inner()),
154 },
155 Some,
156 ) else {
157 return;
158 };
159 let files_iter = files
160 .iter_mut()
161 .enumerate()
162 .filter(|(i, _)| only_i.is_none_or(|only_i| only_i == *i))
163 .filter_map(|(i, v)| Some((i, v.take()?)));
164
165 for (i, file) in files_iter {
166 eprintln!("Flushing recorded actions to disk before shutdown. i={i}");
167 let _ = file.sync_all();
168 }
169}