mina_node_testing/server/
simulator.rs1use std::{sync::Arc, time::Duration};
2
3use axum::{
4 extract::{Json, State},
5 http::StatusCode,
6 routing::put,
7};
8use mina_core::channels::oneshot;
9use mina_p2p_messages::v2;
10use serde::{Deserialize, Serialize};
11
12use crate::{
13 cluster::ClusterConfig,
14 scenarios::ClusterRunner,
15 simulator::{Simulator, SimulatorConfig},
16};
17
18use super::AppState;
19
20pub fn simulations_router() -> axum::Router<AppState> {
21 axum::Router::new().route("/", put(simulation_create))
22}
23
24#[derive(Deserialize)]
25struct SimulationCreateArgs {
26 cluster: ClusterConfig,
27 simulator: SimulatorConfig,
28 #[serde(default)]
29 override_genesis_state_timestamp: bool,
30}
31
32#[derive(Serialize)]
33struct SimulationCreateResponse {
34 cluster_id: u16,
35}
36
37async fn simulation_create(
38 State(state): State<AppState>,
39 Json(args): Json<SimulationCreateArgs>,
40) -> Result<Json<SimulationCreateResponse>, (StatusCode, String)> {
41 async fn setup(
42 state: AppState,
43 mut args: SimulationCreateArgs,
44 ) -> Result<(u16, Simulator), (StatusCode, String)> {
45 let (cluster_id, mut cluster) = state.cluster_create_empty(args.cluster).await?;
46
47 let initial_time = redux::Timestamp::global_now();
48 if args.override_genesis_state_timestamp {
49 Arc::get_mut(&mut args.simulator.genesis)
50 .unwrap()
51 .override_genesis_state_timestamp(v2::BlockTimeTimeStableV1(
52 (u64::from(initial_time) / 1_000_000).into(),
53 ));
54 }
55 let mut simulator = Simulator::new(initial_time, args.simulator);
56 simulator
57 .setup(&mut ClusterRunner::new(&mut cluster, |_| {}))
58 .await;
59 Ok((cluster_id, simulator))
60 }
61 let (setup_tx, setup_rx) = oneshot::channel();
62 let state_clone = state.clone();
63
64 std::thread::spawn(move || {
65 let state = state_clone;
66 let rt = tokio::runtime::Builder::new_current_thread()
67 .enable_all()
68 .build()
69 .unwrap();
70 rt.block_on(async move {
71 let (cluster_id, mut simulator) = match setup(state.clone(), args).await {
72 Err(err) => {
73 let _ = setup_tx.send(Err(err));
74 return;
75 }
76 Ok((cluster_id, simulator)) => {
77 let _ = setup_tx.send(Ok(cluster_id));
78 (cluster_id, simulator)
79 }
80 };
81 let cluster_mutex = match state.cluster_mutex(cluster_id).await {
82 Err(_) => return,
83 Ok(cluster_mutex) => Arc::downgrade(&cluster_mutex),
84 };
85 while let Some(cluster_mutex) = cluster_mutex.upgrade() {
86 let mut cluster = cluster_mutex.lock().await;
87 let mut runner = ClusterRunner::new(&mut cluster, |_| {});
88 let _ =
89 tokio::time::timeout(Duration::from_millis(500), simulator.run(&mut runner))
90 .await;
91 }
92 });
93 });
94 let cluster_id = setup_rx.await.unwrap()?;
95
96 Ok(Json(SimulationCreateResponse { cluster_id }))
97}