mina_node_testing/server/
simulator.rs

1use std::{sync::Arc, time::Duration};
2
3use axum::{
4    extract::{Json, State},
5    http::StatusCode,
6    routing::put,
7};
8use mina_core::channels::oneshot;
9use mina_p2p_messages::v2;
10use serde::{Deserialize, Serialize};
11
12use crate::{
13    cluster::ClusterConfig,
14    scenarios::ClusterRunner,
15    simulator::{Simulator, SimulatorConfig},
16};
17
18use super::AppState;
19
20pub fn simulations_router() -> axum::Router<AppState> {
21    axum::Router::new().route("/", put(simulation_create))
22}
23
24#[derive(Deserialize)]
25struct SimulationCreateArgs {
26    cluster: ClusterConfig,
27    simulator: SimulatorConfig,
28    #[serde(default)]
29    override_genesis_state_timestamp: bool,
30}
31
32#[derive(Serialize)]
33struct SimulationCreateResponse {
34    cluster_id: u16,
35}
36
37async fn simulation_create(
38    State(state): State<AppState>,
39    Json(args): Json<SimulationCreateArgs>,
40) -> Result<Json<SimulationCreateResponse>, (StatusCode, String)> {
41    async fn setup(
42        state: AppState,
43        mut args: SimulationCreateArgs,
44    ) -> Result<(u16, Simulator), (StatusCode, String)> {
45        let (cluster_id, mut cluster) = state.cluster_create_empty(args.cluster).await?;
46
47        let initial_time = redux::Timestamp::global_now();
48        if args.override_genesis_state_timestamp {
49            Arc::get_mut(&mut args.simulator.genesis)
50                .unwrap()
51                .override_genesis_state_timestamp(v2::BlockTimeTimeStableV1(
52                    (u64::from(initial_time) / 1_000_000).into(),
53                ));
54        }
55        let mut simulator = Simulator::new(initial_time, args.simulator);
56        simulator
57            .setup(&mut ClusterRunner::new(&mut cluster, |_| {}))
58            .await;
59        Ok((cluster_id, simulator))
60    }
61    let (setup_tx, setup_rx) = oneshot::channel();
62    let state_clone = state.clone();
63
64    std::thread::spawn(move || {
65        let state = state_clone;
66        let rt = tokio::runtime::Builder::new_current_thread()
67            .enable_all()
68            .build()
69            .unwrap();
70        rt.block_on(async move {
71            let (cluster_id, mut simulator) = match setup(state.clone(), args).await {
72                Err(err) => {
73                    let _ = setup_tx.send(Err(err));
74                    return;
75                }
76                Ok((cluster_id, simulator)) => {
77                    let _ = setup_tx.send(Ok(cluster_id));
78                    (cluster_id, simulator)
79                }
80            };
81            let cluster_mutex = match state.cluster_mutex(cluster_id).await {
82                Err(_) => return,
83                Ok(cluster_mutex) => Arc::downgrade(&cluster_mutex),
84            };
85            while let Some(cluster_mutex) = cluster_mutex.upgrade() {
86                let mut cluster = cluster_mutex.lock().await;
87                let mut runner = ClusterRunner::new(&mut cluster, |_| {});
88                let _ =
89                    tokio::time::timeout(Duration::from_millis(500), simulator.run(&mut runner))
90                        .await;
91            }
92        });
93    });
94    let cluster_id = setup_rx.await.unwrap()?;
95
96    Ok(Json(SimulationCreateResponse { cluster_id }))
97}