node/recorder/
recorder.rs

1use std::{
2    borrow::Cow,
3    fs,
4    io::{BufWriter, Write},
5    path::{Path, PathBuf},
6    sync::{Mutex, TryLockError},
7};
8
9use crate::{
10    p2p::identity::SecretKey as P2pSecretKey, Action, ActionWithMeta, EventSourceAction, State,
11};
12
13use super::{RecordedActionWithMeta, RecordedInitialState};
14
15static ACTIONS_F: Mutex<Vec<Option<fs::File>>> = Mutex::new(Vec::new());
16
17/// Panics: if all the `Recorder` instances aren't in the same thread.
18pub enum Recorder {
19    None,
20    OnlyInputActions {
21        recorder_i: usize,
22        recorder_path: PathBuf,
23        actions_f_bytes_written: u64,
24        actions_f_index: usize,
25    },
26}
27
28impl Recorder {
29    pub fn only_input_actions<P: AsRef<Path>>(work_dir: P) -> Self {
30        let path = work_dir.as_ref().join("recorder");
31
32        let _ = fs::remove_dir_all(&path);
33        fs::create_dir_all(&path).expect("creating dir for openmina recorder failed!");
34
35        let actions_f_index = 1;
36        let actions_path = super::actions_path(&path, actions_f_index);
37
38        let file = fs::File::create(actions_path)
39            .expect("creating file for openmina recorder initial state failed!");
40        let mut actions_files = ACTIONS_F.try_lock().unwrap();
41        actions_files.push(Some(file));
42
43        Self::OnlyInputActions {
44            recorder_i: actions_files.len().saturating_sub(1),
45            recorder_path: path,
46            actions_f_bytes_written: 0,
47            actions_f_index,
48        }
49    }
50
51    pub fn initial_state(&mut self, rng_seed: [u8; 32], p2p_sec_key: P2pSecretKey, state: &State) {
52        match self {
53            Self::None => {}
54            Self::OnlyInputActions { recorder_path, .. } => {
55                let initial_state = RecordedInitialState {
56                    rng_seed,
57                    p2p_sec_key,
58                    state: Cow::Borrowed(state),
59                };
60                let initial_state_path = super::initial_state_path(recorder_path);
61                let mut initial_state_f = fs::File::create(initial_state_path)
62                    .expect("creating file for openmina recorder initial state failed!");
63                initial_state.write_to(&mut initial_state_f).unwrap();
64                initial_state_f.sync_all().unwrap();
65            }
66        }
67    }
68
69    pub fn action(&mut self, action: &ActionWithMeta) {
70        match self {
71            Self::None => {}
72            Self::OnlyInputActions {
73                recorder_i,
74                recorder_path,
75                actions_f_bytes_written,
76                actions_f_index,
77                ..
78            } => {
79                let is_input = match action.action() {
80                    Action::CheckTimeouts(_) => true,
81                    Action::EventSource(e) => match e {
82                        EventSourceAction::NewEvent { .. } => true,
83                        _ => return,
84                    },
85                    _ => false,
86                };
87
88                let data = if !is_input {
89                    let kind = action.action().kind();
90                    RecordedActionWithMeta::from((kind, action.meta().clone()))
91                } else {
92                    RecordedActionWithMeta::from(action)
93                };
94
95                let mut files = ACTIONS_F.try_lock().unwrap();
96                let cur_f = files.get_mut(*recorder_i).unwrap(); // TODO: error propagation
97
98                let file = if *actions_f_bytes_written > 64 * 1024 * 1024 {
99                    cur_f.take().unwrap().sync_all().unwrap();
100                    *actions_f_bytes_written = 0;
101                    *actions_f_index = actions_f_index
102                        .checked_add(1)
103                        .expect("overflow in actions_f_index");
104                    cur_f.insert(
105                        fs::File::create(super::actions_path(recorder_path, *actions_f_index))
106                            .unwrap(),
107                    )
108                } else {
109                    cur_f.as_mut().unwrap()
110                };
111
112                let mut writer = BufWriter::new(file);
113
114                let encoded = data.encode().unwrap();
115                // RecordedActionWithMeta::decode(&encoded)
116                //     .expect(&format!("failed to decode encoded message: {:?}", data));
117                writer
118                    .write_all(&(encoded.len() as u64).to_be_bytes())
119                    .unwrap();
120                writer.write_all(&encoded).unwrap();
121                writer.flush().unwrap();
122
123                *actions_f_bytes_written = actions_f_bytes_written
124                    .checked_add(
125                        8u64.checked_add(encoded.len() as u64)
126                            .expect("overflow in encoded len"),
127                    )
128                    .expect("overflow in actions_f_bytes_written");
129            }
130        }
131    }
132
133    pub fn graceful_shutdown() {
134        graceful_shutdown(None)
135    }
136}
137
138impl Default for Recorder {
139    fn default() -> Self {
140        Self::None
141    }
142}
143
144impl Drop for Recorder {
145    fn drop(&mut self) {
146        match self {
147            Self::None => {}
148            Self::OnlyInputActions { recorder_i, .. } => graceful_shutdown(Some(*recorder_i)),
149        }
150    }
151}
152
153fn graceful_shutdown(only_i: Option<usize>) {
154    let Some(mut files) = ACTIONS_F.try_lock().map_or_else(
155        |err| match err {
156            TryLockError::WouldBlock => None,
157            TryLockError::Poisoned(v) => Some(v.into_inner()),
158        },
159        Some,
160    ) else {
161        return;
162    };
163    let files_iter = files
164        .iter_mut()
165        .enumerate()
166        .filter(|(i, _)| only_i.is_none_or(|only_i| only_i == *i))
167        .filter_map(|(i, v)| Some((i, v.take()?)));
168
169    for (i, file) in files_iter {
170        eprintln!("Flushing recorded actions to disk before shutdown. i={i}");
171        let _ = file.sync_all();
172    }
173}