mina_node_testing/server/
mod.rs

1pub mod simulator;
2pub mod webnode;
3
4use crate::{
5    cluster::{Cluster, ClusterConfig, ClusterNodeId},
6    node::NodeTestingConfig,
7    scenario::{event_details, Scenario, ScenarioId, ScenarioInfo, ScenarioStep},
8    service::PendingEventId,
9};
10
11use std::{
12    collections::{BTreeMap, BTreeSet},
13    path::PathBuf,
14    sync::Arc,
15    time::Duration,
16};
17
18use axum::{
19    extract::{Path, State},
20    http::{header, StatusCode},
21    middleware,
22    routing::{get, get_service, post, put},
23    Json, Router,
24};
25use mina_node_native::p2p::webrtc::webrtc_signal_send;
26use node::{
27    account::AccountPublicKey,
28    p2p::{
29        connection::outgoing::P2pConnectionOutgoingInitOpts,
30        webrtc::{Host, Offer, P2pConnectionResponse, SignalingMethod},
31    },
32    transition_frontier::genesis::{GenesisConfig, PrebuiltGenesisConfig},
33};
34use rand::{rngs::StdRng, Rng, SeedableRng};
35use serde::{Deserialize, Serialize};
36use tokio::{
37    net::TcpListener,
38    runtime::Runtime,
39    sync::{oneshot, Mutex, MutexGuard, OwnedMutexGuard},
40};
41use tower_http::{cors::CorsLayer, services::ServeDir};
42
43pub fn server(rt: Runtime, host: Host, port: u16, ssl_port: Option<u16>) {
44    let fe_dist_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
45        .join("../../")
46        .join("frontend/dist/frontend/");
47    eprintln!("scenarios path: {}", Scenario::PATH);
48    eprintln!("FrontEnd dist path: {fe_dist_dir:?}");
49
50    let state = AppState::new(host, ssl_port);
51
52    let scenarios_router = Router::new()
53        .route("/", get(scenario_list))
54        .route("/", put(scenario_create))
55        .route("/:id", get(scenario_get))
56        .route("/:id/nodes", put(scenario_node_add))
57        .route("/:id/steps", put(scenario_step_add));
58
59    let clusters_router = Router::new()
60        .route("/", get(cluster_list))
61        .route("/create/:scenario_id", put(cluster_create))
62        .route("/:cluster_id", get(cluster_get))
63        .nest("/:cluster_id/webnode", webnode::router())
64        .route("/:cluster_id/run", post(cluster_run))
65        .route("/:cluster_id/run/auto", post(cluster_run_auto))
66        .route(
67            "/:cluster_id/scenarios/reload",
68            post(cluster_scenarios_reload),
69        )
70        .route(
71            "/:cluster_id/mina/webrtc/signal/:offer",
72            get(cluster_webrtc_signal),
73        )
74        .route("/:cluster_id/seeds", get(cluster_seeds))
75        .route("/:cluster_id/genesis/config", get(cluster_genesis_config))
76        .route(
77            "/:cluster_id/nodes/events/pending",
78            get(cluster_events_pending),
79        )
80        .route(
81            "/:cluster_id/nodes/:node_id/events/pending",
82            get(cluster_node_events_pending),
83        )
84        .route("/:cluster_id/destroy", post(cluster_destroy));
85
86    let cors = CorsLayer::very_permissive();
87    let coop_coep = middleware::from_fn(|req, next: middleware::Next| async {
88        let mut resp = next.run(req).await;
89        resp.headers_mut().insert(
90            header::HeaderName::from_static("cross-origin-embedder-policy"),
91            header::HeaderValue::from_static("require-corp"),
92        );
93        resp.headers_mut().insert(
94            header::HeaderName::from_static("cross-origin-opener-policy"),
95            header::HeaderValue::from_static("same-origin"),
96        );
97        resp
98    });
99
100    let app = Router::new()
101        .nest("/scenarios", scenarios_router)
102        .nest("/clusters", clusters_router)
103        .nest("/simulations", simulator::simulations_router())
104        .fallback(get_service(ServeDir::new(&fe_dist_dir)).layer(coop_coep.clone()))
105        .with_state(state)
106        .layer(cors);
107
108    rt.block_on(async {
109        let addr = std::net::SocketAddr::from(([0, 0, 0, 0], port));
110        let listener = TcpListener::bind(addr).await.unwrap();
111        axum::serve(listener, app.into_make_service())
112            .await
113            .unwrap();
114    });
115}
116
117pub struct AppStateInner {
118    host: Host,
119    ssl_port: Option<u16>,
120    rng: StdRng,
121    clusters: BTreeMap<u16, Arc<Mutex<Cluster>>>,
122    // TODO(binier): move inside cluster state
123    locked_block_producer_keys: BTreeMap<u16, BTreeSet<AccountPublicKey>>,
124}
125
126impl AppStateInner {
127    pub fn new(host: Host, ssl_port: Option<u16>) -> Self {
128        Self {
129            host,
130            ssl_port,
131            rng: StdRng::seed_from_u64(0),
132            clusters: Default::default(),
133            locked_block_producer_keys: Default::default(),
134        }
135    }
136}
137
138#[derive(Clone)]
139pub struct AppState(Arc<Mutex<AppStateInner>>);
140
141impl AppState {
142    pub fn new(host: Host, ssl_port: Option<u16>) -> Self {
143        Self(Arc::new(Mutex::new(AppStateInner::new(host, ssl_port))))
144    }
145
146    pub async fn lock(&self) -> MutexGuard<'_, AppStateInner> {
147        self.0.lock().await
148    }
149
150    async fn cluster_mutex(
151        &self,
152        cluster_id: u16,
153    ) -> Result<Arc<Mutex<Cluster>>, (StatusCode, String)> {
154        self.lock().await.cluster_mutex(cluster_id)
155    }
156
157    pub async fn cluster(
158        &self,
159        cluster_id: u16,
160    ) -> Result<OwnedMutexGuard<Cluster>, (StatusCode, String)> {
161        self.lock().await.cluster(cluster_id).await
162    }
163
164    pub async fn cluster_create(
165        &self,
166        scenario_id: ScenarioId,
167        config: ClusterConfig,
168    ) -> Result<(u16, OwnedMutexGuard<Cluster>), (StatusCode, String)> {
169        let scenario = Scenario::load(&scenario_id)
170            .await
171            .map_err(|err| (StatusCode::BAD_REQUEST, err.to_string()))?;
172
173        let mut cluster = Cluster::new(config);
174        cluster
175            .start(scenario)
176            .await
177            .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))?;
178
179        let mut state = self.lock().await;
180        let id = loop {
181            let id = state.rng.gen();
182            if !state.clusters.contains_key(&id) {
183                break id;
184            }
185        };
186
187        let cluster = Arc::new(Mutex::new(cluster));
188        let cluster_guard = cluster.clone().try_lock_owned().unwrap();
189        state.clusters.insert(id, cluster);
190
191        Ok((id, cluster_guard))
192    }
193
194    pub async fn cluster_create_empty(
195        &self,
196        config: ClusterConfig,
197    ) -> Result<(u16, OwnedMutexGuard<Cluster>), (StatusCode, String)> {
198        let cluster = Cluster::new(config);
199
200        let mut state = self.lock().await;
201        let id = loop {
202            let id = state.rng.gen();
203            if !state.clusters.contains_key(&id) {
204                break id;
205            }
206        };
207
208        let cluster = Arc::new(Mutex::new(cluster));
209        let cluster_guard = cluster.clone().try_lock_owned().unwrap();
210        state.clusters.insert(id, cluster);
211
212        Ok((id, cluster_guard))
213    }
214
215    pub async fn cluster_destroy(&self, cluster_id: u16) -> bool {
216        let mut this = self.lock().await;
217        this.locked_block_producer_keys.remove(&cluster_id);
218        this.clusters.remove(&cluster_id).is_some()
219    }
220}
221
222impl AppStateInner {
223    fn cluster_mutex(&self, cluster_id: u16) -> Result<Arc<Mutex<Cluster>>, (StatusCode, String)> {
224        self.clusters.get(&cluster_id).cloned().ok_or_else(|| {
225            (
226                StatusCode::BAD_REQUEST,
227                format!("cluster {cluster_id} not found"),
228            )
229        })
230    }
231
232    pub async fn cluster(
233        &self,
234        cluster_id: u16,
235    ) -> Result<OwnedMutexGuard<Cluster>, (StatusCode, String)> {
236        Ok(self.cluster_mutex(cluster_id)?.lock_owned().await)
237    }
238}
239
240async fn scenario_list(
241    State(_): State<AppState>,
242) -> Result<Json<Vec<ScenarioInfo>>, (StatusCode, String)> {
243    Scenario::list()
244        .await
245        .map(Json)
246        .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))
247}
248
249async fn scenario_get(
250    State(_): State<AppState>,
251    Path(id): Path<ScenarioId>,
252) -> Result<Json<Scenario>, (StatusCode, String)> {
253    Scenario::load(&id)
254        .await
255        .map(Json)
256        .map_err(|err| (StatusCode::BAD_REQUEST, err.to_string()))
257}
258
259#[derive(Deserialize)]
260struct ScenariosCreateArgs {
261    id: ScenarioId,
262    description: String,
263    parent_id: Option<ScenarioId>,
264}
265
266async fn scenario_create(
267    State(_): State<AppState>,
268    Json(args): Json<ScenariosCreateArgs>,
269) -> Result<StatusCode, (StatusCode, String)> {
270    if Scenario::exists(&args.id) {
271        return Err((
272            StatusCode::BAD_REQUEST,
273            format!("scenario with same id/name already exists: {}", args.id),
274        ));
275    }
276    let mut scenario = Scenario::new(args.id, args.parent_id);
277    scenario.set_description(args.description);
278    scenario
279        .save()
280        .await
281        .map(|_| StatusCode::CREATED)
282        .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))
283}
284
285async fn scenario_node_add(
286    State(_): State<AppState>,
287    Path(id): Path<ScenarioId>,
288    Json(config): Json<NodeTestingConfig>,
289) -> Result<StatusCode, (StatusCode, String)> {
290    let mut scenario = Scenario::load(&id)
291        .await
292        .map_err(|err| (StatusCode::BAD_REQUEST, err.to_string()))?;
293    scenario.add_node(config);
294    scenario
295        .save()
296        .await
297        .map(|_| StatusCode::CREATED)
298        .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))
299}
300
301async fn scenario_step_add(
302    State(_): State<AppState>,
303    Path(id): Path<ScenarioId>,
304    Json(step): Json<ScenarioStep>,
305) -> Result<StatusCode, (StatusCode, String)> {
306    let mut scenario = Scenario::load(&id)
307        .await
308        .map_err(|err| (StatusCode::BAD_REQUEST, err.to_string()))?;
309    scenario
310        .add_step(step)
311        .map_err(|err| (StatusCode::BAD_REQUEST, err.to_string()))?;
312    scenario
313        .save()
314        .await
315        .map(|_| StatusCode::CREATED)
316        .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))
317}
318
319async fn cluster_list(State(state): State<AppState>) -> Json<Vec<u16>> {
320    let state = state.lock().await;
321    Json(state.clusters.keys().cloned().collect())
322}
323
324#[derive(Serialize)]
325struct ClusterGetResponse {
326    id: u16,
327    target_scenario: Option<ScenarioId>,
328    next: Option<ScenarioWithStep>,
329}
330
331#[derive(Serialize)]
332struct ScenarioWithStep {
333    scenario: ScenarioId,
334    step: usize,
335}
336
337async fn cluster_get(
338    State(state): State<AppState>,
339    Path(cluster_id): Path<u16>,
340) -> Result<Json<ClusterGetResponse>, (StatusCode, String)> {
341    let cluster = state.cluster(cluster_id).await?;
342
343    Ok(Json(ClusterGetResponse {
344        id: cluster_id,
345        target_scenario: cluster.target_scenario().cloned(),
346        next: cluster
347            .next_scenario_and_step()
348            .map(|(scenario, step)| ScenarioWithStep {
349                scenario: scenario.clone(),
350                step,
351            }),
352    }))
353}
354
355#[derive(Serialize)]
356struct ClusterCreateResponse {
357    cluster_id: u16,
358}
359
360async fn cluster_create(
361    State(state): State<AppState>,
362    Path(scenario_id): Path<ScenarioId>,
363    args: Option<Json<ClusterConfig>>,
364) -> Result<Json<ClusterCreateResponse>, (StatusCode, String)> {
365    let config = match args {
366        Some(Json(v)) => v,
367        None => ClusterConfig::new(None)
368            .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))?,
369    };
370    state
371        .cluster_create(scenario_id, config)
372        .await
373        .map(|(cluster_id, _)| Json(ClusterCreateResponse { cluster_id }))
374}
375
376#[derive(Deserialize, Default)]
377struct ClusterRunArgs {
378    exec_until: Option<ClusterExecUntil>,
379}
380
381#[derive(Deserialize)]
382struct ClusterExecUntil {
383    scenario: ScenarioId,
384    step: Option<usize>,
385}
386
387async fn cluster_run(
388    State(state): State<AppState>,
389    Path(cluster_id): Path<u16>,
390    args: Option<Json<ClusterRunArgs>>,
391) -> Result<(), (StatusCode, String)> {
392    let Json(args) = args.unwrap_or_default();
393    let mut cluster = state.cluster(cluster_id).await?;
394    let res = match args.exec_until {
395        None => cluster.exec_to_end().await,
396        Some(until) => cluster.exec_until(until.scenario, until.step).await,
397    };
398
399    res.map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))
400}
401
402async fn cluster_run_auto(
403    State(state): State<AppState>,
404    Path(cluster_id): Path<u16>,
405) -> Result<(), (StatusCode, String)> {
406    let mut cluster = state.cluster(cluster_id).await?;
407
408    let _ = cluster.target_scenario().ok_or_else(|| {
409        (
410            StatusCode::BAD_REQUEST,
411            "target scenario for cluster isnt set".to_owned(),
412        )
413    })?;
414
415    cluster
416        .exec_to_end()
417        .await
418        .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))?;
419
420    let (tx, rx) = oneshot::channel::<Result<(), String>>();
421
422    tokio::spawn(async move {
423        while !tx.is_closed() {
424            let steps = cluster
425                .pending_events(true)
426                .flat_map(|(node_id, _, pending_events)| {
427                    pending_events.map(move |(_, event)| ScenarioStep::Event {
428                        node_id,
429                        event: event.to_string(),
430                    })
431                })
432                .collect::<Vec<_>>();
433
434            if steps.is_empty() {
435                if cluster
436                    .wait_for_pending_events_with_timeout(Duration::from_secs(5))
437                    .await
438                {
439                    continue;
440                } else {
441                    break;
442                }
443            }
444
445            cluster.add_steps_and_save(steps).await;
446
447            if let Err(err) = cluster.exec_to_end().await {
448                let _ = tx.send(Err(err.to_string()));
449                return;
450            }
451        }
452
453        let _ = tx.send(Ok(()));
454    });
455
456    rx.await
457        .unwrap()
458        .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err))
459}
460
461async fn cluster_scenarios_reload(
462    State(state): State<AppState>,
463    Path(cluster_id): Path<u16>,
464) -> Result<(), (StatusCode, String)> {
465    let mut cluster = state.cluster(cluster_id).await?;
466    cluster
467        .reload_scenarios()
468        .await
469        .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))
470}
471
472async fn cluster_webrtc_signal(
473    State(state): State<AppState>,
474    Path((cluster_id, offer)): Path<(u16, String)>,
475) -> Result<Json<P2pConnectionResponse>, (StatusCode, Json<P2pConnectionResponse>)> {
476    let offer: Offer = Err(())
477        .or_else(move |_| {
478            let json = bs58::decode(&offer).into_vec().or(Err(()))?;
479            serde_json::from_slice(&json).or(Err(()))
480        })
481        .map_err(|_| {
482            (
483                StatusCode::BAD_REQUEST,
484                Json(P2pConnectionResponse::SignalDecryptionFailed),
485            )
486        })?;
487
488    let internal_err = || {
489        (
490            StatusCode::INTERNAL_SERVER_ERROR,
491            Json(P2pConnectionResponse::InternalError),
492        )
493    };
494
495    let http_url = {
496        let cluster = state
497            .cluster(cluster_id)
498            .await
499            .map_err(|_| internal_err())?;
500        let node = cluster
501            .node_by_peer_id(offer.target_peer_id)
502            .ok_or_else(internal_err)?;
503        let http_url = match node.dial_addr() {
504            P2pConnectionOutgoingInitOpts::WebRTC { signaling, .. } => signaling.http_url(),
505            _ => None,
506        };
507        http_url.ok_or_else(internal_err)?
508    };
509    let resp = webrtc_signal_send(&http_url, offer)
510        .await
511        .map_err(|_| internal_err())?;
512    Ok(Json(resp))
513}
514
515async fn cluster_seeds(
516    State(state): State<AppState>,
517    Path(cluster_id): Path<u16>,
518) -> Result<String, (StatusCode, String)> {
519    let state = state.lock().await;
520    let host = state.host.clone();
521    let ssl_port = state.ssl_port;
522    state.cluster(cluster_id).await.map(|cluster| {
523        let list = cluster
524            .nodes_iter()
525            .filter(|(_, node)| node.config().initial_peers.is_empty())
526            .map(|(_, node)| {
527                let mut addr = node.dial_addr();
528                if let P2pConnectionOutgoingInitOpts::WebRTC { signaling, .. } = &mut addr {
529                    if let SignalingMethod::Http(http) = signaling {
530                        if let Some(port) = ssl_port {
531                            http.host = host.clone();
532                            http.port = port;
533                            *signaling = SignalingMethod::HttpsProxy(cluster_id, http.clone());
534                        }
535                    }
536                }
537                addr = addr.to_string().parse().unwrap();
538                addr.to_string()
539            })
540            .collect::<Vec<_>>();
541        list.join("\n")
542    })
543}
544
545async fn cluster_genesis_config(
546    State(state): State<AppState>,
547    Path(cluster_id): Path<u16>,
548) -> Result<Vec<u8>, (StatusCode, String)> {
549    let cluster = state.cluster(cluster_id).await?;
550    let genesis = cluster
551        .nodes_iter()
552        .next()
553        .map(|(_, node)| node.config().genesis.clone());
554    let genesis = genesis.ok_or_else(|| {
555        (
556            StatusCode::BAD_REQUEST,
557            "no nodes in the cluster".to_owned(),
558        )
559    })?;
560    if let GenesisConfig::Prebuilt(encoded) = &*genesis {
561        return Ok(encoded.clone().into_owned());
562    }
563    tokio::task::spawn_blocking(move || {
564        let res = genesis.load().map_err(|err| {
565            (
566                StatusCode::INTERNAL_SERVER_ERROR,
567                format!("failed to load genesis config. err: {err}"),
568            )
569        })?;
570        let mut encoded = Vec::new();
571        PrebuiltGenesisConfig::from_loaded(res)
572            .map_err(|_| {
573                (
574                    StatusCode::INTERNAL_SERVER_ERROR,
575                    "failed to build `PrebuiltGenesisConfig` from loaded data".to_owned(),
576                )
577            })?
578            .store(&mut encoded)
579            .map_err(|_| {
580                (
581                    StatusCode::INTERNAL_SERVER_ERROR,
582                    "failed to encode `PrebuiltGenesisConfig`".to_owned(),
583                )
584            })?;
585        Ok(encoded)
586    })
587    .await
588    .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "join error".to_owned()))?
589}
590
591#[derive(Serialize)]
592struct ClusterNodePendingEvents {
593    node_id: ClusterNodeId,
594    pending_events: Vec<ClusterNodePendingEvent>,
595}
596
597#[derive(Serialize)]
598struct ClusterNodePendingEvent {
599    id: PendingEventId,
600    event: String,
601    details: Option<String>,
602}
603
604async fn cluster_events_pending(
605    State(state): State<AppState>,
606    Path(cluster_id): Path<u16>,
607) -> Result<Json<Vec<ClusterNodePendingEvents>>, (StatusCode, String)> {
608    state
609        .cluster(cluster_id)
610        .await
611        .map(|mut cluster| {
612            cluster
613                .pending_events(true)
614                .map(|(node_id, state, iter)| {
615                    let pending_events = iter
616                        .map(|(id, event)| ClusterNodePendingEvent {
617                            id,
618                            event: event.to_string(),
619                            details: event_details(state, event),
620                        })
621                        .collect();
622                    ClusterNodePendingEvents {
623                        node_id,
624                        pending_events,
625                    }
626                })
627                .collect()
628        })
629        .map(Json)
630}
631
632async fn cluster_node_events_pending(
633    State(state): State<AppState>,
634    Path((cluster_id, node_id)): Path<(u16, ClusterNodeId)>,
635) -> Result<Json<Vec<ClusterNodePendingEvent>>, (StatusCode, String)> {
636    let mut cluster = state.cluster(cluster_id).await?;
637    cluster
638        .node_pending_events(node_id, true)
639        .map(|(state, iter)| {
640            iter.map(|(id, event)| ClusterNodePendingEvent {
641                id,
642                event: event.to_string(),
643                details: event_details(state, event),
644            })
645            .collect()
646        })
647        .map(Json)
648        .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))
649}
650
651#[derive(Serialize)]
652struct ClusterDestroyResponse {
653    existed: bool,
654}
655
656async fn cluster_destroy(
657    State(state): State<AppState>,
658    Path(cluster_id): Path<u16>,
659) -> Json<ClusterDestroyResponse> {
660    let existed = state.cluster_destroy(cluster_id).await;
661    Json(ClusterDestroyResponse { existed })
662}