openmina_node_native/
http_server.rs

1use std::{convert::Infallible, mem::size_of, str::FromStr};
2
3use mina_p2p_messages::binprot::BinProtWrite;
4use serde::{de::DeserializeOwned, Deserialize, Serialize};
5use warp::{
6    http::HeaderValue,
7    hyper::{header::CONTENT_TYPE, Response, StatusCode},
8    reply::with_status,
9    Filter, Rejection, Reply,
10};
11
12use node::{core::snark::SnarkJobId, rpc::*};
13
14use openmina_node_common::rpc::{
15    RpcActionStatsGetResponse, RpcSender, RpcSnarkPoolGetResponse, RpcSnarkerJobCommitResponse,
16    RpcSnarkerJobSpecResponse, RpcStateGetResponse, RpcSyncStatsGetResponse,
17};
18
19macro_rules! compose_route {
20    ($x:expr $(,)?) => (
21        $x
22    );
23    ($x0:expr, $($x:expr),+ $(,)?) => (
24        $x0$(.or($x))+.boxed()
25    );
26}
27
28pub async fn run(port: u16, rpc_sender: RpcSender) {
29    let build_env_get = warp::path!("build_env")
30        .and(warp::get())
31        .then(move || async { with_json_reply(&node::BuildEnv::get(), StatusCode::OK) });
32
33    #[cfg(feature = "p2p-webrtc")]
34    let signaling = {
35        use node::p2p::{
36            connection::{
37                incoming::{IncomingSignalingMethod, P2pConnectionIncomingInitOpts},
38                P2pConnectionResponse,
39            },
40            webrtc, PeerId,
41        };
42
43        use super::rpc::RpcP2pConnectionIncomingResponse;
44
45        let handle = |sender: RpcSender, offer: Box<webrtc::Offer>| async move {
46            let mut rx = sender
47                .multishot_request(
48                    2,
49                    RpcRequest::P2pConnectionIncoming(P2pConnectionIncomingInitOpts {
50                        peer_id: PeerId::from_public_key(offer.identity_pub_key.clone()),
51                        signaling: IncomingSignalingMethod::Http,
52                        offer,
53                    }),
54                )
55                .await;
56
57            match rx.recv().await {
58                Some(RpcP2pConnectionIncomingResponse::Answer(answer)) => {
59                    let status = match &answer {
60                        P2pConnectionResponse::Accepted(_) => StatusCode::OK,
61                        P2pConnectionResponse::Rejected(reason) => match reason.is_bad() {
62                            false => StatusCode::OK,
63                            true => StatusCode::BAD_REQUEST,
64                        },
65                        P2pConnectionResponse::SignalDecryptionFailed => StatusCode::BAD_REQUEST,
66                        P2pConnectionResponse::InternalError => StatusCode::INTERNAL_SERVER_ERROR,
67                    };
68                    with_json_reply(&answer, status)
69                }
70                _ => {
71                    let resp = P2pConnectionResponse::internal_error_str();
72                    with_json_reply(&resp, StatusCode::INTERNAL_SERVER_ERROR)
73                }
74            }
75        };
76
77        let rpc_sender_clone = rpc_sender.clone();
78        let get = warp::path!("mina" / "webrtc" / "signal" / String)
79            .and(warp::get())
80            .then(move |offer: String| {
81                let rpc_sender_clone = rpc_sender_clone.clone();
82                async move {
83                    let decode_res = Err(()).or_else(move |_| {
84                        let json = bs58::decode(&offer).into_vec().or(Err(()))?;
85                        serde_json::from_slice(&json).or(Err(()))
86                    });
87                    match decode_res {
88                        Err(()) => with_json_reply(
89                            &P2pConnectionResponse::SignalDecryptionFailed,
90                            StatusCode::BAD_REQUEST,
91                        ),
92                        Ok(offer) => handle(rpc_sender_clone.clone(), offer).await,
93                    }
94                }
95            });
96
97        let rpc_sender_clone = rpc_sender.clone();
98        let post = warp::path!("mina" / "webrtc" / "signal")
99            .and(warp::post())
100            .and(warp::filters::body::json())
101            .then(move |offer: Box<webrtc::Offer>| handle(rpc_sender_clone.clone(), offer));
102        get.or(post)
103    };
104
105    // TODO(binier): make endpoint only accessible locally.
106    #[derive(Deserialize)]
107    struct StateQueryParams {
108        filter: Option<String>,
109    }
110    #[derive(Debug)]
111    struct StateGetRejection(RpcStateGetError);
112    impl warp::reject::Reject for StateGetRejection {}
113
114    let state_get = warp::path!("state")
115        .and(warp::get())
116        .and(with_rpc_sender(rpc_sender.clone()))
117        .and(warp::query())
118        .and_then(state_handler)
119        .recover(state_recover);
120
121    let state_post = warp::path!("state")
122        .and(warp::post())
123        .and(with_rpc_sender(rpc_sender.clone()))
124        .and(warp::body::json())
125        .and_then(state_handler)
126        .recover(state_recover);
127
128    async fn state_handler(
129        rpc_sender: RpcSender,
130        StateQueryParams { filter }: StateQueryParams,
131    ) -> Result<impl warp::Reply, warp::Rejection> {
132        rpc_sender
133            .oneshot_request(RpcRequest::StateGet(filter))
134            .await
135            .ok_or_else(|| warp::reject::custom(DroppedChannel))
136            .and_then(|reply: RpcStateGetResponse| {
137                reply.map_or_else(
138                    |err| Err(warp::reject::custom(StateGetRejection(err))),
139                    |state| Ok(warp::reply::json(&state)),
140                )
141            })
142    }
143
144    async fn state_recover(reject: warp::Rejection) -> Result<impl warp::Reply, warp::Rejection> {
145        if let Some(StateGetRejection(error)) = reject.find() {
146            Ok(warp::reply::with_status(
147                warp::reply::json(error),
148                StatusCode::BAD_REQUEST,
149            ))
150        } else {
151            Err(reject)
152        }
153    }
154
155    let rpc_sender_clone = rpc_sender.clone();
156    let status = warp::path!("status").and(warp::get()).then(move || {
157        let rpc_sender_clone = rpc_sender_clone.clone();
158        async move {
159            let result: RpcStatusGetResponse = rpc_sender_clone
160                .oneshot_request(RpcRequest::StatusGet)
161                .await
162                .flatten();
163
164            with_json_reply(&result, StatusCode::OK)
165        }
166    });
167
168    let rpc_sender_clone = rpc_sender.clone();
169    let make_heartbeat = warp::path!("make_heartbeat")
170        .and(warp::post())
171        .then(move || {
172            let rpc_sender_clone = rpc_sender_clone.clone();
173            async move {
174                let result: RpcHeartbeatGetResponse = rpc_sender_clone
175                    .oneshot_request(RpcRequest::HeartbeatGet)
176                    .await
177                    .flatten();
178
179                with_json_reply(&result, StatusCode::OK)
180            }
181        });
182
183    let rpc_sender_clone = rpc_sender.clone();
184    let peers_get = warp::path!("state" / "peers")
185        .and(warp::get())
186        .then(move || {
187            let rpc_sender_clone = rpc_sender_clone.clone();
188            async move {
189                let result = rpc_sender_clone
190                    .oneshot_request::<RpcPeersGetResponse>(RpcRequest::PeersGet)
191                    .await;
192
193                with_json_reply(&result, StatusCode::OK)
194            }
195        });
196
197    let rpc_sender_clone = rpc_sender.clone();
198    let message_progress_get = warp::path!("state" / "message-progress")
199        .and(warp::get())
200        .then(move || {
201            let rpc_sender_clone = rpc_sender_clone.clone();
202            async move {
203                let result = rpc_sender_clone
204                    .oneshot_request::<RpcMessageProgressResponse>(RpcRequest::MessageProgressGet)
205                    .await;
206
207                with_json_reply(&result, StatusCode::OK)
208            }
209        });
210
211    // TODO(binier): make endpoint only accessible locally.
212    let stats = {
213        let rpc_sender_clone = rpc_sender.clone();
214        #[derive(Deserialize, Default)]
215        struct ActionQueryParams {
216            id: Option<String>,
217        }
218        let action_stats = warp::path!("stats" / "actions")
219            .and(warp::get())
220            .and(optq::<ActionQueryParams>())
221            .then(move |query: ActionQueryParams| {
222                let rpc_sender_clone = rpc_sender_clone.clone();
223                async move {
224                    let id_filter = query.id.as_deref();
225                    let result: RpcActionStatsGetResponse = rpc_sender_clone
226                        .oneshot_request(RpcRequest::ActionStatsGet(match id_filter {
227                            None => ActionStatsQuery::SinceStart,
228                            Some("latest") => ActionStatsQuery::ForLatestBlock,
229                            Some(id) => {
230                                let id = match id.parse() {
231                                    Ok(v) => v,
232                                    Err(err) => {
233                                        return with_json_reply(
234                                            &format!(
235                                                "'id' must be an u64 integer: {err}, instead passed: {id}"
236                                            ),
237                                            StatusCode::BAD_REQUEST,
238                                        );
239                                    }
240                                };
241                                ActionStatsQuery::ForBlockWithId(id)
242                            }
243                        }))
244                        .await
245                        .flatten();
246
247                    with_json_reply(&result, StatusCode::OK)
248                }
249            });
250
251        let rpc_sender_clone = rpc_sender.clone();
252        #[derive(Deserialize, Default)]
253        struct SyncQueryParams {
254            limit: Option<usize>,
255        }
256        let sync_stats = warp::path!("stats" / "sync")
257            .and(warp::get())
258            .and(optq::<SyncQueryParams>())
259            .then(move |query: SyncQueryParams| {
260                let rpc_sender_clone = rpc_sender_clone.clone();
261                async move {
262                    let result: RpcSyncStatsGetResponse = rpc_sender_clone
263                        .oneshot_request(RpcRequest::SyncStatsGet(SyncStatsQuery {
264                            limit: query.limit,
265                        }))
266                        .await
267                        .flatten();
268
269                    with_json_reply(&result, StatusCode::OK)
270                }
271            });
272
273        let rpc_sender_clone = rpc_sender.clone();
274        let block_producer_stats = warp::path!("stats" / "block_producer")
275            .and(warp::get())
276            .then(move || {
277                let rpc_sender_clone = rpc_sender_clone.clone();
278                async move {
279                    let result: RpcBlockProducerStatsGetResponse = rpc_sender_clone
280                        .oneshot_request(RpcRequest::BlockProducerStatsGet)
281                        .await
282                        .flatten();
283
284                    with_json_reply(&result, StatusCode::OK)
285                }
286            });
287
288        action_stats.or(sync_stats).or(block_producer_stats)
289    };
290
291    let rpc_sender_clone = rpc_sender.clone();
292    let scan_state_summary_get = warp::path!("scan-state" / "summary" / ..)
293        .and(warp::get())
294        .and(
295            warp::path::param::<String>()
296                .map(Some)
297                .or_else(|_| async { Ok::<(Option<String>,), std::convert::Infallible>((None,)) }),
298        )
299        .and(warp::path::end())
300        .then(move |query: Option<String>| {
301            let rpc_sender_clone = rpc_sender_clone.clone();
302            let query = match query {
303                None => Ok(RpcScanStateSummaryGetQuery::ForBestTip),
304                Some(query) => None
305                    .or_else(|| {
306                        Some(RpcScanStateSummaryGetQuery::ForBlockWithHeight(
307                            query.parse().ok()?,
308                        ))
309                    })
310                    .ok_or(())
311                    .or_else(|_| match query.parse() {
312                        Err(_) => Err("invalid arg! Expected block hash or height"),
313                        Ok(v) => Ok(RpcScanStateSummaryGetQuery::ForBlockWithHash(v)),
314                    }),
315            };
316            async move {
317                let query = match query {
318                    Ok(v) => v,
319                    Err(err) => {
320                        return with_json_reply(&err, StatusCode::BAD_REQUEST);
321                    }
322                };
323                let res: Option<RpcScanStateSummaryGetResponse> = rpc_sender_clone
324                    .oneshot_request(RpcRequest::ScanStateSummaryGet(query))
325                    .await;
326                match res {
327                    None => with_json_reply(
328                        &"response channel dropped",
329                        StatusCode::INTERNAL_SERVER_ERROR,
330                    ),
331                    Some(Err(err)) => with_json_reply(&err, StatusCode::INTERNAL_SERVER_ERROR),
332                    Some(Ok(data)) => with_json_reply(&data, StatusCode::OK),
333                }
334            }
335        });
336
337    let rpc_sender_clone = rpc_sender.clone();
338    let snark_pool_jobs_get = warp::path!("snark-pool" / "jobs")
339        .and(warp::get())
340        .then(move || {
341            let rpc_sender_clone = rpc_sender_clone.clone();
342            async move {
343                let res: Option<RpcSnarkPoolGetResponse> = rpc_sender_clone
344                    .oneshot_request(RpcRequest::SnarkPoolGet)
345                    .await;
346                match res {
347                    None => with_json_reply(
348                        &"response channel dropped",
349                        StatusCode::INTERNAL_SERVER_ERROR,
350                    ),
351                    Some(resp) => with_json_reply(&resp, StatusCode::OK),
352                }
353            }
354        });
355
356    let rpc_sender_clone = rpc_sender.clone();
357    let snark_pool_job_get = warp::path!("snark-pool" / "job" / SnarkJobId).then(move |job_id| {
358        let rpc_sender_clone = rpc_sender_clone.clone();
359        async move {
360            let res: Option<RpcSnarkPoolJobGetResponse> = rpc_sender_clone
361                .oneshot_request(RpcRequest::SnarkPoolJobGet { job_id })
362                .await;
363            match res {
364                None => with_json_reply(
365                    &"response channel dropped",
366                    StatusCode::INTERNAL_SERVER_ERROR,
367                ),
368                Some(resp) => with_json_reply(&resp, StatusCode::OK),
369            }
370        }
371    });
372
373    // TODO(binier): make endpoint only accessible locally.
374    let rpc_sender_clone = rpc_sender.clone();
375    let snarker_job_commit = warp::path!("snarker" / "job" / "commit")
376        .and(warp::post())
377        .and(warp::filters::body::bytes())
378        .then(move |body: bytes::Bytes| {
379            let rpc_sender_clone = rpc_sender_clone.clone();
380            async move {
381                let Ok(job_id) = String::from_utf8(body.to_vec())
382                    .or(Err(()))
383                    .and_then(|s| SnarkJobId::from_str(&s).or(Err(())))
384                else {
385                    return with_json_reply(&"invalid_input", StatusCode::BAD_REQUEST);
386                };
387
388                let res: Option<RpcSnarkerJobCommitResponse> = rpc_sender_clone
389                    .oneshot_request(RpcRequest::SnarkerJobCommit { job_id })
390                    .await;
391                match res {
392                    None => with_json_reply(
393                        &"response channel dropped",
394                        StatusCode::INTERNAL_SERVER_ERROR,
395                    ),
396                    Some(resp) => {
397                        let status = match &resp {
398                            RpcSnarkerJobCommitResponse::Ok => StatusCode::CREATED,
399                            _ => StatusCode::BAD_REQUEST,
400                        };
401                        with_json_reply(&resp, status)
402                    }
403                }
404            }
405        });
406
407    #[derive(Deserialize)]
408    struct JobIdParam {
409        id: SnarkJobId,
410    }
411
412    let rpc_sender_clone = rpc_sender.clone();
413    let snarker_job_spec = warp::path!("snarker" / "job" / "spec")
414        .and(warp::get())
415        .and(warp::header::optional("accept"))
416        .and(warp::query())
417        .then(
418            move |accept: Option<String>, JobIdParam { id: job_id }: JobIdParam| {
419                let rpc_sender_clone = rpc_sender_clone.clone();
420                async move {
421                    rpc_sender_clone
422                        .oneshot_request(RpcRequest::SnarkerJobSpec { job_id })
423                        .await
424                        .map_or_else(
425                            || {
426                                JsonOrBinary::error(
427                                    "response channel dropped",
428                                    StatusCode::INTERNAL_SERVER_ERROR,
429                                )
430                            },
431                            |resp| match resp {
432                                RpcSnarkerJobSpecResponse::Ok(spec)
433                                    if accept.as_deref() == Some("application/octet-stream") =>
434                                {
435                                    JsonOrBinary::binary(spec)
436                                }
437                                RpcSnarkerJobSpecResponse::Ok(spec) => JsonOrBinary::json(spec),
438                                _ => JsonOrBinary::error("error", StatusCode::BAD_REQUEST),
439                            },
440                        )
441                }
442            },
443        );
444
445    let dropped_channel_response = || {
446        with_json_reply(
447            &"response channel dropped",
448            StatusCode::INTERNAL_SERVER_ERROR,
449        )
450    };
451
452    let rpc_sender_clone = rpc_sender.clone();
453    let snark_workers = warp::path!("snarker" / "workers")
454        .and(warp::get())
455        .then(move || {
456            let rpc_sender_clone = rpc_sender_clone.clone();
457            async move {
458                rpc_sender_clone
459                    .oneshot_request(RpcRequest::SnarkerWorkers)
460                    .await
461                    .map_or_else(
462                        dropped_channel_response,
463                        |reply: RpcSnarkerWorkersResponse| with_json_reply(&reply, StatusCode::OK),
464                    )
465            }
466        });
467
468    let rpc_sender_clone = rpc_sender.clone();
469    let snarker_config = warp::path!("snarker" / "config")
470        .and(warp::get())
471        .then(move || {
472            let rpc_sender_clone = rpc_sender_clone.clone();
473            async move {
474                rpc_sender_clone
475                    .oneshot_request(RpcRequest::SnarkerConfig)
476                    .await
477                    .map_or_else(
478                        dropped_channel_response,
479                        |reply: node::rpc::RpcSnarkerConfigGetResponse| {
480                            with_json_reply(&reply, StatusCode::OK)
481                        },
482                    )
483            }
484        });
485
486    let rpc_sender_clone = rpc_sender.clone();
487    let transaction_pool = warp::path!("transaction-pool")
488        .and(warp::get())
489        .then(move || {
490            let rpc_sender_clone = rpc_sender_clone.clone();
491            async move {
492                rpc_sender_clone
493                    .transaction_pool()
494                    .get()
495                    .await
496                    .map_or_else(dropped_channel_response, |reply| {
497                        with_json_reply(&reply, StatusCode::OK)
498                    })
499            }
500        });
501
502    let rpc_sender_clone = rpc_sender.clone();
503    let accounts = warp::path("accounts").and(warp::get()).then(move || {
504        let rpc_sender_clone = rpc_sender_clone.clone();
505
506        async move {
507            rpc_sender_clone
508                .ledger()
509                .latest()
510                .accounts()
511                .all()
512                .await
513                .map_or_else(
514                    dropped_channel_response,
515                    |reply: node::rpc::RpcLedgerSlimAccountsResponse| {
516                        with_json_reply(&reply, StatusCode::OK)
517                    },
518                )
519        }
520    });
521
522    let rpc_sender_clone = rpc_sender.clone();
523    let transaction_post = warp::path("send-payment")
524        .and(warp::post())
525        .and(warp::filters::body::json())
526        .then(move |body: Vec<RpcInjectPayment>| {
527            let rpc_sender_clone = rpc_sender_clone.clone();
528
529            async move {
530                match rpc_sender_clone
531                    .transaction_pool()
532                    .inject()
533                    .payment(body)
534                    .await
535                {
536                    Err(err) => with_status(
537                        warp::reply::json(&serde_json::json!({"error": err})),
538                        StatusCode::INTERNAL_SERVER_ERROR,
539                    ),
540                    Ok(res) => res.map_or_else(
541                        dropped_channel_response,
542                        |reply: node::rpc::RpcTransactionInjectResponse| {
543                            with_json_reply(&reply, StatusCode::OK)
544                        },
545                    ),
546                }
547            }
548        });
549
550    let rpc_sender_clone = rpc_sender.clone();
551    let transition_frontier_user_commands = warp::path("best-chain-user-commands")
552        .and(warp::get())
553        .then(move || {
554            let rpc_sender_clone = rpc_sender_clone.clone();
555
556            async move {
557                rpc_sender_clone
558                    .transition_frontier()
559                    .best_chain()
560                    .user_commands()
561                    .await
562                    .map_or_else(dropped_channel_response, |reply| {
563                        with_json_reply(&reply, StatusCode::OK)
564                    })
565            }
566        });
567
568    let cors = warp::cors()
569        .allow_any_origin()
570        .allow_methods(["GET", "POST", "PUT", "DELETE", "OPTIONS"])
571        .allow_headers([
572            "User-Agent",
573            "Sec-Fetch-Mode",
574            "Referer",
575            "Origin",
576            "Access-Control-Request-Method",
577            "Access-Control-Request-Headers",
578            "Content-Type",
579        ]);
580    #[cfg(not(feature = "p2p-webrtc"))]
581    let routes = state_get.or(state_post);
582    #[cfg(feature = "p2p-webrtc")]
583    let routes = signaling.or(state_get).or(state_post);
584    let routes = compose_route!(
585        build_env_get,
586        routes,
587        status,
588        make_heartbeat,
589        peers_get,
590        message_progress_get,
591        stats,
592        scan_state_summary_get,
593        snark_pool_jobs_get,
594        snark_pool_job_get,
595        snarker_config,
596        snarker_job_commit,
597        snarker_job_spec,
598        snark_workers,
599        transaction_pool,
600        accounts,
601        transaction_post,
602        transition_frontier_user_commands,
603        healthcheck(rpc_sender.clone()),
604        readiness(rpc_sender.clone()),
605        discovery::routing_table(rpc_sender.clone()),
606        discovery::bootstrap_stats(rpc_sender.clone()),
607        super::graphql::routes(rpc_sender),
608    );
609
610    let routes = routes.recover(recover).with(cors);
611
612    warp::serve(routes).run(([0, 0, 0, 0], port)).await;
613}
614
615fn healthcheck(
616    rpc_sender: RpcSender,
617) -> impl Filter<Error = Rejection, Extract = impl Reply> + Clone {
618    warp::path!("healthz").and(warp::get()).then(move || {
619        let rpc_sender = rpc_sender.clone();
620        async move {
621            rpc_sender
622                .oneshot_request(RpcRequest::HealthCheck)
623                .await
624                .map_or_else(
625                    || {
626                        with_status(
627                            String::from(DROPPED_CHANNEL),
628                            StatusCode::INTERNAL_SERVER_ERROR,
629                        )
630                    },
631                    |reply: node::rpc::RpcHealthCheckResponse| match reply {
632                        Ok(()) => with_status(String::new(), StatusCode::OK),
633                        Err(err) => with_status(err, StatusCode::SERVICE_UNAVAILABLE),
634                    },
635                )
636        }
637    })
638}
639
640fn readiness(
641    rpc_sender: RpcSender,
642) -> impl Filter<Error = Rejection, Extract = impl Reply> + Clone {
643    warp::path!("readyz").and(warp::get()).then(move || {
644        let rpc_sender = rpc_sender.clone();
645        async move {
646            rpc_sender
647                .oneshot_request(RpcRequest::ReadinessCheck)
648                .await
649                .map_or_else(
650                    || {
651                        with_status(
652                            String::from(DROPPED_CHANNEL),
653                            StatusCode::INTERNAL_SERVER_ERROR,
654                        )
655                    },
656                    |reply: node::rpc::RpcReadinessCheckResponse| match reply {
657                        Ok(()) => with_status(String::new(), StatusCode::OK),
658                        Err(err) => with_status(err, StatusCode::SERVICE_UNAVAILABLE),
659                    },
660                )
661        }
662    })
663}
664
665mod discovery {
666    use node::rpc::{
667        RpcDiscoveryBoostrapStatsResponse, RpcDiscoveryRoutingTableResponse, RpcRequest,
668    };
669    use openmina_node_common::rpc::RpcSender;
670    use warp::Filter;
671
672    use super::{with_rpc_sender, DroppedChannel};
673
674    pub fn routing_table(
675        rpc_sender: RpcSender,
676    ) -> impl warp::Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
677        warp::path!("discovery" / "routing_table")
678            .and(warp::get())
679            .and(with_rpc_sender(rpc_sender))
680            .and_then(get_routing_table)
681    }
682
683    pub fn bootstrap_stats(
684        rpc_sender: RpcSender,
685    ) -> impl warp::Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
686        warp::path!("discovery" / "bootstrap_stats")
687            .and(warp::get())
688            .and(with_rpc_sender(rpc_sender))
689            .and_then(get_bootstrap_stats)
690    }
691
692    async fn get_routing_table(rpc_sender: RpcSender) -> Result<impl warp::Reply, warp::Rejection> {
693        rpc_sender
694            .oneshot_request(RpcRequest::DiscoveryRoutingTable)
695            .await
696            .map_or_else(
697                || Err(warp::reject::custom(DroppedChannel)),
698                |reply: RpcDiscoveryRoutingTableResponse| Ok(warp::reply::json(&reply)),
699            )
700    }
701
702    async fn get_bootstrap_stats(
703        rpc_sender: RpcSender,
704    ) -> Result<impl warp::Reply, warp::Rejection> {
705        rpc_sender
706            .oneshot_request(RpcRequest::DiscoveryBoostrapStats)
707            .await
708            .map_or_else(
709                || Err(warp::reject::custom(DroppedChannel)),
710                |reply: RpcDiscoveryBoostrapStatsResponse| Ok(warp::reply::json(&reply)),
711            )
712    }
713}
714
715fn with_rpc_sender(
716    rpc_sender: RpcSender,
717) -> impl warp::Filter<Extract = (RpcSender,), Error = Infallible> + Clone {
718    warp::any().map(move || rpc_sender.clone())
719}
720
721const DROPPED_CHANNEL: &str = "response channel dropped, see error log for details";
722
723#[derive(Debug)]
724struct DroppedChannel;
725
726impl warp::reject::Reject for DroppedChannel {}
727
728async fn recover(rejection: warp::Rejection) -> Result<impl warp::Reply, warp::Rejection> {
729    if let Some(DroppedChannel) = rejection.find() {
730        Ok(warp::reply::with_status(
731            warp::reply::json(&serde_json::json!({"error": DROPPED_CHANNEL})),
732            StatusCode::INTERNAL_SERVER_ERROR,
733        ))
734    } else {
735        Err(rejection)
736    }
737}
738
739use warp::{
740    filters::BoxedFilter,
741    reply::{json, Json, WithStatus},
742};
743
744fn optq<T: 'static + Default + Send + DeserializeOwned>() -> BoxedFilter<(T,)> {
745    warp::any()
746        .and(warp::query().or(warp::any().map(|| T::default())))
747        .unify()
748        .boxed()
749}
750
751pub enum JsonOrBinary {
752    Json(Vec<u8>),
753    Binary(Vec<u8>),
754    Error(String, StatusCode),
755}
756
757impl JsonOrBinary {
758    fn json<T: Serialize>(reply: T) -> Self {
759        match serde_json::to_vec(&reply) {
760            Ok(v) => JsonOrBinary::Json(v),
761            Err(err) => JsonOrBinary::error(err, StatusCode::INTERNAL_SERVER_ERROR),
762        }
763    }
764    fn binary<T: BinProtWrite>(reply: T) -> Self {
765        let mut vec = Vec::new();
766        match reply.binprot_write(&mut vec) {
767            Ok(()) => {}
768            Err(err) => return JsonOrBinary::error(err, StatusCode::INTERNAL_SERVER_ERROR),
769        }
770        let mut result = Vec::with_capacity(vec.len() + size_of::<u64>());
771        result.extend((vec.len() as u64).to_le_bytes());
772        result.extend(vec);
773        JsonOrBinary::Binary(result)
774    }
775    fn error<T: ToString>(err: T, code: StatusCode) -> Self {
776        JsonOrBinary::Error(err.to_string(), code)
777    }
778}
779
780impl Reply for JsonOrBinary {
781    fn into_response(self) -> warp::reply::Response {
782        match self {
783            JsonOrBinary::Json(body) => {
784                let mut res = Response::new(body.into());
785                res.headers_mut()
786                    .insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
787                *res.status_mut() = StatusCode::OK;
788                res
789            }
790            JsonOrBinary::Binary(body) => {
791                let mut res = Response::new(body.into());
792                res.headers_mut().insert(
793                    CONTENT_TYPE,
794                    HeaderValue::from_static("application/octet-stream"),
795                );
796                *res.status_mut() = StatusCode::OK;
797                res
798            }
799            JsonOrBinary::Error(err, code) => {
800                let mut res = Response::new(err.into());
801                res.headers_mut()
802                    .insert(CONTENT_TYPE, HeaderValue::from_static("plain/text"));
803                *res.status_mut() = code;
804                res
805            }
806        }
807    }
808}
809
810fn with_json_reply<T: Serialize>(reply: &T, status: StatusCode) -> WithStatus<Json> {
811    with_status(json(reply), status)
812}