1pub mod simulator;
2pub mod webnode;
3
4use crate::{
5 cluster::{Cluster, ClusterConfig, ClusterNodeId},
6 node::NodeTestingConfig,
7 scenario::{event_details, Scenario, ScenarioId, ScenarioInfo, ScenarioStep},
8 service::PendingEventId,
9};
10
11use std::{
12 collections::{BTreeMap, BTreeSet},
13 path::PathBuf,
14 sync::Arc,
15 time::Duration,
16};
17
18use axum::{
19 extract::{Path, State},
20 http::{header, StatusCode},
21 middleware,
22 routing::{get, get_service, post, put},
23 Json, Router,
24};
25use mina_node_native::p2p::webrtc::webrtc_signal_send;
26use node::{
27 account::AccountPublicKey,
28 p2p::{
29 connection::outgoing::P2pConnectionOutgoingInitOpts,
30 webrtc::{Host, Offer, P2pConnectionResponse, SignalingMethod},
31 },
32 transition_frontier::genesis::{GenesisConfig, PrebuiltGenesisConfig},
33};
34use rand::{rngs::StdRng, Rng, SeedableRng};
35use serde::{Deserialize, Serialize};
36use tokio::{
37 net::TcpListener,
38 runtime::Runtime,
39 sync::{oneshot, Mutex, MutexGuard, OwnedMutexGuard},
40};
41use tower_http::{cors::CorsLayer, services::ServeDir};
42
43pub fn server(rt: Runtime, host: Host, port: u16, ssl_port: Option<u16>) {
44 let fe_dist_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
45 .join("../../")
46 .join("frontend/dist/frontend/");
47 eprintln!("scenarios path: {}", Scenario::PATH);
48 eprintln!("FrontEnd dist path: {fe_dist_dir:?}");
49
50 let state = AppState::new(host, ssl_port);
51
52 let scenarios_router = Router::new()
53 .route("/", get(scenario_list))
54 .route("/", put(scenario_create))
55 .route("/:id", get(scenario_get))
56 .route("/:id/nodes", put(scenario_node_add))
57 .route("/:id/steps", put(scenario_step_add));
58
59 let clusters_router = Router::new()
60 .route("/", get(cluster_list))
61 .route("/create/:scenario_id", put(cluster_create))
62 .route("/:cluster_id", get(cluster_get))
63 .nest("/:cluster_id/webnode", webnode::router())
64 .route("/:cluster_id/run", post(cluster_run))
65 .route("/:cluster_id/run/auto", post(cluster_run_auto))
66 .route(
67 "/:cluster_id/scenarios/reload",
68 post(cluster_scenarios_reload),
69 )
70 .route(
71 "/:cluster_id/mina/webrtc/signal/:offer",
72 get(cluster_webrtc_signal),
73 )
74 .route("/:cluster_id/seeds", get(cluster_seeds))
75 .route("/:cluster_id/genesis/config", get(cluster_genesis_config))
76 .route(
77 "/:cluster_id/nodes/events/pending",
78 get(cluster_events_pending),
79 )
80 .route(
81 "/:cluster_id/nodes/:node_id/events/pending",
82 get(cluster_node_events_pending),
83 )
84 .route("/:cluster_id/destroy", post(cluster_destroy));
85
86 let cors = CorsLayer::very_permissive();
87 let coop_coep = middleware::from_fn(|req, next: middleware::Next| async {
88 let mut resp = next.run(req).await;
89 resp.headers_mut().insert(
90 header::HeaderName::from_static("cross-origin-embedder-policy"),
91 header::HeaderValue::from_static("require-corp"),
92 );
93 resp.headers_mut().insert(
94 header::HeaderName::from_static("cross-origin-opener-policy"),
95 header::HeaderValue::from_static("same-origin"),
96 );
97 resp
98 });
99
100 let app = Router::new()
101 .nest("/scenarios", scenarios_router)
102 .nest("/clusters", clusters_router)
103 .nest("/simulations", simulator::simulations_router())
104 .fallback(get_service(ServeDir::new(&fe_dist_dir)).layer(coop_coep.clone()))
105 .with_state(state)
106 .layer(cors);
107
108 rt.block_on(async {
109 let addr = std::net::SocketAddr::from(([0, 0, 0, 0], port));
110 let listener = TcpListener::bind(addr).await.unwrap();
111 axum::serve(listener, app.into_make_service())
112 .await
113 .unwrap();
114 });
115}
116
/// Mutable server state shared by all request handlers.
///
/// It is kept behind the async mutex owned by `AppState`.
pub struct AppStateInner {
    /// Host passed to `server`; `cluster_seeds` writes it into HTTP signaling
    /// addresses when `ssl_port` is set.
    host: Host,
    /// Optional port; when present `cluster_seeds` rewrites HTTP signaling
    /// addresses to go through the HTTPS proxy on this port.
    ssl_port: Option<u16>,
    /// Random generator used to pick ids for newly registered clusters.
    rng: StdRng,
    /// Registered clusters keyed by cluster id.
    clusters: BTreeMap<u16, Arc<Mutex<Cluster>>>,
    /// Block producer keys tracked per cluster id. The entry for a cluster is
    /// removed in `AppState::cluster_destroy`.
    /// NOTE(review): nothing in this part of the file inserts into this map;
    /// presumably that happens in a submodule — confirm.
    locked_block_producer_keys: BTreeMap<u16, BTreeSet<AccountPublicKey>>,
}
125
126impl AppStateInner {
127 pub fn new(host: Host, ssl_port: Option<u16>) -> Self {
128 Self {
129 host,
130 ssl_port,
131 rng: StdRng::seed_from_u64(0),
132 clusters: Default::default(),
133 locked_block_producer_keys: Default::default(),
134 }
135 }
136}
137
/// Clonable handle to the shared [`AppStateInner`].
///
/// Cloning only clones the inner `Arc`; all clones refer to the same state,
/// which is protected by an async mutex.
#[derive(Clone)]
pub struct AppState(Arc<Mutex<AppStateInner>>);
140
141impl AppState {
142 pub fn new(host: Host, ssl_port: Option<u16>) -> Self {
143 Self(Arc::new(Mutex::new(AppStateInner::new(host, ssl_port))))
144 }
145
146 pub async fn lock(&self) -> MutexGuard<'_, AppStateInner> {
147 self.0.lock().await
148 }
149
150 async fn cluster_mutex(
151 &self,
152 cluster_id: u16,
153 ) -> Result<Arc<Mutex<Cluster>>, (StatusCode, String)> {
154 self.lock().await.cluster_mutex(cluster_id)
155 }
156
157 pub async fn cluster(
158 &self,
159 cluster_id: u16,
160 ) -> Result<OwnedMutexGuard<Cluster>, (StatusCode, String)> {
161 self.lock().await.cluster(cluster_id).await
162 }
163
164 pub async fn cluster_create(
165 &self,
166 scenario_id: ScenarioId,
167 config: ClusterConfig,
168 ) -> Result<(u16, OwnedMutexGuard<Cluster>), (StatusCode, String)> {
169 let scenario = Scenario::load(&scenario_id)
170 .await
171 .map_err(|err| (StatusCode::BAD_REQUEST, err.to_string()))?;
172
173 let mut cluster = Cluster::new(config);
174 cluster
175 .start(scenario)
176 .await
177 .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))?;
178
179 let mut state = self.lock().await;
180 let id = loop {
181 let id = state.rng.gen();
182 if !state.clusters.contains_key(&id) {
183 break id;
184 }
185 };
186
187 let cluster = Arc::new(Mutex::new(cluster));
188 let cluster_guard = cluster.clone().try_lock_owned().unwrap();
189 state.clusters.insert(id, cluster);
190
191 Ok((id, cluster_guard))
192 }
193
194 pub async fn cluster_create_empty(
195 &self,
196 config: ClusterConfig,
197 ) -> Result<(u16, OwnedMutexGuard<Cluster>), (StatusCode, String)> {
198 let cluster = Cluster::new(config);
199
200 let mut state = self.lock().await;
201 let id = loop {
202 let id = state.rng.gen();
203 if !state.clusters.contains_key(&id) {
204 break id;
205 }
206 };
207
208 let cluster = Arc::new(Mutex::new(cluster));
209 let cluster_guard = cluster.clone().try_lock_owned().unwrap();
210 state.clusters.insert(id, cluster);
211
212 Ok((id, cluster_guard))
213 }
214
215 pub async fn cluster_destroy(&self, cluster_id: u16) -> bool {
216 let mut this = self.lock().await;
217 this.locked_block_producer_keys.remove(&cluster_id);
218 this.clusters.remove(&cluster_id).is_some()
219 }
220}
221
222impl AppStateInner {
223 fn cluster_mutex(&self, cluster_id: u16) -> Result<Arc<Mutex<Cluster>>, (StatusCode, String)> {
224 self.clusters.get(&cluster_id).cloned().ok_or_else(|| {
225 (
226 StatusCode::BAD_REQUEST,
227 format!("cluster {cluster_id} not found"),
228 )
229 })
230 }
231
232 pub async fn cluster(
233 &self,
234 cluster_id: u16,
235 ) -> Result<OwnedMutexGuard<Cluster>, (StatusCode, String)> {
236 Ok(self.cluster_mutex(cluster_id)?.lock_owned().await)
237 }
238}
239
240async fn scenario_list(
241 State(_): State<AppState>,
242) -> Result<Json<Vec<ScenarioInfo>>, (StatusCode, String)> {
243 Scenario::list()
244 .await
245 .map(Json)
246 .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))
247}
248
249async fn scenario_get(
250 State(_): State<AppState>,
251 Path(id): Path<ScenarioId>,
252) -> Result<Json<Scenario>, (StatusCode, String)> {
253 Scenario::load(&id)
254 .await
255 .map(Json)
256 .map_err(|err| (StatusCode::BAD_REQUEST, err.to_string()))
257}
258
/// Request body of `scenario_create`.
#[derive(Deserialize)]
struct ScenariosCreateArgs {
    /// Id of the scenario to create; the request is rejected if a scenario
    /// with this id already exists.
    id: ScenarioId,
    /// Description stored on the new scenario.
    description: String,
    /// Optional parent scenario id, forwarded to `Scenario::new`.
    parent_id: Option<ScenarioId>,
}
265
266async fn scenario_create(
267 State(_): State<AppState>,
268 Json(args): Json<ScenariosCreateArgs>,
269) -> Result<StatusCode, (StatusCode, String)> {
270 if Scenario::exists(&args.id) {
271 return Err((
272 StatusCode::BAD_REQUEST,
273 format!("scenario with same id/name already exists: {}", args.id),
274 ));
275 }
276 let mut scenario = Scenario::new(args.id, args.parent_id);
277 scenario.set_description(args.description);
278 scenario
279 .save()
280 .await
281 .map(|_| StatusCode::CREATED)
282 .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))
283}
284
285async fn scenario_node_add(
286 State(_): State<AppState>,
287 Path(id): Path<ScenarioId>,
288 Json(config): Json<NodeTestingConfig>,
289) -> Result<StatusCode, (StatusCode, String)> {
290 let mut scenario = Scenario::load(&id)
291 .await
292 .map_err(|err| (StatusCode::BAD_REQUEST, err.to_string()))?;
293 scenario.add_node(config);
294 scenario
295 .save()
296 .await
297 .map(|_| StatusCode::CREATED)
298 .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))
299}
300
301async fn scenario_step_add(
302 State(_): State<AppState>,
303 Path(id): Path<ScenarioId>,
304 Json(step): Json<ScenarioStep>,
305) -> Result<StatusCode, (StatusCode, String)> {
306 let mut scenario = Scenario::load(&id)
307 .await
308 .map_err(|err| (StatusCode::BAD_REQUEST, err.to_string()))?;
309 scenario
310 .add_step(step)
311 .map_err(|err| (StatusCode::BAD_REQUEST, err.to_string()))?;
312 scenario
313 .save()
314 .await
315 .map(|_| StatusCode::CREATED)
316 .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))
317}
318
319async fn cluster_list(State(state): State<AppState>) -> Json<Vec<u16>> {
320 let state = state.lock().await;
321 Json(state.clusters.keys().cloned().collect())
322}
323
/// Response body of `cluster_get`.
#[derive(Serialize)]
struct ClusterGetResponse {
    /// Id of the cluster the request was made for.
    id: u16,
    /// Value of `Cluster::target_scenario()` at the time of the request.
    target_scenario: Option<ScenarioId>,
    /// Value of `Cluster::next_scenario_and_step()` at the time of the
    /// request, if any.
    next: Option<ScenarioWithStep>,
}
330
/// A scenario id paired with a step index, as reported by
/// `Cluster::next_scenario_and_step()`.
#[derive(Serialize)]
struct ScenarioWithStep {
    /// Id of the scenario.
    scenario: ScenarioId,
    /// Step index within that scenario.
    step: usize,
}
336
337async fn cluster_get(
338 State(state): State<AppState>,
339 Path(cluster_id): Path<u16>,
340) -> Result<Json<ClusterGetResponse>, (StatusCode, String)> {
341 let cluster = state.cluster(cluster_id).await?;
342
343 Ok(Json(ClusterGetResponse {
344 id: cluster_id,
345 target_scenario: cluster.target_scenario().cloned(),
346 next: cluster
347 .next_scenario_and_step()
348 .map(|(scenario, step)| ScenarioWithStep {
349 scenario: scenario.clone(),
350 step,
351 }),
352 }))
353}
354
/// Response body of `cluster_create`.
#[derive(Serialize)]
struct ClusterCreateResponse {
    /// Id assigned to the newly created cluster.
    cluster_id: u16,
}
359
360async fn cluster_create(
361 State(state): State<AppState>,
362 Path(scenario_id): Path<ScenarioId>,
363 args: Option<Json<ClusterConfig>>,
364) -> Result<Json<ClusterCreateResponse>, (StatusCode, String)> {
365 let config = match args {
366 Some(Json(v)) => v,
367 None => ClusterConfig::new(None)
368 .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))?,
369 };
370 state
371 .cluster_create(scenario_id, config)
372 .await
373 .map(|(cluster_id, _)| Json(ClusterCreateResponse { cluster_id }))
374}
375
/// Optional request body of `cluster_run`.
#[derive(Deserialize, Default)]
struct ClusterRunArgs {
    /// Where to stop execution. When absent, `cluster_run` executes the
    /// cluster to the end.
    exec_until: Option<ClusterExecUntil>,
}
380
/// Stop position for `cluster_run`; both fields are forwarded unchanged to
/// `Cluster::exec_until`.
#[derive(Deserialize)]
struct ClusterExecUntil {
    /// Scenario to execute until.
    scenario: ScenarioId,
    /// Optional step within `scenario`.
    /// NOTE(review): the meaning of `None` is defined by `Cluster::exec_until`,
    /// which is not visible here — confirm.
    step: Option<usize>,
}
386
387async fn cluster_run(
388 State(state): State<AppState>,
389 Path(cluster_id): Path<u16>,
390 args: Option<Json<ClusterRunArgs>>,
391) -> Result<(), (StatusCode, String)> {
392 let Json(args) = args.unwrap_or_default();
393 let mut cluster = state.cluster(cluster_id).await?;
394 let res = match args.exec_until {
395 None => cluster.exec_to_end().await,
396 Some(until) => cluster.exec_until(until.scenario, until.step).await,
397 };
398
399 res.map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))
400}
401
402async fn cluster_run_auto(
403 State(state): State<AppState>,
404 Path(cluster_id): Path<u16>,
405) -> Result<(), (StatusCode, String)> {
406 let mut cluster = state.cluster(cluster_id).await?;
407
408 let _ = cluster.target_scenario().ok_or_else(|| {
409 (
410 StatusCode::BAD_REQUEST,
411 "target scenario for cluster isnt set".to_owned(),
412 )
413 })?;
414
415 cluster
416 .exec_to_end()
417 .await
418 .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))?;
419
420 let (tx, rx) = oneshot::channel::<Result<(), String>>();
421
422 tokio::spawn(async move {
423 while !tx.is_closed() {
424 let steps = cluster
425 .pending_events(true)
426 .flat_map(|(node_id, _, pending_events)| {
427 pending_events.map(move |(_, event)| ScenarioStep::Event {
428 node_id,
429 event: event.to_string(),
430 })
431 })
432 .collect::<Vec<_>>();
433
434 if steps.is_empty() {
435 if cluster
436 .wait_for_pending_events_with_timeout(Duration::from_secs(5))
437 .await
438 {
439 continue;
440 } else {
441 break;
442 }
443 }
444
445 cluster.add_steps_and_save(steps).await;
446
447 if let Err(err) = cluster.exec_to_end().await {
448 let _ = tx.send(Err(err.to_string()));
449 return;
450 }
451 }
452
453 let _ = tx.send(Ok(()));
454 });
455
456 rx.await
457 .unwrap()
458 .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err))
459}
460
461async fn cluster_scenarios_reload(
462 State(state): State<AppState>,
463 Path(cluster_id): Path<u16>,
464) -> Result<(), (StatusCode, String)> {
465 let mut cluster = state.cluster(cluster_id).await?;
466 cluster
467 .reload_scenarios()
468 .await
469 .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))
470}
471
472async fn cluster_webrtc_signal(
473 State(state): State<AppState>,
474 Path((cluster_id, offer)): Path<(u16, String)>,
475) -> Result<Json<P2pConnectionResponse>, (StatusCode, Json<P2pConnectionResponse>)> {
476 let offer: Offer = Err(())
477 .or_else(move |_| {
478 let json = bs58::decode(&offer).into_vec().or(Err(()))?;
479 serde_json::from_slice(&json).or(Err(()))
480 })
481 .map_err(|_| {
482 (
483 StatusCode::BAD_REQUEST,
484 Json(P2pConnectionResponse::SignalDecryptionFailed),
485 )
486 })?;
487
488 let internal_err = || {
489 (
490 StatusCode::INTERNAL_SERVER_ERROR,
491 Json(P2pConnectionResponse::InternalError),
492 )
493 };
494
495 let http_url = {
496 let cluster = state
497 .cluster(cluster_id)
498 .await
499 .map_err(|_| internal_err())?;
500 let node = cluster
501 .node_by_peer_id(offer.target_peer_id)
502 .ok_or_else(internal_err)?;
503 let http_url = match node.dial_addr() {
504 P2pConnectionOutgoingInitOpts::WebRTC { signaling, .. } => signaling.http_url(),
505 _ => None,
506 };
507 http_url.ok_or_else(internal_err)?
508 };
509 let resp = webrtc_signal_send(&http_url, offer)
510 .await
511 .map_err(|_| internal_err())?;
512 Ok(Json(resp))
513}
514
515async fn cluster_seeds(
516 State(state): State<AppState>,
517 Path(cluster_id): Path<u16>,
518) -> Result<String, (StatusCode, String)> {
519 let state = state.lock().await;
520 let host = state.host.clone();
521 let ssl_port = state.ssl_port;
522 state.cluster(cluster_id).await.map(|cluster| {
523 let list = cluster
524 .nodes_iter()
525 .filter(|(_, node)| node.config().initial_peers.is_empty())
526 .map(|(_, node)| {
527 let mut addr = node.dial_addr();
528 if let P2pConnectionOutgoingInitOpts::WebRTC { signaling, .. } = &mut addr {
529 if let SignalingMethod::Http(http) = signaling {
530 if let Some(port) = ssl_port {
531 http.host = host.clone();
532 http.port = port;
533 *signaling = SignalingMethod::HttpsProxy(cluster_id, http.clone());
534 }
535 }
536 }
537 addr = addr.to_string().parse().unwrap();
538 addr.to_string()
539 })
540 .collect::<Vec<_>>();
541 list.join("\n")
542 })
543}
544
545async fn cluster_genesis_config(
546 State(state): State<AppState>,
547 Path(cluster_id): Path<u16>,
548) -> Result<Vec<u8>, (StatusCode, String)> {
549 let cluster = state.cluster(cluster_id).await?;
550 let genesis = cluster
551 .nodes_iter()
552 .next()
553 .map(|(_, node)| node.config().genesis.clone());
554 let genesis = genesis.ok_or_else(|| {
555 (
556 StatusCode::BAD_REQUEST,
557 "no nodes in the cluster".to_owned(),
558 )
559 })?;
560 if let GenesisConfig::Prebuilt(encoded) = &*genesis {
561 return Ok(encoded.clone().into_owned());
562 }
563 tokio::task::spawn_blocking(move || {
564 let res = genesis.load().map_err(|err| {
565 (
566 StatusCode::INTERNAL_SERVER_ERROR,
567 format!("failed to load genesis config. err: {err}"),
568 )
569 })?;
570 let mut encoded = Vec::new();
571 PrebuiltGenesisConfig::from_loaded(res)
572 .map_err(|_| {
573 (
574 StatusCode::INTERNAL_SERVER_ERROR,
575 "failed to build `PrebuiltGenesisConfig` from loaded data".to_owned(),
576 )
577 })?
578 .store(&mut encoded)
579 .map_err(|_| {
580 (
581 StatusCode::INTERNAL_SERVER_ERROR,
582 "failed to encode `PrebuiltGenesisConfig`".to_owned(),
583 )
584 })?;
585 Ok(encoded)
586 })
587 .await
588 .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "join error".to_owned()))?
589}
590
/// Pending events of a single node; element of the `cluster_events_pending`
/// response.
#[derive(Serialize)]
struct ClusterNodePendingEvents {
    /// Node the events belong to.
    node_id: ClusterNodeId,
    /// Events reported as pending for that node.
    pending_events: Vec<ClusterNodePendingEvent>,
}
596
/// Serializable description of one pending event.
#[derive(Serialize)]
struct ClusterNodePendingEvent {
    /// Id of the pending event.
    id: PendingEventId,
    /// The event rendered with `to_string()`.
    event: String,
    /// Result of `event_details` for this event and the node's state, if any.
    details: Option<String>,
}
603
604async fn cluster_events_pending(
605 State(state): State<AppState>,
606 Path(cluster_id): Path<u16>,
607) -> Result<Json<Vec<ClusterNodePendingEvents>>, (StatusCode, String)> {
608 state
609 .cluster(cluster_id)
610 .await
611 .map(|mut cluster| {
612 cluster
613 .pending_events(true)
614 .map(|(node_id, state, iter)| {
615 let pending_events = iter
616 .map(|(id, event)| ClusterNodePendingEvent {
617 id,
618 event: event.to_string(),
619 details: event_details(state, event),
620 })
621 .collect();
622 ClusterNodePendingEvents {
623 node_id,
624 pending_events,
625 }
626 })
627 .collect()
628 })
629 .map(Json)
630}
631
632async fn cluster_node_events_pending(
633 State(state): State<AppState>,
634 Path((cluster_id, node_id)): Path<(u16, ClusterNodeId)>,
635) -> Result<Json<Vec<ClusterNodePendingEvent>>, (StatusCode, String)> {
636 let mut cluster = state.cluster(cluster_id).await?;
637 cluster
638 .node_pending_events(node_id, true)
639 .map(|(state, iter)| {
640 iter.map(|(id, event)| ClusterNodePendingEvent {
641 id,
642 event: event.to_string(),
643 details: event_details(state, event),
644 })
645 .collect()
646 })
647 .map(Json)
648 .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))
649}
650
/// Response body of `cluster_destroy`.
#[derive(Serialize)]
struct ClusterDestroyResponse {
    /// `true` if a cluster with the requested id was registered and has been
    /// removed by this call.
    existed: bool,
}
655
656async fn cluster_destroy(
657 State(state): State<AppState>,
658 Path(cluster_id): Path<u16>,
659) -> Json<ClusterDestroyResponse> {
660 let existed = state.cluster_destroy(cluster_id).await;
661 Json(ClusterDestroyResponse { existed })
662}