openmina_node_common/service/archive/
mod.rs

1use mina_p2p_messages::v2::{self};
2use node::{
3    core::{channels::mpsc, thread},
4    ledger::write::BlockApplyResult,
5};
6use std::{env, io::Write};
7
8use mina_p2p_messages::v2::PrecomputedBlock;
9use openmina_core::NetworkConfig;
10use std::net::SocketAddr;
11
12use super::NodeService;
13
14#[cfg(not(target_arch = "wasm32"))]
15pub mod aws;
16#[cfg(not(target_arch = "wasm32"))]
17pub mod gcp;
18#[cfg(not(target_arch = "wasm32"))]
19pub mod rpc;
20
21pub mod config;
22
23use config::ArchiveStorageOptions;
24
/// Number of attempts made to deliver a single diff to the archiver process.
const ARCHIVE_SEND_RETRIES: u8 = 5;
// NOTE(review): not referenced anywhere in this file; presumably consumed by a submodule — confirm.
const MAX_EVENT_COUNT: u64 = 100;
/// Pause, in milliseconds, between consecutive delivery attempts to the archiver process.
const RETRY_INTERVAL_MS: u64 = 1000;
28
29#[derive(Debug, thiserror::Error)]
30pub enum Error {
31    #[error("Environment variable {0} is not set")]
32    EnvironmentVariableNotSet(String),
33    #[error("Failed to upload block to AWS: {0}")]
34    UploadError(String),
35}
36
/// Handle to the archive worker.
///
/// It only owns the sending half of an unbounded channel; blocks pushed through it are
/// received by the worker started in [`ArchiveService::start`].
pub struct ArchiveService {
    /// Sending half of the channel that feeds applied blocks to the archive worker.
    archive_sender: mpsc::UnboundedSender<BlockApplyResult>,
}
40
/// Destinations a block can be archived to (native targets only).
///
/// Each field is `Some` only when the corresponding backend was enabled in the
/// `ArchiveStorageOptions` passed to `ArchiveServiceClients::new`.
#[cfg(not(target_arch = "wasm32"))]
struct ArchiveServiceClients {
    /// Socket address of the archiver process, resolved from `OPENMINA_ARCHIVE_ADDRESS`.
    archiver_address: Option<SocketAddr>,
    /// Client used to upload precomputed blocks to AWS.
    aws_client: Option<aws::ArchiveAWSClient>,
    /// Client used to upload precomputed blocks to GCP.
    gcp_client: Option<gcp::ArchiveGCPClient>,
    /// Directory that precomputed blocks are written to on the local filesystem.
    local_path: Option<String>,
}
48
49#[cfg(not(target_arch = "wasm32"))]
50impl ArchiveServiceClients {
51    async fn new(options: &ArchiveStorageOptions, work_dir: String) -> Result<Self, Error> {
52        let aws_client = if options.uses_aws_precomputed_storage() {
53            let client = aws::ArchiveAWSClient::new().await?;
54            Some(client)
55        } else {
56            None
57        };
58
59        let gcp_client = if options.uses_gcp_precomputed_storage() {
60            let client = gcp::ArchiveGCPClient::new().await?;
61            Some(client)
62        } else {
63            None
64        };
65
66        let local_path = if options.uses_local_precomputed_storage() {
67            let env_path = env::var("OPENMINA_LOCAL_PRECOMPUTED_STORAGE_PATH");
68            let default = format!("{}/archive-precomputed", work_dir);
69            Some(env_path.unwrap_or(default))
70        } else {
71            None
72        };
73
74        let archiver_address = if options.uses_archiver_process() {
75            let address = std::env::var("OPENMINA_ARCHIVE_ADDRESS")
76                .expect("OPENMINA_ARCHIVE_ADDRESS is not set");
77            let address = reqwest::Url::parse(&address).expect("Invalid URL");
78
79            // Convert URL to SocketAddr
80            let socket_addrs = address.socket_addrs(|| None).expect("Invalid URL");
81
82            let socket_addr = socket_addrs.first().expect("No socket address found");
83
84            Some(*socket_addr)
85        } else {
86            None
87        };
88
89        Ok(Self {
90            archiver_address,
91            aws_client,
92            gcp_client,
93            local_path,
94        })
95    }
96
97    pub async fn send_block(&self, breadcrumb: BlockApplyResult, options: &ArchiveStorageOptions) {
98        if options.uses_archiver_process() {
99            if let Some(socket_addr) = self.archiver_address {
100                Self::handle_archiver_process(&breadcrumb, &socket_addr).await;
101            } else {
102                node::core::warn!(summary = "Archiver address not set");
103            }
104        }
105
106        if options.requires_precomputed_block() {
107            let network_name = NetworkConfig::global().name;
108            let height = breadcrumb.block.height();
109            let state_hash = breadcrumb.block.hash();
110
111            let key = format!("{network_name}-{height}-{state_hash}.json");
112
113            node::core::info!(
114                summary = "Uploading precomputed block to archive",
115                key = key.clone()
116            );
117
118            let precomputed_block: PrecomputedBlock = match breadcrumb.try_into() {
119                Ok(block) => block,
120                Err(_) => {
121                    node::core::warn!(
122                        summary = "Failed to convert breadcrumb to precomputed block"
123                    );
124                    return;
125                }
126            };
127
128            let data = match serde_json::to_vec(&precomputed_block) {
129                Ok(data) => data,
130                Err(e) => {
131                    node::core::warn!(
132                        summary = "Failed to serialize precomputed block",
133                        error = e.to_string()
134                    );
135                    return;
136                }
137            };
138
139            if options.uses_local_precomputed_storage() {
140                if let Some(path) = &self.local_path {
141                    let key_clone = key.clone();
142                    match write_to_local_storage(path, &key, &data) {
143                        Ok(_) => node::core::info!(
144                            summary = "Successfully wrote precomputed block to local storage",
145                            key = key_clone
146                        ),
147                        Err(e) => node::core::warn!(
148                            summary = "Failed to write precomputed block to local storage",
149                            key = key_clone,
150                            error = e.to_string()
151                        ),
152                    }
153                } else {
154                    node::core::warn!(summary = "Local precomputed storage path not set");
155                }
156            }
157
158            if options.uses_gcp_precomputed_storage() {
159                if let Some(client) = &self.gcp_client {
160                    if let Err(e) = client.upload_block(&key, &data).await {
161                        node::core::warn!(
162                            summary = "Failed to upload precomputed block to GCP",
163                            error = e.to_string()
164                        );
165                    }
166                } else {
167                    node::core::warn!(summary = "GCP client not initialized");
168                }
169            }
170            if options.uses_aws_precomputed_storage() {
171                if let Some(client) = &self.aws_client {
172                    if let Err(e) = client.upload_block(&key, &data).await {
173                        node::core::warn!(
174                            summary = "Failed to upload precomputed block to AWS",
175                            error = e.to_string()
176                        );
177                    }
178                } else {
179                    node::core::warn!(summary = "AWS client not initialized");
180                }
181            }
182        }
183    }
184
185    async fn handle_archiver_process(breadcrumb: &BlockApplyResult, socket_addr: &SocketAddr) {
186        let mut retries = ARCHIVE_SEND_RETRIES;
187
188        let archive_transition_frontier_diff: v2::ArchiveTransitionFrontierDiff =
189            breadcrumb.clone().try_into().unwrap();
190
191        for _ in 0..ARCHIVE_SEND_RETRIES {
192            match rpc::send_diff(
193                *socket_addr,
194                v2::ArchiveRpc::SendDiff(archive_transition_frontier_diff.clone()),
195            ) {
196                Ok(result) if result.should_retry() => {
197                    node::core::warn!(summary = "Archive closed connection, retrying...");
198                    tokio::time::sleep(tokio::time::Duration::from_millis(RETRY_INTERVAL_MS)).await;
199                }
200                Ok(_) => {
201                    node::core::info!(summary = "Successfully sent diff to archive");
202                    return;
203                }
204                Err(e) => {
205                    node::core::warn!(
206                        summary = "Failed sending diff to archive",
207                        error = e.to_string(),
208                        retries = retries
209                    );
210                    tokio::time::sleep(tokio::time::Duration::from_millis(RETRY_INTERVAL_MS)).await;
211                }
212            }
213            retries -= 1;
214        }
215    }
216}
217
218impl ArchiveService {
219    fn new(archive_sender: mpsc::UnboundedSender<BlockApplyResult>) -> Self {
220        Self { archive_sender }
221    }
222
223    #[cfg(not(target_arch = "wasm32"))]
224    async fn run(
225        mut archive_receiver: mpsc::UnboundedReceiver<BlockApplyResult>,
226        options: ArchiveStorageOptions,
227        work_dir: String,
228    ) {
229        let clients = match ArchiveServiceClients::new(&options, work_dir).await {
230            Ok(clients) => clients,
231            Err(e) => {
232                node::core::error!(
233                    summary = "Failed to initialize archive service clients",
234                    error = e.to_string()
235                );
236                return;
237            }
238        };
239
240        while let Some(breadcrumb) = archive_receiver.recv().await {
241            clients.send_block(breadcrumb, &options).await;
242        }
243    }
244
245    // Note: Placeholder for the wasm implementation, if we decide to include an archive mode in the future
246    #[cfg(target_arch = "wasm32")]
247    fn run(
248        mut archive_receiver: mpsc::UnboundedReceiver<BlockApplyResult>,
249        options: ArchiveStorageOptions,
250        work_dir: String,
251    ) {
252        unimplemented!()
253    }
254
255    pub fn start(options: ArchiveStorageOptions, work_dir: String) -> Self {
256        let (archive_sender, archive_receiver) = mpsc::unbounded_channel::<BlockApplyResult>();
257
258        #[cfg(not(target_arch = "wasm32"))]
259        Self::start_native(archive_receiver, options, work_dir);
260
261        #[cfg(target_arch = "wasm32")]
262        Self::start_wasm(archive_receiver, options, work_dir);
263
264        Self::new(archive_sender)
265    }
266
267    #[cfg(not(target_arch = "wasm32"))]
268    fn start_native(
269        archive_receiver: mpsc::UnboundedReceiver<BlockApplyResult>,
270        options: ArchiveStorageOptions,
271        work_dir: String,
272    ) {
273        let runtime = tokio::runtime::Builder::new_current_thread()
274            .enable_all()
275            .build()
276            .unwrap();
277
278        thread::Builder::new()
279            .name("openmina_archive".to_owned())
280            .spawn(move || {
281                runtime.block_on(Self::run(archive_receiver, options, work_dir));
282            })
283            .unwrap();
284    }
285
286    #[cfg(target_arch = "wasm32")]
287    fn start_wasm(
288        archive_receiver: mpsc::UnboundedReceiver<BlockApplyResult>,
289        options: ArchiveStorageOptions,
290        work_dir: String,
291    ) {
292        thread::Builder::new()
293            .name("openmina_archive".to_owned())
294            .spawn(move || {
295                Self::run(archive_receiver, options, work_dir);
296            })
297            .unwrap();
298    }
299}
300
301impl node::transition_frontier::archive::archive_service::ArchiveService for NodeService {
302    fn send_to_archive(&mut self, data: BlockApplyResult) {
303        if let Some(archive) = self.archive.as_mut() {
304            if let Err(e) = archive.archive_sender.send(data) {
305                node::core::warn!(
306                    summary = "Failed sending diff to archive service",
307                    error = e.to_string()
308                );
309            }
310        }
311    }
312}
313
// Note: Placeholder for the wasm implementation, if we decide to include an archive mode in the
// future. The real `rpc` module is only declared for non-wasm targets, so on wasm32 this empty
// module takes its place.
#[cfg(target_arch = "wasm32")]
mod rpc {}
317
318fn write_to_local_storage(base_path: &str, key: &str, data: &[u8]) -> Result<(), Error> {
319    use std::{
320        fs::{create_dir_all, File},
321        path::Path,
322    };
323
324    let path = Path::new(base_path).join(key);
325    if let Some(parent) = path.parent() {
326        create_dir_all(parent)
327            .map_err(|e| Error::UploadError(format!("Directory creation failed: {}", e)))?;
328    }
329
330    let mut file = File::create(&path)
331        .map_err(|e| Error::UploadError(format!("File creation failed: {}", e)))?;
332
333    file.write_all(data)
334        .map_err(|e| Error::UploadError(format!("File write failed: {}", e)))?;
335
336    Ok(())
337}