openmina_node_common/service/archive/
mod.rs1use mina_p2p_messages::v2::{self};
2use node::{
3 core::{channels::mpsc, thread},
4 ledger::write::BlockApplyResult,
5};
6use std::{env, io::Write};
7
8use mina_p2p_messages::v2::PrecomputedBlock;
9use openmina_core::NetworkConfig;
10use std::net::SocketAddr;
11
12use super::NodeService;
13
14#[cfg(not(target_arch = "wasm32"))]
15pub mod aws;
16#[cfg(not(target_arch = "wasm32"))]
17pub mod gcp;
18#[cfg(not(target_arch = "wasm32"))]
19pub mod rpc;
20
21pub mod config;
22
23use config::ArchiveStorageOptions;
24
/// Maximum number of attempts made to deliver one diff to the archiver process.
const ARCHIVE_SEND_RETRIES: u8 = 5;
/// Not referenced in the visible part of this module.
// NOTE(review): presumably an upper bound on queued events consumed elsewhere — confirm.
const MAX_EVENT_COUNT: u64 = 100;
/// Pause, in milliseconds, between two attempts to reach the archiver process.
const RETRY_INTERVAL_MS: u64 = 1000;
28
29#[derive(Debug, thiserror::Error)]
30pub enum Error {
31 #[error("Environment variable {0} is not set")]
32 EnvironmentVariableNotSet(String),
33 #[error("Failed to upload block to AWS: {0}")]
34 UploadError(String),
35}
36
37pub struct ArchiveService {
38 archive_sender: mpsc::UnboundedSender<BlockApplyResult>,
39}
40
41#[cfg(not(target_arch = "wasm32"))]
42struct ArchiveServiceClients {
43 archiver_address: Option<SocketAddr>,
44 aws_client: Option<aws::ArchiveAWSClient>,
45 gcp_client: Option<gcp::ArchiveGCPClient>,
46 local_path: Option<String>,
47}
48
49#[cfg(not(target_arch = "wasm32"))]
50impl ArchiveServiceClients {
51 async fn new(options: &ArchiveStorageOptions, work_dir: String) -> Result<Self, Error> {
52 let aws_client = if options.uses_aws_precomputed_storage() {
53 let client = aws::ArchiveAWSClient::new().await?;
54 Some(client)
55 } else {
56 None
57 };
58
59 let gcp_client = if options.uses_gcp_precomputed_storage() {
60 let client = gcp::ArchiveGCPClient::new().await?;
61 Some(client)
62 } else {
63 None
64 };
65
66 let local_path = if options.uses_local_precomputed_storage() {
67 let env_path = env::var("OPENMINA_LOCAL_PRECOMPUTED_STORAGE_PATH");
68 let default = format!("{}/archive-precomputed", work_dir);
69 Some(env_path.unwrap_or(default))
70 } else {
71 None
72 };
73
74 let archiver_address = if options.uses_archiver_process() {
75 let address = std::env::var("OPENMINA_ARCHIVE_ADDRESS")
76 .expect("OPENMINA_ARCHIVE_ADDRESS is not set");
77 let address = reqwest::Url::parse(&address).expect("Invalid URL");
78
79 let socket_addrs = address.socket_addrs(|| None).expect("Invalid URL");
81
82 let socket_addr = socket_addrs.first().expect("No socket address found");
83
84 Some(*socket_addr)
85 } else {
86 None
87 };
88
89 Ok(Self {
90 archiver_address,
91 aws_client,
92 gcp_client,
93 local_path,
94 })
95 }
96
97 pub async fn send_block(&self, breadcrumb: BlockApplyResult, options: &ArchiveStorageOptions) {
98 if options.uses_archiver_process() {
99 if let Some(socket_addr) = self.archiver_address {
100 Self::handle_archiver_process(&breadcrumb, &socket_addr).await;
101 } else {
102 node::core::warn!(summary = "Archiver address not set");
103 }
104 }
105
106 if options.requires_precomputed_block() {
107 let network_name = NetworkConfig::global().name;
108 let height = breadcrumb.block.height();
109 let state_hash = breadcrumb.block.hash();
110
111 let key = format!("{network_name}-{height}-{state_hash}.json");
112
113 node::core::info!(
114 summary = "Uploading precomputed block to archive",
115 key = key.clone()
116 );
117
118 let precomputed_block: PrecomputedBlock = match breadcrumb.try_into() {
119 Ok(block) => block,
120 Err(_) => {
121 node::core::warn!(
122 summary = "Failed to convert breadcrumb to precomputed block"
123 );
124 return;
125 }
126 };
127
128 let data = match serde_json::to_vec(&precomputed_block) {
129 Ok(data) => data,
130 Err(e) => {
131 node::core::warn!(
132 summary = "Failed to serialize precomputed block",
133 error = e.to_string()
134 );
135 return;
136 }
137 };
138
139 if options.uses_local_precomputed_storage() {
140 if let Some(path) = &self.local_path {
141 let key_clone = key.clone();
142 match write_to_local_storage(path, &key, &data) {
143 Ok(_) => node::core::info!(
144 summary = "Successfully wrote precomputed block to local storage",
145 key = key_clone
146 ),
147 Err(e) => node::core::warn!(
148 summary = "Failed to write precomputed block to local storage",
149 key = key_clone,
150 error = e.to_string()
151 ),
152 }
153 } else {
154 node::core::warn!(summary = "Local precomputed storage path not set");
155 }
156 }
157
158 if options.uses_gcp_precomputed_storage() {
159 if let Some(client) = &self.gcp_client {
160 if let Err(e) = client.upload_block(&key, &data).await {
161 node::core::warn!(
162 summary = "Failed to upload precomputed block to GCP",
163 error = e.to_string()
164 );
165 }
166 } else {
167 node::core::warn!(summary = "GCP client not initialized");
168 }
169 }
170 if options.uses_aws_precomputed_storage() {
171 if let Some(client) = &self.aws_client {
172 if let Err(e) = client.upload_block(&key, &data).await {
173 node::core::warn!(
174 summary = "Failed to upload precomputed block to AWS",
175 error = e.to_string()
176 );
177 }
178 } else {
179 node::core::warn!(summary = "AWS client not initialized");
180 }
181 }
182 }
183 }
184
185 async fn handle_archiver_process(breadcrumb: &BlockApplyResult, socket_addr: &SocketAddr) {
186 let mut retries = ARCHIVE_SEND_RETRIES;
187
188 let archive_transition_frontier_diff: v2::ArchiveTransitionFrontierDiff =
189 breadcrumb.clone().try_into().unwrap();
190
191 for _ in 0..ARCHIVE_SEND_RETRIES {
192 match rpc::send_diff(
193 *socket_addr,
194 v2::ArchiveRpc::SendDiff(archive_transition_frontier_diff.clone()),
195 ) {
196 Ok(result) if result.should_retry() => {
197 node::core::warn!(summary = "Archive closed connection, retrying...");
198 tokio::time::sleep(tokio::time::Duration::from_millis(RETRY_INTERVAL_MS)).await;
199 }
200 Ok(_) => {
201 node::core::info!(summary = "Successfully sent diff to archive");
202 return;
203 }
204 Err(e) => {
205 node::core::warn!(
206 summary = "Failed sending diff to archive",
207 error = e.to_string(),
208 retries = retries
209 );
210 tokio::time::sleep(tokio::time::Duration::from_millis(RETRY_INTERVAL_MS)).await;
211 }
212 }
213 retries -= 1;
214 }
215 }
216}
217
218impl ArchiveService {
219 fn new(archive_sender: mpsc::UnboundedSender<BlockApplyResult>) -> Self {
220 Self { archive_sender }
221 }
222
223 #[cfg(not(target_arch = "wasm32"))]
224 async fn run(
225 mut archive_receiver: mpsc::UnboundedReceiver<BlockApplyResult>,
226 options: ArchiveStorageOptions,
227 work_dir: String,
228 ) {
229 let clients = match ArchiveServiceClients::new(&options, work_dir).await {
230 Ok(clients) => clients,
231 Err(e) => {
232 node::core::error!(
233 summary = "Failed to initialize archive service clients",
234 error = e.to_string()
235 );
236 return;
237 }
238 };
239
240 while let Some(breadcrumb) = archive_receiver.recv().await {
241 clients.send_block(breadcrumb, &options).await;
242 }
243 }
244
245 #[cfg(target_arch = "wasm32")]
247 fn run(
248 mut archive_receiver: mpsc::UnboundedReceiver<BlockApplyResult>,
249 options: ArchiveStorageOptions,
250 work_dir: String,
251 ) {
252 unimplemented!()
253 }
254
255 pub fn start(options: ArchiveStorageOptions, work_dir: String) -> Self {
256 let (archive_sender, archive_receiver) = mpsc::unbounded_channel::<BlockApplyResult>();
257
258 #[cfg(not(target_arch = "wasm32"))]
259 Self::start_native(archive_receiver, options, work_dir);
260
261 #[cfg(target_arch = "wasm32")]
262 Self::start_wasm(archive_receiver, options, work_dir);
263
264 Self::new(archive_sender)
265 }
266
267 #[cfg(not(target_arch = "wasm32"))]
268 fn start_native(
269 archive_receiver: mpsc::UnboundedReceiver<BlockApplyResult>,
270 options: ArchiveStorageOptions,
271 work_dir: String,
272 ) {
273 let runtime = tokio::runtime::Builder::new_current_thread()
274 .enable_all()
275 .build()
276 .unwrap();
277
278 thread::Builder::new()
279 .name("openmina_archive".to_owned())
280 .spawn(move || {
281 runtime.block_on(Self::run(archive_receiver, options, work_dir));
282 })
283 .unwrap();
284 }
285
286 #[cfg(target_arch = "wasm32")]
287 fn start_wasm(
288 archive_receiver: mpsc::UnboundedReceiver<BlockApplyResult>,
289 options: ArchiveStorageOptions,
290 work_dir: String,
291 ) {
292 thread::Builder::new()
293 .name("openmina_archive".to_owned())
294 .spawn(move || {
295 Self::run(archive_receiver, options, work_dir);
296 })
297 .unwrap();
298 }
299}
300
301impl node::transition_frontier::archive::archive_service::ArchiveService for NodeService {
302 fn send_to_archive(&mut self, data: BlockApplyResult) {
303 if let Some(archive) = self.archive.as_mut() {
304 if let Err(e) = archive.archive_sender.send(data) {
305 node::core::warn!(
306 summary = "Failed sending diff to archive service",
307 error = e.to_string()
308 );
309 }
310 }
311 }
312}
313
// Empty stand-in for the `rpc` module on wasm targets, where the real
// `rpc` module declared earlier in this file is compiled out.
#[cfg(target_arch = "wasm32")]
mod rpc {}
317
318fn write_to_local_storage(base_path: &str, key: &str, data: &[u8]) -> Result<(), Error> {
319 use std::{
320 fs::{create_dir_all, File},
321 path::Path,
322 };
323
324 let path = Path::new(base_path).join(key);
325 if let Some(parent) = path.parent() {
326 create_dir_all(parent)
327 .map_err(|e| Error::UploadError(format!("Directory creation failed: {}", e)))?;
328 }
329
330 let mut file = File::create(&path)
331 .map_err(|e| Error::UploadError(format!("File creation failed: {}", e)))?;
332
333 file.write_all(data)
334 .map_err(|e| Error::UploadError(format!("File write failed: {}", e)))?;
335
336 Ok(())
337}