1mod config;
2pub use config::*;
3use mina_core::{thread, ChainId};
4use mina_p2p_messages::v2::StateHash;
5use node::{
6 core::log::{info, system_time},
7 p2p::{
8 connection::outgoing::{
9 P2pConnectionOutgoingInitLibp2pOpts, P2pConnectionOutgoingInitOpts,
10 },
11 PeerId,
12 },
13};
14
15use std::{
16 path::{Path, PathBuf},
17 process::Child,
18 time::Duration,
19};
20
21use serde::{Deserialize, Serialize};
22
23pub struct OcamlNode {
24 child: Child,
25 executable: OcamlNodeExecutable,
26 pub libp2p_port: u16,
27 pub graphql_port: u16,
28 peer_id: libp2p::PeerId,
29 #[allow(dead_code)]
30 temp_dir: temp_dir::TempDir,
31}
32
33#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
34pub enum OcamlStep {
35 WaitReady { timeout: Duration },
39 Kill,
42 KillAndRemove,
45}
46
47impl OcamlNode {
48 pub fn start(config: OcamlNodeConfig) -> anyhow::Result<Self> {
49 let dir = config.dir.path();
50 let config_dir = dir.join(".config");
51 let daemon_json_path = config_dir.join("daemon.json");
52
53 Self::make_dir_rec(&config_dir)
54 .map_err(|err| anyhow::anyhow!("failed to create config dir: {err}"))?;
55
56 let peer_id = match Self::read_peer_id(dir) {
57 Ok(v) => v,
58 Err(_) => Self::generate_libp2p_keypair(&config, dir).map_err(|err| {
59 anyhow::anyhow!("failed to generate libp2p keys for ocaml node. err: {err}")
60 })?,
61 };
62 let peer_id = peer_id.parse()?;
63
64 match &config.daemon_json {
65 DaemonJson::Custom(path) => {
66 std::fs::copy(path, &daemon_json_path).map_err(|err| {
67 anyhow::anyhow!(
68 "failed to copy daemon_json from: '{}', to: '{}'; error: {}",
69 path,
70 daemon_json_path.display(),
71 err
72 )
73 })?;
74 }
75 DaemonJson::InMem(json) => {
76 std::fs::write(&daemon_json_path, json.to_string()).map_err(|err| {
77 anyhow::anyhow!(
78 "failed to write InMem daemon.json to {}; error: {}",
79 daemon_json_path.display(),
80 err
81 )
82 })?;
83 }
84 }
85
86 let block_producer = match config.block_producer.as_ref() {
87 None => None,
88 Some(sec_key) => {
89 let sec_key_bs58 = sec_key.to_string();
90 let key_path = config_dir.join("block_producer_key");
91 let mut cmd = config.cmd([
92 ("CODA_PRIVKEY", sec_key_bs58.as_str()),
93 ("MINA_PRIVKEY_PASS", ""),
94 ]);
95 cmd.arg("advanced")
96 .arg("wrap-key")
97 .arg("--privkey-path")
98 .arg(&key_path);
99 if !cmd
100 .status()
101 .map_err(|err| anyhow::anyhow!("block producer key wrap failed: {err}"))?
102 .success()
103 {
104 anyhow::bail!("block producer key wrap failed! Unknown error");
105 }
106 Some(key_path)
107 }
108 };
109
110 let mut cmd = config.cmd([("MINA_LIBP2P_PASS", ""), ("MINA_PRIVKEY_PASS", "")]);
111
112 cmd.arg("daemon");
113 cmd.arg("--config-dir").arg(&config_dir);
114 cmd.arg("--libp2p-keypair").arg(Self::privkey_path(dir));
115 cmd.args(["--external-ip", "127.0.0.1"])
116 .args(["--external-port", &config.libp2p_port.to_string()])
117 .args(["--client-port", &config.client_port.to_string()])
118 .args(["--rest-port", &config.graphql_port.to_string()]);
119
120 let is_seed = config.initial_peers.is_empty();
121 for peer in config.initial_peers {
122 cmd.args(["--peer", &peer.to_string()]);
123 }
124 if is_seed {
125 cmd.arg("--seed");
126 }
127 if let Some(key_path) = block_producer {
128 cmd.arg("--block-producer-key").arg(key_path);
129 }
130
131 cmd.stdout(std::process::Stdio::piped())
132 .stderr(std::process::Stdio::piped());
133
134 info!(system_time(); "Spawning OCaml daemon process");
135 let mut child = cmd.spawn()?;
136 info!(system_time(); "OCaml daemon process started with PID: {:?}", child.id());
137
138 let stdout = child
139 .stdout
140 .take()
141 .ok_or_else(|| anyhow::anyhow!("no stdout"))?;
142 let stderr = child
143 .stderr
144 .take()
145 .ok_or_else(|| anyhow::anyhow!("no stderr"))?;
146
147 let prefix = format!("[localhost:{}] ", config.libp2p_port);
148 let prefix2 = prefix.clone();
149 thread::spawn(
150 move || {
151 if Self::read_stream(stdout, std::io::stdout(), &prefix).is_err() {}
152 },
153 );
154 thread::spawn(
155 move || {
156 if Self::read_stream(stderr, std::io::stderr(), &prefix2).is_err() {}
157 },
158 );
159
160 Ok(Self {
161 child,
162 executable: config.executable,
163 libp2p_port: config.libp2p_port,
164 graphql_port: config.graphql_port,
165 peer_id,
166 temp_dir: config.dir,
167 })
168 }
169
170 pub fn dial_addr(&self) -> P2pConnectionOutgoingInitOpts {
171 P2pConnectionOutgoingInitOpts::LibP2P(P2pConnectionOutgoingInitLibp2pOpts {
172 peer_id: self.peer_id(),
173 host: [127, 0, 0, 1].into(),
174 port: self.libp2p_port,
175 })
176 }
177
178 pub fn peer_id(&self) -> PeerId {
179 self.peer_id.try_into().unwrap()
180 }
181
182 pub async fn exec(&mut self, step: OcamlStep) -> anyhow::Result<bool> {
183 Ok(match step {
184 OcamlStep::WaitReady { timeout } => {
185 let t = redux::Instant::now();
186 self.wait_for_p2p(timeout).await?;
187 self.wait_for_synced(timeout - t.elapsed()).await?;
188 true
189 }
190 OcamlStep::Kill | OcamlStep::KillAndRemove => {
191 self.kill()?;
192 true
193 }
194 })
195 }
196
197 fn kill(&mut self) -> std::io::Result<()> {
198 self.child.kill()
199 }
200
/// Location of the libp2p private key file, relative to the node's directory.
const PRIVKEY_PATH: &'static str = ".libp2p/key";
/// Predefined libp2p keypairs as `(peer id, key file contents)` pairs.
///
/// `generate_libp2p_keypair` picks the entry at `config.libp2p_keypair_i`
/// and writes the second element verbatim to the private key file.
// NOTE(review): the key documents are presumably encrypted with the empty
// password, matching `MINA_LIBP2P_PASS=""` used when spawning the daemon —
// confirm.
const LIBP2P_KEYS: [(&'static str, &'static str); 10] = [
    (
        "12D3KooWKG1ZakBYEirEWdFYSstTEvxzTxCyuyaebojKthiESfWi",
        r#"{"box_primitive":"xsalsa20poly1305","pw_primitive":"argon2i","nonce":"7fZJwAwzGgwFGipwjVBTiinHt5NfjTFjSnhk8rA","pwsalt":"9Gz996pkoUjJ8docbT7fJvJg16iL","pwdiff":[134217728,6],"ciphertext":"7oHFyFJd9kWTfH6R7GvbHD4WXBw5JQeiQYBGUpFhiPuiAyEgi1BVbczrzS6njWJ9FkRdNgZqwmSo23GPX4Zs27m3U66dJvyahendHCndG3Wu9wi8yaees78AQpbsU7JRa7U5DyCs9d34QLwpsgrGC2CqtDHJD3K3YncxVDjk4CCKeHseukZXUvkFToqY9CZRLHgYXR29hiB8JyTgoQ4maDDBdqBpFRb6Rjfb3LX8WEat6NpTjWi4A9uvNyDqk68a2aAo8ofjP811SBYxjZY3PMdD4hs5UAAZqbUNA"}"#,
    ),
    (
        "12D3KooWQoSDqpenkrjCnKCCLnwEMRVRqUfZJXqD8Y8awEnBAJZX",
        r#"{"box_primitive":"xsalsa20poly1305","pw_primitive":"argon2i","nonce":"8psKzkQReBbhoWYriYSNGFVKsb7GMARxfSZLWDF","pwsalt":"9MdeSxJxE33e5sxYhnmPQNTDxbKj","pwdiff":[134217728,6],"ciphertext":"6YnXZjwJue344VEkLnFJ62VY9E2QxZPEtDoZSBXnNzJUEEK5MVjcC63GeM37kVXTVvoj8r9C9i4mUbTiwjpL9wg4NqxkcJpTMVBe6WDsYYrtt7S9o6p4xWAjm1hvbxXcsTzPN361amo2ZNCAuMGpCWQPnxeZ69bwLQkn4vKeGkdiUMdnziNfhKRaFcya4C1dNNoz8kAWFRexzZrjvSBymzZCvZPgof1mApyzcoWuYtAdENqbNURg2DBv53nLetmqA9zLTcDbXYE5hTgkVzMHa6qiia4xAhDbrax4r"}"#,
    ),
    (
        "12D3KooWEHSJwkn5ZdYVAcULpb3D3k6U2K4sU2BFh5xhoig1J4NK",
        r#"{"box_primitive":"xsalsa20poly1305","pw_primitive":"argon2i","nonce":"84F8XwEf5k2yPqgC8NSTdf5wQEw7kmRgEdyss91","pwsalt":"BeSN6EXqWBCx9fndvnib5BYnNNdc","pwdiff":[134217728,6],"ciphertext":"7ECwoC7vK5QRKrPZvrBmaWrTKJiHKq9fCZr8YyVPgfHFS2VQ1BeKgQPvoc5JHNhy2Yju8PDDKzS3zHCdRSGaVRX49VrxjSU9wg4Fj7EWPGEs6VNCQahudCovGFd69iqS8WvcDSSw6acaEstSssTFYV3mUviDbkRA5HM9fUvE8SBkg9rHeeghCQ4qLK7cFRywCx9P6nronv5b5yy15xJDjAp6h7fwNxA7daXGa3E8dhtsE6FaPCtefkLBKJFXzuo3CMdRcBMZTt2XVGHS27rxtn1j5jeT9y34EMA5t"}"#,
    ),
    (
        "12D3KooWA1qcHYLWKZ7EUBMh3KKWbwys1DhH35WmH4y96scpntmv",
        r#"{"box_primitive":"xsalsa20poly1305","pw_primitive":"argon2i","nonce":"6ifkqZVQ8MXHP5StG5umfJ2HhuSEpwxx33bRD4e","pwsalt":"9qREDkkiXgKV7VtryRcAkrzGY41p","pwdiff":[134217728,6],"ciphertext":"6uYbsTw56Vzaj4Jb9vWVQLTNHk92Qv6kFgZrqit8gUBya1M76U4wuNqQ1XMk5iKFSXFxf9hjtKN1NAbruUTnoySaCdCuLfjVJwUpDJFWwjRV3vmFjmZch9YjAL4H81z89V52BZkGpSantUxqGMLSrJjz8z5Rrr3C7ZCvooZddFieFkFfDtLBfhAsB3U2usMh83L6VMPg5Hawn7krdznSVzagiS1ENDPR92LfsCvxVVGxSNvUbBbjFCdroDb7eYi8mshiC3nGQWDYRQu9kQkHC6xsgarTsumkXnYpd"}"#,
    ),
    (
        "12D3KooWC14WLzaCT6fkR5WzzrayXDhmBpox7yWkif7Pg6Sk4uz1",
        r#"{"box_primitive":"xsalsa20poly1305","pw_primitive":"argon2i","nonce":"7rGH8gCM5UL9LvatyUP6Lka8b4Ev2o5Mbe5ou7L","pwsalt":"BKcQuXkYx6H4xfcLLGM8ykJP5a3D","pwdiff":[134217728,6],"ciphertext":"7tZEvuCoXyXRVxh9NNrMCRLXgXj8MHxNgwoesztLQTagMzKhf48EepzUReYEViNC2EpWb2h7yoJdXMUbDGUuSQdoM1eF3qum9rHtmU4xdv8SGBEP9q9YHb1n8YS2SEr7WNcN3DsX7cqrzfnSjDsXNZaGzR5CbrK7g3NGv8RVyT1uZF2VHHeapDY6nFCyKN9nUJKpizbbRguR25QwWyx2nzcKF3mGq2iuCNVN5Z6gqzk7fhD8XFayxj57MqwvWTRD6pLRBJcmqCF5L9ZqpKEHAjXMcv3nwkKBHJaAF"}"#,
    ),
    (
        "12D3KooWRJEo19dU5eWgab1YrBPnK9HQA4SqDeQwx9NrankTcfSi",
        r#"{"box_primitive":"xsalsa20poly1305","pw_primitive":"argon2i","nonce":"6VMHa83xqvbDzAppmfe9B8wn7dzzdAC1fyP5bXs","pwsalt":"9cXcTqJGi651tpA1ZBKSPVBP2h3q","pwdiff":[134217728,6],"ciphertext":"783EDXabmg2PmWhrSqcDog82NhWNMWasKC4o2d1oxVDTDxhmH5yGcjY74wV8HH16DpJw4ZxzW8gUCyC7Mhx1hEG8kc7wn38yBsoAqGfkA34g9n4FYJzwHvAB7on7zK3cveh2jXF3TTt3Etg4advpM7LvbY2eE9pz95TU1pCagu7haB83JHn2qSnfSCcMUTjS9copfJgkVD6YQYUxJmVi9erYUufjJqiF9p4ciCSuicU5SaJVB3rpSaRt1VgWMXeg47qWXVg98byNadHi8PgQZNnFifJc4FUDPtHDj"}"#,
    ),
    (
        "12D3KooWPTtAt3LXFqs8vbGL9VACn5wgcBm1jvaHZTGHnHzFq7c4",
        r#"{"box_primitive":"xsalsa20poly1305","pw_primitive":"argon2i","nonce":"8q2jNrxkiwzvk8LWuU8zWy93APjitkaZUKr3fDP","pwsalt":"B5YVY1p2yFfGRsjs2us7WnmhWqPq","pwdiff":[134217728,6],"ciphertext":"7xxCXkAf3DokjZQ6CtwbuuXMBeYB5p6KxQAqaFLx96yBAsqQaK3EprR3xDKVR78x7Zrzj4NXWrFow2cg4xtze12SS43t46E3QhSsYPohcuZzKJe4agGJMDZVHaqd1aAPtJd2CX1fZCrWmxpa3hB72H2EKYPFSG1FYv77cYxU45aJx3V1XQAEQtoYKP9FmL95xogJHVHQSe2xWrvga8CLY4qtshJWkwHP1mV59xam9WhhZZjZkSThYTJMW9f4NTQ2EwRuud9zReLkh8fGEvfoxFjMsw8NCVxrtdTi5"}"#,
    ),
    (
        "12D3KooWLrjE3v7wZSCT4HsnXYmRsQnCWsaGcmU9mSJfPDRyjvpd",
        r#"{"box_primitive":"xsalsa20poly1305","pw_primitive":"argon2i","nonce":"7zKo7kmPLjoJdvgxyMp2jguDnGDnyZ49TWfajiZ","pwsalt":"ARyFY4GSnyTC8ZjhvEaCSq3QeGsU","pwdiff":[134217728,6],"ciphertext":"7FfzcAPsEM7Lv5JzL7rYqgrHGV5bBDVVTEukULpDaCMGbHJDRFkgjxx6c6gbxCRbMJKma9yMrr5zxHsT9tfYCU7PAzqgVqa87TfphNXmqdSrNKWZVTS5SGMXAqku6vfJ19PA7TzJdr7oHZSjoim5Lh8r2x9iUTto7tdCBy4xWJQXE7aQYQ2ybB95DjzA3CtK7ypjxnZJDnvYq8zgXx9netbAX3NdTtsRDKQwNmjBzoQiKWd5jrqMigfFNcRTnJdpEn8jYMfa4fmXsnUe9ziXvjYdH9DJQA1354UQq"}"#,
    ),
    (
        "12D3KooWKPaZfwU42A9SDxyuDGWsLYiedudJvDAHY81UMXLLEgTe",
        r#"{"box_primitive":"xsalsa20poly1305","pw_primitive":"argon2i","nonce":"7QAvnHnfp1MMJgEYN8VVwLuwWF3GEMUG5DA2KWP","pwsalt":"9YP8yrJRfF6w4efUSTZrPRhoV8xZ","pwdiff":[134217728,6],"ciphertext":"6i8Efh1z9AZi2ExYLFBLsPGSAXQT72pgQNP9yPBfDwqr8dtQxvqdh6V8qXwjyVSiWLGs6zbUFQj1mJ17F9irLzUTFLQ9WG98HveuUJLxv2WPoEekb2AUntAYNkbgVEmtWEYytWqYZnJUx5g1cnLo5ENzrcDTDcbHbQnYVkyH2GNpNrH4WqjK3TyDS2PzKwTwBFVMyhK4BUDVtjJWxfHnQNSaxT7gsGFNQTJjizxpqaqFk7Nc11GPaJmJFJKoVYD4ozCkeR2RKM9Dk7ct49vTQvULqqETwJmK1yMTf"}"#,
    ),
    (
        "12D3KooWKrWvpCzTJs45HS8c8Hbo9sfe65wBVCLWW2gyJDmnDDif",
        r#"{"box_primitive":"xsalsa20poly1305","pw_primitive":"argon2i","nonce":"8QEpCE7a48EdYXQ44ff9PXWDKw2AMmHMxc3nUaq","pwsalt":"8UUffeMbTFAwUwkamE1xWoVTJqDi","pwdiff":[134217728,6],"ciphertext":"93Wdv4Xq9794GWuZPZUPug2gfPpXD2dnLxSx9jGjtr7rTGv14W7JPieGJJ4zTw6T54x1NwyhH9HcDwsQxmUT364KibpbczuA9bnTFcp6ahoYpetrHB8FJTk7TprkmazqprJm7QDqJ97jyE7PuVNWg9NSbMRzet1c5Jxk2qfUYVdtSNgcQB5J5oUTibL6fc5UKZmfBoSixw3E3QFPnBRN8W7X3nfcHykK9eck2u5YJrv1gRYoupp2EX1CjWwKp3ebDa9bLLZiWTTSBKsj7uhLh5aCxgHpoPCyNcnaq"}"#,
    ),
];
244
245 fn privkey_path(dir: &Path) -> PathBuf {
246 dir.join(Self::PRIVKEY_PATH)
247 }
248
249 fn read_peer_id(dir: &Path) -> anyhow::Result<String> {
250 Ok(
251 std::fs::read_to_string(Self::privkey_path(dir).with_extension("peerid"))?
252 .trim()
253 .into(),
254 )
255 }
256
257 fn make_dir_rec(path: &Path) -> anyhow::Result<()> {
258 use std::os::unix::fs::DirBuilderExt;
259 std::fs::DirBuilder::new()
260 .recursive(true)
261 .mode(0o700)
262 .create(path)
263 .map_err(Into::into)
264 }
265
266 fn generate_libp2p_keypair(config: &OcamlNodeConfig, dir: &Path) -> anyhow::Result<String> {
267 use std::{fs::OpenOptions, io::Write, os::unix::fs::OpenOptionsExt};
268
269 let (peer_id, key) = Self::LIBP2P_KEYS[config.libp2p_keypair_i];
270 let privkey_path = Self::privkey_path(dir);
271 let privkey_parent_dir = privkey_path.as_path().parent().unwrap();
272 Self::make_dir_rec(privkey_parent_dir)?;
273
274 let mut file = OpenOptions::new()
275 .create(true)
276 .write(true)
277 .truncate(true)
278 .mode(0o600)
279 .open(&privkey_path)?;
280 file.write_all(key.as_bytes())?;
281 std::fs::write(
282 privkey_path.with_extension("peerid"),
283 format!("peerid:{peer_id}"),
284 )?;
285 Ok(peer_id.to_owned())
286 }
287
/// Copies `from` to `to` line by line, prepending `prefix` to every line.
///
/// Lines are handled as raw bytes, so output that is not valid UTF-8 is
/// forwarded unchanged instead of aborting the copy. Each prefixed line is
/// emitted with a single `write_all`, and `to` is flushed once the input is
/// exhausted so that a final line without a trailing newline is not left
/// in a buffer.
///
/// # Errors
///
/// Returns the first I/O error raised while reading, writing or flushing.
fn read_stream<R: std::io::Read, W: std::io::Write>(
    from: R,
    mut to: W,
    prefix: &str,
) -> std::io::Result<()> {
    use std::io::BufRead;

    let mut reader = std::io::BufReader::new(from);
    // The buffer always starts with the prefix; each line is appended after
    // it and cut off again once written.
    let mut out = Vec::with_capacity(prefix.len() + 256);
    out.extend_from_slice(prefix.as_bytes());
    while reader.read_until(b'\n', &mut out)? > 0 {
        to.write_all(&out)?;
        out.truncate(prefix.len());
    }
    to.flush()
}
302
303 pub async fn chain_id(&self) -> anyhow::Result<ChainId> {
305 let res = self
306 .grapql_query("query { daemonStatus { chainId } }")
307 .await?;
308 let chain_id = res["data"]["daemonStatus"]["chainId"]
309 .as_str()
310 .ok_or_else(|| anyhow::anyhow!("empty chain_id response"))?;
311 ChainId::from_hex(chain_id).map_err(|e| anyhow::anyhow!("invalid chain_id: {}", e))
312 }
313
314 pub async fn synced_best_tip(&self) -> anyhow::Result<Option<StateHash>> {
317 let mut res = self
318 .grapql_query("query { daemonStatus { syncStatus, stateHash } }")
319 .await?;
320 let data = &mut res["data"]["daemonStatus"];
321 if data["syncStatus"].as_str() == Some("SYNCED") {
322 Ok(Some(serde_json::from_value(data["stateHash"].take())?))
323 } else {
324 Ok(None)
325 }
326 }
327
328 fn graphql_addr(&self) -> String {
329 format!("http://127.0.0.1:{}/graphql", self.graphql_port)
330 }
331
332 pub async fn grapql_query(&self, query: &str) -> anyhow::Result<serde_json::Value> {
338 let client = reqwest::Client::new();
339 let response = client
340 .post(self.graphql_addr())
341 .json(&{
342 serde_json::json!({
343 "query": query
344 })
345 })
346 .send()
347 .await?;
348
349 Ok(response.json().await?)
350 }
351
352 async fn wait_for_p2p(&self, timeout: Duration) -> anyhow::Result<()> {
353 let port = self.libp2p_port;
354 let timeout_fut = tokio::time::sleep(timeout);
355 let mut interval = tokio::time::interval(Duration::from_secs(2));
356 let probe = tokio::task::spawn(async move {
357 loop {
358 interval.tick().await;
359 if tokio::net::TcpStream::connect(("127.0.0.1", port))
360 .await
361 .is_ok()
362 {
363 return;
364 }
365 }
366 });
367 tokio::select! {
368 _ = timeout_fut => anyhow::bail!("waiting for ocaml node's p2p port to be ready timed out! timeout: {timeout:?}"),
369 _ = probe => Ok(()),
370 }
371 }
372
373 async fn wait_for_synced(&self, timeout: Duration) -> anyhow::Result<()> {
374 let mut interval = tokio::time::interval(Duration::from_secs(1));
375 tokio::time::timeout(timeout, async {
376 loop {
377 interval.tick().await;
378 if self.synced_best_tip().await.is_ok_and(|tip| tip.is_some()) {
379 return;
380 }
381 }
382 })
383 .await
384 .map_err(|_| {
385 anyhow::anyhow!("waiting for ocaml node to be synced timed out! timeout: {timeout:?}")
386 })
387 }
388}
389
390impl Drop for OcamlNode {
391 fn drop(&mut self) {
392 match self.child.try_wait() {
393 Err(err) => {
394 eprintln!("error getting status from OCaml node: {err}");
395 }
396 Ok(None) => {
397 self.executable.kill(&self.temp_dir);
398 if let Err(err) = self.child.kill() {
399 eprintln!("error killing OCaml node: {err}");
400 } else if let Err(err) = self.child.wait() {
401 eprintln!("error getting status from OCaml node: {err}");
402 }
403 }
404 _ => {}
405 }
406 }
407}
408
/// Starts a seed OCaml node from a custom `daemon.json` and blocks until
/// the daemon process exits.
#[test]
fn run_ocaml() {
    use crate::node::DaemonJson;

    let executable = OcamlNodeExecutable::find_working().unwrap();
    let dir = temp_dir::TempDir::new().unwrap();
    let daemon_json = DaemonJson::Custom("/var/lib/coda/config_6929a7ec.json".to_owned());

    let config = OcamlNodeConfig {
        executable,
        dir,
        libp2p_keypair_i: 0,
        libp2p_port: 8302,
        graphql_port: 3086,
        client_port: 8301,
        initial_peers: Vec::new(),
        daemon_json,
        block_producer: None,
    };
    let mut node = OcamlNode::start(config).unwrap();

    node.child.wait().unwrap();
}
427}