1use std::{convert::Infallible, mem::size_of, str::FromStr};
2
3use mina_p2p_messages::binprot::BinProtWrite;
4use serde::{de::DeserializeOwned, Deserialize, Serialize};
5use warp::{
6 http::HeaderValue,
7 hyper::{header::CONTENT_TYPE, Response, StatusCode},
8 reply::with_status,
9 Filter, Rejection, Reply,
10};
11
12use node::{core::snark::SnarkJobId, rpc::*};
13
14use openmina_node_common::rpc::{
15 RpcActionStatsGetResponse, RpcSender, RpcSnarkPoolGetResponse, RpcSnarkerJobCommitResponse,
16 RpcSnarkerJobSpecResponse, RpcStateGetResponse, RpcSyncStatsGetResponse,
17};
18
19macro_rules! compose_route {
20 ($x:expr $(,)?) => (
21 $x
22 );
23 ($x0:expr, $($x:expr),+ $(,)?) => (
24 $x0$(.or($x))+.boxed()
25 );
26}
27
28pub async fn run(port: u16, rpc_sender: RpcSender) {
29 let build_env_get = warp::path!("build_env")
30 .and(warp::get())
31 .then(move || async { with_json_reply(&node::BuildEnv::get(), StatusCode::OK) });
32
33 #[cfg(feature = "p2p-webrtc")]
34 let signaling = {
35 use node::p2p::{
36 connection::{
37 incoming::{IncomingSignalingMethod, P2pConnectionIncomingInitOpts},
38 P2pConnectionResponse,
39 },
40 webrtc, PeerId,
41 };
42
43 use super::rpc::RpcP2pConnectionIncomingResponse;
44
45 let handle = |sender: RpcSender, offer: Box<webrtc::Offer>| async move {
46 let mut rx = sender
47 .multishot_request(
48 2,
49 RpcRequest::P2pConnectionIncoming(P2pConnectionIncomingInitOpts {
50 peer_id: PeerId::from_public_key(offer.identity_pub_key.clone()),
51 signaling: IncomingSignalingMethod::Http,
52 offer,
53 }),
54 )
55 .await;
56
57 match rx.recv().await {
58 Some(RpcP2pConnectionIncomingResponse::Answer(answer)) => {
59 let status = match &answer {
60 P2pConnectionResponse::Accepted(_) => StatusCode::OK,
61 P2pConnectionResponse::Rejected(reason) => match reason.is_bad() {
62 false => StatusCode::OK,
63 true => StatusCode::BAD_REQUEST,
64 },
65 P2pConnectionResponse::SignalDecryptionFailed => StatusCode::BAD_REQUEST,
66 P2pConnectionResponse::InternalError => StatusCode::INTERNAL_SERVER_ERROR,
67 };
68 with_json_reply(&answer, status)
69 }
70 _ => {
71 let resp = P2pConnectionResponse::internal_error_str();
72 with_json_reply(&resp, StatusCode::INTERNAL_SERVER_ERROR)
73 }
74 }
75 };
76
77 let rpc_sender_clone = rpc_sender.clone();
78 let get = warp::path!("mina" / "webrtc" / "signal" / String)
79 .and(warp::get())
80 .then(move |offer: String| {
81 let rpc_sender_clone = rpc_sender_clone.clone();
82 async move {
83 let decode_res = Err(()).or_else(move |_| {
84 let json = bs58::decode(&offer).into_vec().or(Err(()))?;
85 serde_json::from_slice(&json).or(Err(()))
86 });
87 match decode_res {
88 Err(()) => with_json_reply(
89 &P2pConnectionResponse::SignalDecryptionFailed,
90 StatusCode::BAD_REQUEST,
91 ),
92 Ok(offer) => handle(rpc_sender_clone.clone(), offer).await,
93 }
94 }
95 });
96
97 let rpc_sender_clone = rpc_sender.clone();
98 let post = warp::path!("mina" / "webrtc" / "signal")
99 .and(warp::post())
100 .and(warp::filters::body::json())
101 .then(move |offer: Box<webrtc::Offer>| handle(rpc_sender_clone.clone(), offer));
102 get.or(post)
103 };
104
105 #[derive(Deserialize)]
107 struct StateQueryParams {
108 filter: Option<String>,
109 }
110 #[derive(Debug)]
111 struct StateGetRejection(RpcStateGetError);
112 impl warp::reject::Reject for StateGetRejection {}
113
114 let state_get = warp::path!("state")
115 .and(warp::get())
116 .and(with_rpc_sender(rpc_sender.clone()))
117 .and(warp::query())
118 .and_then(state_handler)
119 .recover(state_recover);
120
121 let state_post = warp::path!("state")
122 .and(warp::post())
123 .and(with_rpc_sender(rpc_sender.clone()))
124 .and(warp::body::json())
125 .and_then(state_handler)
126 .recover(state_recover);
127
128 async fn state_handler(
129 rpc_sender: RpcSender,
130 StateQueryParams { filter }: StateQueryParams,
131 ) -> Result<impl warp::Reply, warp::Rejection> {
132 rpc_sender
133 .oneshot_request(RpcRequest::StateGet(filter))
134 .await
135 .ok_or_else(|| warp::reject::custom(DroppedChannel))
136 .and_then(|reply: RpcStateGetResponse| {
137 reply.map_or_else(
138 |err| Err(warp::reject::custom(StateGetRejection(err))),
139 |state| Ok(warp::reply::json(&state)),
140 )
141 })
142 }
143
144 async fn state_recover(reject: warp::Rejection) -> Result<impl warp::Reply, warp::Rejection> {
145 if let Some(StateGetRejection(error)) = reject.find() {
146 Ok(warp::reply::with_status(
147 warp::reply::json(error),
148 StatusCode::BAD_REQUEST,
149 ))
150 } else {
151 Err(reject)
152 }
153 }
154
155 let rpc_sender_clone = rpc_sender.clone();
156 let status = warp::path!("status").and(warp::get()).then(move || {
157 let rpc_sender_clone = rpc_sender_clone.clone();
158 async move {
159 let result: RpcStatusGetResponse = rpc_sender_clone
160 .oneshot_request(RpcRequest::StatusGet)
161 .await
162 .flatten();
163
164 with_json_reply(&result, StatusCode::OK)
165 }
166 });
167
168 let rpc_sender_clone = rpc_sender.clone();
169 let make_heartbeat = warp::path!("make_heartbeat")
170 .and(warp::post())
171 .then(move || {
172 let rpc_sender_clone = rpc_sender_clone.clone();
173 async move {
174 let result: RpcHeartbeatGetResponse = rpc_sender_clone
175 .oneshot_request(RpcRequest::HeartbeatGet)
176 .await
177 .flatten();
178
179 with_json_reply(&result, StatusCode::OK)
180 }
181 });
182
183 let rpc_sender_clone = rpc_sender.clone();
184 let peers_get = warp::path!("state" / "peers")
185 .and(warp::get())
186 .then(move || {
187 let rpc_sender_clone = rpc_sender_clone.clone();
188 async move {
189 let result = rpc_sender_clone
190 .oneshot_request::<RpcPeersGetResponse>(RpcRequest::PeersGet)
191 .await;
192
193 with_json_reply(&result, StatusCode::OK)
194 }
195 });
196
197 let rpc_sender_clone = rpc_sender.clone();
198 let message_progress_get = warp::path!("state" / "message-progress")
199 .and(warp::get())
200 .then(move || {
201 let rpc_sender_clone = rpc_sender_clone.clone();
202 async move {
203 let result = rpc_sender_clone
204 .oneshot_request::<RpcMessageProgressResponse>(RpcRequest::MessageProgressGet)
205 .await;
206
207 with_json_reply(&result, StatusCode::OK)
208 }
209 });
210
211 let stats = {
213 let rpc_sender_clone = rpc_sender.clone();
214 #[derive(Deserialize, Default)]
215 struct ActionQueryParams {
216 id: Option<String>,
217 }
218 let action_stats = warp::path!("stats" / "actions")
219 .and(warp::get())
220 .and(optq::<ActionQueryParams>())
221 .then(move |query: ActionQueryParams| {
222 let rpc_sender_clone = rpc_sender_clone.clone();
223 async move {
224 let id_filter = query.id.as_deref();
225 let result: RpcActionStatsGetResponse = rpc_sender_clone
226 .oneshot_request(RpcRequest::ActionStatsGet(match id_filter {
227 None => ActionStatsQuery::SinceStart,
228 Some("latest") => ActionStatsQuery::ForLatestBlock,
229 Some(id) => {
230 let id = match id.parse() {
231 Ok(v) => v,
232 Err(err) => {
233 return with_json_reply(
234 &format!(
235 "'id' must be an u64 integer: {err}, instead passed: {id}"
236 ),
237 StatusCode::BAD_REQUEST,
238 );
239 }
240 };
241 ActionStatsQuery::ForBlockWithId(id)
242 }
243 }))
244 .await
245 .flatten();
246
247 with_json_reply(&result, StatusCode::OK)
248 }
249 });
250
251 let rpc_sender_clone = rpc_sender.clone();
252 #[derive(Deserialize, Default)]
253 struct SyncQueryParams {
254 limit: Option<usize>,
255 }
256 let sync_stats = warp::path!("stats" / "sync")
257 .and(warp::get())
258 .and(optq::<SyncQueryParams>())
259 .then(move |query: SyncQueryParams| {
260 let rpc_sender_clone = rpc_sender_clone.clone();
261 async move {
262 let result: RpcSyncStatsGetResponse = rpc_sender_clone
263 .oneshot_request(RpcRequest::SyncStatsGet(SyncStatsQuery {
264 limit: query.limit,
265 }))
266 .await
267 .flatten();
268
269 with_json_reply(&result, StatusCode::OK)
270 }
271 });
272
273 let rpc_sender_clone = rpc_sender.clone();
274 let block_producer_stats = warp::path!("stats" / "block_producer")
275 .and(warp::get())
276 .then(move || {
277 let rpc_sender_clone = rpc_sender_clone.clone();
278 async move {
279 let result: RpcBlockProducerStatsGetResponse = rpc_sender_clone
280 .oneshot_request(RpcRequest::BlockProducerStatsGet)
281 .await
282 .flatten();
283
284 with_json_reply(&result, StatusCode::OK)
285 }
286 });
287
288 action_stats.or(sync_stats).or(block_producer_stats)
289 };
290
291 let rpc_sender_clone = rpc_sender.clone();
292 let scan_state_summary_get = warp::path!("scan-state" / "summary" / ..)
293 .and(warp::get())
294 .and(
295 warp::path::param::<String>()
296 .map(Some)
297 .or_else(|_| async { Ok::<(Option<String>,), std::convert::Infallible>((None,)) }),
298 )
299 .and(warp::path::end())
300 .then(move |query: Option<String>| {
301 let rpc_sender_clone = rpc_sender_clone.clone();
302 let query = match query {
303 None => Ok(RpcScanStateSummaryGetQuery::ForBestTip),
304 Some(query) => None
305 .or_else(|| {
306 Some(RpcScanStateSummaryGetQuery::ForBlockWithHeight(
307 query.parse().ok()?,
308 ))
309 })
310 .ok_or(())
311 .or_else(|_| match query.parse() {
312 Err(_) => Err("invalid arg! Expected block hash or height"),
313 Ok(v) => Ok(RpcScanStateSummaryGetQuery::ForBlockWithHash(v)),
314 }),
315 };
316 async move {
317 let query = match query {
318 Ok(v) => v,
319 Err(err) => {
320 return with_json_reply(&err, StatusCode::BAD_REQUEST);
321 }
322 };
323 let res: Option<RpcScanStateSummaryGetResponse> = rpc_sender_clone
324 .oneshot_request(RpcRequest::ScanStateSummaryGet(query))
325 .await;
326 match res {
327 None => with_json_reply(
328 &"response channel dropped",
329 StatusCode::INTERNAL_SERVER_ERROR,
330 ),
331 Some(Err(err)) => with_json_reply(&err, StatusCode::INTERNAL_SERVER_ERROR),
332 Some(Ok(data)) => with_json_reply(&data, StatusCode::OK),
333 }
334 }
335 });
336
337 let rpc_sender_clone = rpc_sender.clone();
338 let snark_pool_jobs_get = warp::path!("snark-pool" / "jobs")
339 .and(warp::get())
340 .then(move || {
341 let rpc_sender_clone = rpc_sender_clone.clone();
342 async move {
343 let res: Option<RpcSnarkPoolGetResponse> = rpc_sender_clone
344 .oneshot_request(RpcRequest::SnarkPoolGet)
345 .await;
346 match res {
347 None => with_json_reply(
348 &"response channel dropped",
349 StatusCode::INTERNAL_SERVER_ERROR,
350 ),
351 Some(resp) => with_json_reply(&resp, StatusCode::OK),
352 }
353 }
354 });
355
356 let rpc_sender_clone = rpc_sender.clone();
357 let snark_pool_job_get = warp::path!("snark-pool" / "job" / SnarkJobId).then(move |job_id| {
358 let rpc_sender_clone = rpc_sender_clone.clone();
359 async move {
360 let res: Option<RpcSnarkPoolJobGetResponse> = rpc_sender_clone
361 .oneshot_request(RpcRequest::SnarkPoolJobGet { job_id })
362 .await;
363 match res {
364 None => with_json_reply(
365 &"response channel dropped",
366 StatusCode::INTERNAL_SERVER_ERROR,
367 ),
368 Some(resp) => with_json_reply(&resp, StatusCode::OK),
369 }
370 }
371 });
372
373 let rpc_sender_clone = rpc_sender.clone();
375 let snarker_job_commit = warp::path!("snarker" / "job" / "commit")
376 .and(warp::post())
377 .and(warp::filters::body::bytes())
378 .then(move |body: bytes::Bytes| {
379 let rpc_sender_clone = rpc_sender_clone.clone();
380 async move {
381 let Ok(job_id) = String::from_utf8(body.to_vec())
382 .or(Err(()))
383 .and_then(|s| SnarkJobId::from_str(&s).or(Err(())))
384 else {
385 return with_json_reply(&"invalid_input", StatusCode::BAD_REQUEST);
386 };
387
388 let res: Option<RpcSnarkerJobCommitResponse> = rpc_sender_clone
389 .oneshot_request(RpcRequest::SnarkerJobCommit { job_id })
390 .await;
391 match res {
392 None => with_json_reply(
393 &"response channel dropped",
394 StatusCode::INTERNAL_SERVER_ERROR,
395 ),
396 Some(resp) => {
397 let status = match &resp {
398 RpcSnarkerJobCommitResponse::Ok => StatusCode::CREATED,
399 _ => StatusCode::BAD_REQUEST,
400 };
401 with_json_reply(&resp, status)
402 }
403 }
404 }
405 });
406
407 #[derive(Deserialize)]
408 struct JobIdParam {
409 id: SnarkJobId,
410 }
411
412 let rpc_sender_clone = rpc_sender.clone();
413 let snarker_job_spec = warp::path!("snarker" / "job" / "spec")
414 .and(warp::get())
415 .and(warp::header::optional("accept"))
416 .and(warp::query())
417 .then(
418 move |accept: Option<String>, JobIdParam { id: job_id }: JobIdParam| {
419 let rpc_sender_clone = rpc_sender_clone.clone();
420 async move {
421 rpc_sender_clone
422 .oneshot_request(RpcRequest::SnarkerJobSpec { job_id })
423 .await
424 .map_or_else(
425 || {
426 JsonOrBinary::error(
427 "response channel dropped",
428 StatusCode::INTERNAL_SERVER_ERROR,
429 )
430 },
431 |resp| match resp {
432 RpcSnarkerJobSpecResponse::Ok(spec)
433 if accept.as_deref() == Some("application/octet-stream") =>
434 {
435 JsonOrBinary::binary(spec)
436 }
437 RpcSnarkerJobSpecResponse::Ok(spec) => JsonOrBinary::json(spec),
438 _ => JsonOrBinary::error("error", StatusCode::BAD_REQUEST),
439 },
440 )
441 }
442 },
443 );
444
445 let dropped_channel_response = || {
446 with_json_reply(
447 &"response channel dropped",
448 StatusCode::INTERNAL_SERVER_ERROR,
449 )
450 };
451
452 let rpc_sender_clone = rpc_sender.clone();
453 let snark_workers = warp::path!("snarker" / "workers")
454 .and(warp::get())
455 .then(move || {
456 let rpc_sender_clone = rpc_sender_clone.clone();
457 async move {
458 rpc_sender_clone
459 .oneshot_request(RpcRequest::SnarkerWorkers)
460 .await
461 .map_or_else(
462 dropped_channel_response,
463 |reply: RpcSnarkerWorkersResponse| with_json_reply(&reply, StatusCode::OK),
464 )
465 }
466 });
467
468 let rpc_sender_clone = rpc_sender.clone();
469 let snarker_config = warp::path!("snarker" / "config")
470 .and(warp::get())
471 .then(move || {
472 let rpc_sender_clone = rpc_sender_clone.clone();
473 async move {
474 rpc_sender_clone
475 .oneshot_request(RpcRequest::SnarkerConfig)
476 .await
477 .map_or_else(
478 dropped_channel_response,
479 |reply: node::rpc::RpcSnarkerConfigGetResponse| {
480 with_json_reply(&reply, StatusCode::OK)
481 },
482 )
483 }
484 });
485
486 let rpc_sender_clone = rpc_sender.clone();
487 let transaction_pool = warp::path!("transaction-pool")
488 .and(warp::get())
489 .then(move || {
490 let rpc_sender_clone = rpc_sender_clone.clone();
491 async move {
492 rpc_sender_clone
493 .transaction_pool()
494 .get()
495 .await
496 .map_or_else(dropped_channel_response, |reply| {
497 with_json_reply(&reply, StatusCode::OK)
498 })
499 }
500 });
501
502 let rpc_sender_clone = rpc_sender.clone();
503 let accounts = warp::path("accounts").and(warp::get()).then(move || {
504 let rpc_sender_clone = rpc_sender_clone.clone();
505
506 async move {
507 rpc_sender_clone
508 .ledger()
509 .latest()
510 .accounts()
511 .all()
512 .await
513 .map_or_else(
514 dropped_channel_response,
515 |reply: node::rpc::RpcLedgerSlimAccountsResponse| {
516 with_json_reply(&reply, StatusCode::OK)
517 },
518 )
519 }
520 });
521
522 let rpc_sender_clone = rpc_sender.clone();
523 let transaction_post = warp::path("send-payment")
524 .and(warp::post())
525 .and(warp::filters::body::json())
526 .then(move |body: Vec<RpcInjectPayment>| {
527 let rpc_sender_clone = rpc_sender_clone.clone();
528
529 async move {
530 match rpc_sender_clone
531 .transaction_pool()
532 .inject()
533 .payment(body)
534 .await
535 {
536 Err(err) => with_status(
537 warp::reply::json(&serde_json::json!({"error": err})),
538 StatusCode::INTERNAL_SERVER_ERROR,
539 ),
540 Ok(res) => res.map_or_else(
541 dropped_channel_response,
542 |reply: node::rpc::RpcTransactionInjectResponse| {
543 with_json_reply(&reply, StatusCode::OK)
544 },
545 ),
546 }
547 }
548 });
549
550 let rpc_sender_clone = rpc_sender.clone();
551 let transition_frontier_user_commands = warp::path("best-chain-user-commands")
552 .and(warp::get())
553 .then(move || {
554 let rpc_sender_clone = rpc_sender_clone.clone();
555
556 async move {
557 rpc_sender_clone
558 .transition_frontier()
559 .best_chain()
560 .user_commands()
561 .await
562 .map_or_else(dropped_channel_response, |reply| {
563 with_json_reply(&reply, StatusCode::OK)
564 })
565 }
566 });
567
568 let cors = warp::cors()
569 .allow_any_origin()
570 .allow_methods(["GET", "POST", "PUT", "DELETE", "OPTIONS"])
571 .allow_headers([
572 "User-Agent",
573 "Sec-Fetch-Mode",
574 "Referer",
575 "Origin",
576 "Access-Control-Request-Method",
577 "Access-Control-Request-Headers",
578 "Content-Type",
579 ]);
580 #[cfg(not(feature = "p2p-webrtc"))]
581 let routes = state_get.or(state_post);
582 #[cfg(feature = "p2p-webrtc")]
583 let routes = signaling.or(state_get).or(state_post);
584 let routes = compose_route!(
585 build_env_get,
586 routes,
587 status,
588 make_heartbeat,
589 peers_get,
590 message_progress_get,
591 stats,
592 scan_state_summary_get,
593 snark_pool_jobs_get,
594 snark_pool_job_get,
595 snarker_config,
596 snarker_job_commit,
597 snarker_job_spec,
598 snark_workers,
599 transaction_pool,
600 accounts,
601 transaction_post,
602 transition_frontier_user_commands,
603 healthcheck(rpc_sender.clone()),
604 readiness(rpc_sender.clone()),
605 discovery::routing_table(rpc_sender.clone()),
606 discovery::bootstrap_stats(rpc_sender.clone()),
607 super::graphql::routes(rpc_sender),
608 );
609
610 let routes = routes.recover(recover).with(cors);
611
612 warp::serve(routes).run(([0, 0, 0, 0], port)).await;
613}
614
615fn healthcheck(
616 rpc_sender: RpcSender,
617) -> impl Filter<Error = Rejection, Extract = impl Reply> + Clone {
618 warp::path!("healthz").and(warp::get()).then(move || {
619 let rpc_sender = rpc_sender.clone();
620 async move {
621 rpc_sender
622 .oneshot_request(RpcRequest::HealthCheck)
623 .await
624 .map_or_else(
625 || {
626 with_status(
627 String::from(DROPPED_CHANNEL),
628 StatusCode::INTERNAL_SERVER_ERROR,
629 )
630 },
631 |reply: node::rpc::RpcHealthCheckResponse| match reply {
632 Ok(()) => with_status(String::new(), StatusCode::OK),
633 Err(err) => with_status(err, StatusCode::SERVICE_UNAVAILABLE),
634 },
635 )
636 }
637 })
638}
639
640fn readiness(
641 rpc_sender: RpcSender,
642) -> impl Filter<Error = Rejection, Extract = impl Reply> + Clone {
643 warp::path!("readyz").and(warp::get()).then(move || {
644 let rpc_sender = rpc_sender.clone();
645 async move {
646 rpc_sender
647 .oneshot_request(RpcRequest::ReadinessCheck)
648 .await
649 .map_or_else(
650 || {
651 with_status(
652 String::from(DROPPED_CHANNEL),
653 StatusCode::INTERNAL_SERVER_ERROR,
654 )
655 },
656 |reply: node::rpc::RpcReadinessCheckResponse| match reply {
657 Ok(()) => with_status(String::new(), StatusCode::OK),
658 Err(err) => with_status(err, StatusCode::SERVICE_UNAVAILABLE),
659 },
660 )
661 }
662 })
663}
664
665mod discovery {
666 use node::rpc::{
667 RpcDiscoveryBoostrapStatsResponse, RpcDiscoveryRoutingTableResponse, RpcRequest,
668 };
669 use openmina_node_common::rpc::RpcSender;
670 use warp::Filter;
671
672 use super::{with_rpc_sender, DroppedChannel};
673
674 pub fn routing_table(
675 rpc_sender: RpcSender,
676 ) -> impl warp::Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
677 warp::path!("discovery" / "routing_table")
678 .and(warp::get())
679 .and(with_rpc_sender(rpc_sender))
680 .and_then(get_routing_table)
681 }
682
683 pub fn bootstrap_stats(
684 rpc_sender: RpcSender,
685 ) -> impl warp::Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
686 warp::path!("discovery" / "bootstrap_stats")
687 .and(warp::get())
688 .and(with_rpc_sender(rpc_sender))
689 .and_then(get_bootstrap_stats)
690 }
691
692 async fn get_routing_table(rpc_sender: RpcSender) -> Result<impl warp::Reply, warp::Rejection> {
693 rpc_sender
694 .oneshot_request(RpcRequest::DiscoveryRoutingTable)
695 .await
696 .map_or_else(
697 || Err(warp::reject::custom(DroppedChannel)),
698 |reply: RpcDiscoveryRoutingTableResponse| Ok(warp::reply::json(&reply)),
699 )
700 }
701
702 async fn get_bootstrap_stats(
703 rpc_sender: RpcSender,
704 ) -> Result<impl warp::Reply, warp::Rejection> {
705 rpc_sender
706 .oneshot_request(RpcRequest::DiscoveryBoostrapStats)
707 .await
708 .map_or_else(
709 || Err(warp::reject::custom(DroppedChannel)),
710 |reply: RpcDiscoveryBoostrapStatsResponse| Ok(warp::reply::json(&reply)),
711 )
712 }
713}
714
715fn with_rpc_sender(
716 rpc_sender: RpcSender,
717) -> impl warp::Filter<Extract = (RpcSender,), Error = Infallible> + Clone {
718 warp::any().map(move || rpc_sender.clone())
719}
720
721const DROPPED_CHANNEL: &str = "response channel dropped, see error log for details";
722
723#[derive(Debug)]
724struct DroppedChannel;
725
726impl warp::reject::Reject for DroppedChannel {}
727
728async fn recover(rejection: warp::Rejection) -> Result<impl warp::Reply, warp::Rejection> {
729 if let Some(DroppedChannel) = rejection.find() {
730 Ok(warp::reply::with_status(
731 warp::reply::json(&serde_json::json!({"error": DROPPED_CHANNEL})),
732 StatusCode::INTERNAL_SERVER_ERROR,
733 ))
734 } else {
735 Err(rejection)
736 }
737}
738
739use warp::{
740 filters::BoxedFilter,
741 reply::{json, Json, WithStatus},
742};
743
744fn optq<T: 'static + Default + Send + DeserializeOwned>() -> BoxedFilter<(T,)> {
745 warp::any()
746 .and(warp::query().or(warp::any().map(|| T::default())))
747 .unify()
748 .boxed()
749}
750
751pub enum JsonOrBinary {
752 Json(Vec<u8>),
753 Binary(Vec<u8>),
754 Error(String, StatusCode),
755}
756
757impl JsonOrBinary {
758 fn json<T: Serialize>(reply: T) -> Self {
759 match serde_json::to_vec(&reply) {
760 Ok(v) => JsonOrBinary::Json(v),
761 Err(err) => JsonOrBinary::error(err, StatusCode::INTERNAL_SERVER_ERROR),
762 }
763 }
764 fn binary<T: BinProtWrite>(reply: T) -> Self {
765 let mut vec = Vec::new();
766 match reply.binprot_write(&mut vec) {
767 Ok(()) => {}
768 Err(err) => return JsonOrBinary::error(err, StatusCode::INTERNAL_SERVER_ERROR),
769 }
770 let mut result = Vec::with_capacity(vec.len() + size_of::<u64>());
771 result.extend((vec.len() as u64).to_le_bytes());
772 result.extend(vec);
773 JsonOrBinary::Binary(result)
774 }
775 fn error<T: ToString>(err: T, code: StatusCode) -> Self {
776 JsonOrBinary::Error(err.to_string(), code)
777 }
778}
779
780impl Reply for JsonOrBinary {
781 fn into_response(self) -> warp::reply::Response {
782 match self {
783 JsonOrBinary::Json(body) => {
784 let mut res = Response::new(body.into());
785 res.headers_mut()
786 .insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
787 *res.status_mut() = StatusCode::OK;
788 res
789 }
790 JsonOrBinary::Binary(body) => {
791 let mut res = Response::new(body.into());
792 res.headers_mut().insert(
793 CONTENT_TYPE,
794 HeaderValue::from_static("application/octet-stream"),
795 );
796 *res.status_mut() = StatusCode::OK;
797 res
798 }
799 JsonOrBinary::Error(err, code) => {
800 let mut res = Response::new(err.into());
801 res.headers_mut()
802 .insert(CONTENT_TYPE, HeaderValue::from_static("plain/text"));
803 *res.status_mut() = code;
804 res
805 }
806 }
807 }
808}
809
810fn with_json_reply<T: Serialize>(reply: &T, status: StatusCode) -> WithStatus<Json> {
811 with_status(json(reply), status)
812}