node/recorder/
recorder.rs

1use std::{
2    borrow::Cow,
3    fs,
4    io::{BufWriter, Write},
5    path::{Path, PathBuf},
6    sync::{Mutex, TryLockError},
7};
8
9use crate::{
10    p2p::identity::SecretKey as P2pSecretKey, Action, ActionWithMeta, EventSourceAction, State,
11};
12
13use super::{RecordedActionWithMeta, RecordedInitialState};
14
15static ACTIONS_F: Mutex<Vec<Option<fs::File>>> = Mutex::new(Vec::new());
16
17/// Panics: if all the `Recorder` instances aren't in the same thread.
18#[derive(Default)]
19pub enum Recorder {
20    #[default]
21    None,
22    OnlyInputActions {
23        recorder_i: usize,
24        recorder_path: PathBuf,
25        actions_f_bytes_written: u64,
26        actions_f_index: usize,
27    },
28}
29
30impl Recorder {
31    pub fn only_input_actions<P: AsRef<Path>>(work_dir: P) -> Self {
32        let path = work_dir.as_ref().join("recorder");
33
34        let _ = fs::remove_dir_all(&path);
35        fs::create_dir_all(&path).expect("creating dir for mina recorder failed!");
36
37        let actions_f_index = 1;
38        let actions_path = super::actions_path(&path, actions_f_index);
39
40        let file = fs::File::create(actions_path)
41            .expect("creating file for mina recorder initial state failed!");
42        let mut actions_files = ACTIONS_F.try_lock().unwrap();
43        actions_files.push(Some(file));
44
45        Self::OnlyInputActions {
46            recorder_i: actions_files.len().saturating_sub(1),
47            recorder_path: path,
48            actions_f_bytes_written: 0,
49            actions_f_index,
50        }
51    }
52
53    pub fn initial_state(&mut self, rng_seed: [u8; 32], p2p_sec_key: P2pSecretKey, state: &State) {
54        match self {
55            Self::None => {}
56            Self::OnlyInputActions { recorder_path, .. } => {
57                let initial_state = RecordedInitialState {
58                    rng_seed,
59                    p2p_sec_key,
60                    state: Cow::Borrowed(state),
61                };
62                let initial_state_path = super::initial_state_path(recorder_path);
63                let mut initial_state_f = fs::File::create(initial_state_path)
64                    .expect("creating file for mina recorder initial state failed!");
65                initial_state.write_to(&mut initial_state_f).unwrap();
66                initial_state_f.sync_all().unwrap();
67            }
68        }
69    }
70
71    pub fn action(&mut self, action: &ActionWithMeta) {
72        match self {
73            Self::None => {}
74            Self::OnlyInputActions {
75                recorder_i,
76                recorder_path,
77                actions_f_bytes_written,
78                actions_f_index,
79                ..
80            } => {
81                let is_input = match action.action() {
82                    Action::CheckTimeouts(_) => true,
83                    Action::EventSource(e) => match e {
84                        EventSourceAction::NewEvent { .. } => true,
85                        _ => return,
86                    },
87                    _ => false,
88                };
89
90                let data = if !is_input {
91                    let kind = action.action().kind();
92                    RecordedActionWithMeta::from((kind, action.meta().clone()))
93                } else {
94                    RecordedActionWithMeta::from(action)
95                };
96
97                let mut files = ACTIONS_F.try_lock().unwrap();
98                let cur_f = files.get_mut(*recorder_i).unwrap(); // TODO: error propagation
99
100                let file = if *actions_f_bytes_written > 64 * 1024 * 1024 {
101                    cur_f.take().unwrap().sync_all().unwrap();
102                    *actions_f_bytes_written = 0;
103                    *actions_f_index = actions_f_index
104                        .checked_add(1)
105                        .expect("overflow in actions_f_index");
106                    cur_f.insert(
107                        fs::File::create(super::actions_path(recorder_path, *actions_f_index))
108                            .unwrap(),
109                    )
110                } else {
111                    cur_f.as_mut().unwrap()
112                };
113
114                let mut writer = BufWriter::new(file);
115
116                let encoded = data.encode().unwrap();
117                // RecordedActionWithMeta::decode(&encoded)
118                //     .expect(&format!("failed to decode encoded message: {:?}", data));
119                writer
120                    .write_all(&(encoded.len() as u64).to_be_bytes())
121                    .unwrap();
122                writer.write_all(&encoded).unwrap();
123                writer.flush().unwrap();
124
125                *actions_f_bytes_written = actions_f_bytes_written
126                    .checked_add(
127                        8u64.checked_add(encoded.len() as u64)
128                            .expect("overflow in encoded len"),
129                    )
130                    .expect("overflow in actions_f_bytes_written");
131            }
132        }
133    }
134
135    pub fn graceful_shutdown() {
136        graceful_shutdown(None)
137    }
138}
139
140impl Drop for Recorder {
141    fn drop(&mut self) {
142        match self {
143            Self::None => {}
144            Self::OnlyInputActions { recorder_i, .. } => graceful_shutdown(Some(*recorder_i)),
145        }
146    }
147}
148
149fn graceful_shutdown(only_i: Option<usize>) {
150    let Some(mut files) = ACTIONS_F.try_lock().map_or_else(
151        |err| match err {
152            TryLockError::WouldBlock => None,
153            TryLockError::Poisoned(v) => Some(v.into_inner()),
154        },
155        Some,
156    ) else {
157        return;
158    };
159    let files_iter = files
160        .iter_mut()
161        .enumerate()
162        .filter(|(i, _)| only_i.is_none_or(|only_i| only_i == *i))
163        .filter_map(|(i, v)| Some((i, v.take()?)));
164
165    for (i, file) in files_iter {
166        eprintln!("Flushing recorded actions to disk before shutdown. i={i}");
167        let _ = file.sync_all();
168    }
169}