1use ledger::scan_state::transaction_logic::valid;
2use mina_p2p_messages::v2::{
3 MinaBaseSignedCommandStableV2, MinaBaseZkappCommandTStableV1WireStableV1, NonZeroCurvePoint,
4 TransactionSnarkWorkTStableV2,
5};
6use openmina_core::{
7 block::AppliedBlock,
8 bug_condition,
9 requests::{RequestId, RpcId, RpcIdType},
10 transaction::{TransactionPoolMessageSource, TransactionWithHash},
11};
12use p2p::{
13 connection::{incoming::P2pConnectionIncomingAction, outgoing::P2pConnectionOutgoingAction},
14 webrtc::P2pConnectionResponse,
15 PeerId,
16};
17use redux::ActionWithMeta;
18
19use crate::{
20 ledger::read::{LedgerReadAction, LedgerReadInitCallback, LedgerReadRequest},
21 p2p_ready,
22 rpc::{GetBlockQuery, PooledCommandsQuery},
23 rpc_effectful::RpcEffectfulAction,
24 TransactionPoolAction,
25};
26
27use super::{
28 ConsensusTimeQuery, PeerConnectionStatus, RpcAction, RpcPeerInfo, RpcRequest,
29 RpcRequestExtraData, RpcRequestState, RpcRequestStatus, RpcScanStateSummaryGetQuery,
30 RpcSnarkerConfig, RpcState,
31};
32
33impl RpcState {
34 pub fn reducer(mut state_context: crate::Substate<Self>, action: ActionWithMeta<&RpcAction>) {
35 let Ok(state) = state_context.get_substate_mut() else {
36 return;
37 };
38
39 let (action, meta) = action.split();
40 match action {
41 RpcAction::GlobalStateGet { rpc_id, filter } => {
42 let dispatcher = state_context.into_dispatcher();
43 dispatcher.push(RpcEffectfulAction::GlobalStateGet {
44 rpc_id: *rpc_id,
45 filter: filter.clone(),
46 });
47 }
48 RpcAction::StatusGet { rpc_id } => {
49 let dispatcher = state_context.into_dispatcher();
50 dispatcher.push(RpcEffectfulAction::StatusGet { rpc_id: *rpc_id });
51 }
52 RpcAction::HeartbeatGet { rpc_id } => {
53 let dispatcher = state_context.into_dispatcher();
54 dispatcher.push(RpcEffectfulAction::HeartbeatGet { rpc_id: *rpc_id });
55 }
56 RpcAction::ActionStatsGet { rpc_id, query } => {
57 let dispatcher = state_context.into_dispatcher();
58 dispatcher.push(RpcEffectfulAction::ActionStatsGet {
59 rpc_id: *rpc_id,
60 query: *query,
61 });
62 }
63 RpcAction::SyncStatsGet { rpc_id, query } => {
64 let dispatcher = state_context.into_dispatcher();
65 dispatcher.push(RpcEffectfulAction::SyncStatsGet {
66 rpc_id: *rpc_id,
67 query: *query,
68 });
69 }
70 RpcAction::BlockProducerStatsGet { rpc_id } => {
71 let dispatcher = state_context.into_dispatcher();
72 dispatcher.push(RpcEffectfulAction::BlockProducerStatsGet { rpc_id: *rpc_id });
73 }
74 RpcAction::MessageProgressGet { rpc_id } => {
75 let dispatcher = state_context.into_dispatcher();
76 dispatcher.push(RpcEffectfulAction::MessageProgressGet { rpc_id: *rpc_id });
77 }
78 RpcAction::PeersGet { rpc_id } => {
79 let (dispatcher, state) = state_context.into_dispatcher_and_state();
80 let peers = collect_rpc_peers_info(state);
81 dispatcher.push(RpcEffectfulAction::PeersGet {
82 rpc_id: *rpc_id,
83 peers,
84 });
85 }
86 RpcAction::P2pConnectionOutgoingInit { rpc_id, opts } => {
87 let rpc_state = RpcRequestState {
88 req: RpcRequest::P2pConnectionOutgoing(opts.clone()),
89 status: RpcRequestStatus::Init { time: meta.time() },
90 data: Default::default(),
91 };
92 state.requests.insert(*rpc_id, rpc_state);
93
94 let dispatcher = state_context.into_dispatcher();
95
96 dispatcher.push(P2pConnectionOutgoingAction::Init {
97 opts: opts.clone(),
98 rpc_id: Some(*rpc_id),
99 on_success: Some(redux::callback!(
100 on_p2p_connection_outgoing_rpc_connection_success((peer_id: PeerId, rpc_id: Option<RpcId>)) -> crate::Action {
101 let Some(rpc_id) = rpc_id else {
102 unreachable!("RPC ID not provided");
103 };
104
105 RpcAction::P2pConnectionOutgoingPending{ rpc_id }
106 }
107 )),
108 });
109 }
110 RpcAction::P2pConnectionOutgoingPending { rpc_id } => {
111 let Some(rpc) = state.requests.get_mut(rpc_id) else {
112 bug_condition!(
113 "Rpc state not found for RpcAction::P2pConnectionOutgoingPending({})",
114 rpc_id
115 );
116 return;
117 };
118 rpc.status = RpcRequestStatus::Pending { time: meta.time() };
119 }
120 RpcAction::P2pConnectionOutgoingError { rpc_id, error } => {
121 let Some(rpc) = state.requests.get_mut(rpc_id) else {
122 bug_condition!(
123 "Rpc state not found for RpcAction::P2pConnectionOutgoingError({})",
124 rpc_id
125 );
126 return;
127 };
128 rpc.status = RpcRequestStatus::Error {
129 time: meta.time(),
130 error: format!("{:?}", error),
131 };
132
133 let dispatcher = state_context.into_dispatcher();
134 dispatcher.push(RpcEffectfulAction::P2pConnectionOutgoingError {
135 rpc_id: *rpc_id,
136 error: format!("{:?}", error),
137 });
138 }
139 RpcAction::P2pConnectionOutgoingSuccess { rpc_id } => {
140 let Some(rpc) = state.requests.get_mut(rpc_id) else {
141 bug_condition!(
142 "Rpc state not found for RpcAction::P2pConnectionOutgoingSuccess({})",
143 rpc_id
144 );
145 return;
146 };
147 rpc.status = RpcRequestStatus::Success { time: meta.time() };
148 let dispatcher = state_context.into_dispatcher();
149 dispatcher
150 .push(RpcEffectfulAction::P2pConnectionOutgoingSuccess { rpc_id: *rpc_id });
151 }
152 RpcAction::P2pConnectionIncomingInit { rpc_id, opts } => {
153 let rpc_state = RpcRequestState {
154 req: RpcRequest::P2pConnectionIncoming(opts.clone()),
155 status: RpcRequestStatus::Init { time: meta.time() },
156 data: Default::default(),
157 };
158 state.requests.insert(*rpc_id, rpc_state);
159
160 let (dispatcher, state) = state_context.into_dispatcher_and_state();
161 let p2p = p2p_ready!(state.p2p, meta.time());
162
163 match p2p.incoming_accept(opts.peer_id, &opts.offer) {
164 Ok(_) => {
165 dispatcher.push(P2pConnectionIncomingAction::Init {
166 opts: opts.clone(),
167 rpc_id: Some(*rpc_id),
168 });
169 dispatcher
170 .push(RpcAction::P2pConnectionIncomingPending { rpc_id: *rpc_id });
171 }
172 Err(reason) => {
173 let response = P2pConnectionResponse::Rejected(reason);
174 dispatcher.push(RpcAction::P2pConnectionIncomingRespond {
175 rpc_id: *rpc_id,
176 response,
177 });
178 }
179 }
180 }
181 RpcAction::P2pConnectionIncomingPending { rpc_id } => {
182 let Some(rpc) = state.requests.get_mut(rpc_id) else {
183 bug_condition!(
184 "Rpc state not found for RpcAction::P2pConnectionIncomingPending({})",
185 rpc_id
186 );
187 return;
188 };
189 rpc.status = RpcRequestStatus::Pending { time: meta.time() };
190 }
191 RpcAction::P2pConnectionIncomingRespond { rpc_id, response } => {
192 let dispatcher = state_context.into_dispatcher();
193 dispatcher.push(RpcEffectfulAction::P2pConnectionIncomingRespond {
194 rpc_id: *rpc_id,
195 response: response.clone(),
196 });
197 }
198 RpcAction::P2pConnectionIncomingError { rpc_id, error } => {
199 let Some(rpc) = state.requests.get_mut(rpc_id) else {
200 bug_condition!(
201 "Rpc state not found for RpcAction::P2pConnectionIncomingError({})",
202 rpc_id
203 );
204 return;
205 };
206 rpc.status = RpcRequestStatus::Error {
207 time: meta.time(),
208 error: format!("{:?}", error),
209 };
210
211 let dispatcher = state_context.into_dispatcher();
212 dispatcher.push(RpcEffectfulAction::P2pConnectionIncomingError {
213 rpc_id: *rpc_id,
214 error: error.to_owned(),
215 });
216 }
217 RpcAction::P2pConnectionIncomingSuccess { rpc_id } => {
218 let Some(rpc) = state.requests.get_mut(rpc_id) else {
219 bug_condition!(
220 "Rpc state not found for RpcAction::P2pConnectionIncomingSuccess({})",
221 rpc_id
222 );
223 return;
224 };
225 rpc.status = RpcRequestStatus::Success { time: meta.time() };
226 let dispatcher = state_context.into_dispatcher();
227 dispatcher
228 .push(RpcEffectfulAction::P2pConnectionIncomingSuccess { rpc_id: *rpc_id });
229 }
230 RpcAction::ScanStateSummaryGetInit { rpc_id, query } => {
231 let rpc_state = RpcRequestState {
232 req: RpcRequest::ScanStateSummaryGet(query.clone()),
233 status: RpcRequestStatus::Init { time: meta.time() },
234 data: Default::default(),
235 };
236 state.requests.insert(*rpc_id, rpc_state);
237
238 let dispatcher = state_context.into_dispatcher();
239 dispatcher.push(RpcAction::ScanStateSummaryLedgerGetInit { rpc_id: *rpc_id });
240 }
241 RpcAction::ScanStateSummaryLedgerGetInit { rpc_id } => {
242 let (dispatcher, state) = state_context.into_dispatcher_and_state();
243 let transition_frontier = &state.transition_frontier;
244
245 let Some(query) = None.or_else(|| {
246 let req = state.rpc.requests.get(rpc_id)?;
247 match &req.req {
248 RpcRequest::ScanStateSummaryGet(query) => Some(query),
249 _ => None,
250 }
251 }) else {
252 return;
253 };
254
255 let block = match query {
256 RpcScanStateSummaryGetQuery::ForBestTip => {
257 transition_frontier.best_tip_breadcrumb()
258 }
259 RpcScanStateSummaryGetQuery::ForBlockWithHash(hash) => transition_frontier
260 .best_chain
261 .iter()
262 .rev()
263 .find(|b| b.hash() == hash),
264 RpcScanStateSummaryGetQuery::ForBlockWithHeight(height) => transition_frontier
265 .best_chain
266 .iter()
267 .rev()
268 .find(|b| b.height() == *height),
269 };
270 let block = match block {
271 Some(v) => v.clone(),
272 None => {
273 dispatcher.push(RpcAction::ScanStateSummaryGetPending {
274 rpc_id: *rpc_id,
275 block: None,
276 });
277 dispatcher.push(RpcAction::ScanStateSummaryGetSuccess {
278 rpc_id: *rpc_id,
279 scan_state: Ok(Vec::new()),
280 });
281 return;
282 }
283 };
284
285 dispatcher.push(LedgerReadAction::Init {
286 request: LedgerReadRequest::ScanStateSummary(
287 block.staged_ledger_hashes().clone(),
288 ),
289 callback: LedgerReadInitCallback::RpcScanStateSummaryGetPending {
290 callback: redux::callback!(
291 on_ledger_read_init_rpc_scan_state_summary_get_pending((rpc_id: RequestId<RpcIdType>, block: AppliedBlock)) -> crate::Action{
292 RpcAction::ScanStateSummaryGetPending { rpc_id, block: Some(block) }
293 }
294 ),
295 args: (*rpc_id, block),
296 },
297 });
298 }
299 RpcAction::ScanStateSummaryGetPending { rpc_id, block } => {
300 let Some(rpc) = state.requests.get_mut(rpc_id) else {
301 bug_condition!(
302 "Rpc state not found for RpcAction::ScanStateSummaryGetPending({})",
303 rpc_id
304 );
305 return;
306 };
307 rpc.status = RpcRequestStatus::Pending { time: meta.time() };
308 rpc.data = RpcRequestExtraData::FullBlockOpt(block.clone());
309 }
310 RpcAction::ScanStateSummaryGetSuccess { rpc_id, scan_state } => {
311 let Some(rpc) = state.requests.get_mut(rpc_id) else {
312 bug_condition!(
313 "Rpc state not found for RpcAction::ScanStateSummaryGetSuccess({})",
314 rpc_id
315 );
316 return;
317 };
318 rpc.status = RpcRequestStatus::Success { time: meta.time() };
319 let dispatcher = state_context.into_dispatcher();
320 dispatcher.push(RpcEffectfulAction::ScanStateSummaryGetSuccess {
321 rpc_id: *rpc_id,
322 scan_state: scan_state.clone(),
323 });
324 }
325 RpcAction::SnarkPoolAvailableJobsGet { rpc_id } => {
326 let dispatcher = state_context.into_dispatcher();
327 dispatcher.push(RpcEffectfulAction::SnarkPoolAvailableJobsGet { rpc_id: *rpc_id });
328 }
329 RpcAction::SnarkPoolJobGet { rpc_id, job_id } => {
330 let dispatcher = state_context.into_dispatcher();
331 dispatcher.push(RpcEffectfulAction::SnarkPoolJobGet {
332 rpc_id: *rpc_id,
333 job_id: job_id.clone(),
334 });
335 }
336 RpcAction::SnarkPoolCompletedJobsGet { rpc_id } => {
337 let (dispatcher, state) = state_context.into_dispatcher_and_state();
338
339 let jobs = state
340 .snark_pool
341 .completed_snarks_iter()
342 .map(|s| TransactionSnarkWorkTStableV2::from(s.clone()))
343 .collect::<Vec<_>>();
344
345 dispatcher.push(RpcEffectfulAction::SnarkPoolCompletedJobsGet {
346 rpc_id: *rpc_id,
347 jobs,
348 });
349 }
350 RpcAction::SnarkPoolPendingJobsGet { rpc_id } => {
351 let (dispatcher, state) = state_context.into_dispatcher_and_state();
352
353 let jobs = state
354 .snark_pool
355 .available_jobs_iter()
356 .cloned()
357 .collect::<Vec<_>>();
358
359 dispatcher.push(RpcEffectfulAction::SnarkPoolPendingJobsGet {
360 rpc_id: *rpc_id,
361 jobs,
362 })
363 }
364 RpcAction::SnarkerConfigGet { rpc_id } => {
365 let (dispatcher, state) = state_context.into_dispatcher_and_state();
366
367 let config = state
368 .config
369 .snarker
370 .as_ref()
371 .map(|config| RpcSnarkerConfig {
372 public_key: config.public_key.as_ref().clone(),
373 fee: config.fee.clone(),
374 });
375
376 dispatcher.push(RpcEffectfulAction::SnarkerConfigGet {
377 rpc_id: *rpc_id,
378 config,
379 });
380 }
381 RpcAction::SnarkerJobCommit { rpc_id, job_id } => {
382 let dispatcher = state_context.into_dispatcher();
383 dispatcher.push(RpcEffectfulAction::SnarkerJobCommit {
384 rpc_id: *rpc_id,
385 job_id: job_id.clone(),
386 });
387 }
388 RpcAction::SnarkerJobSpec { rpc_id, job_id } => {
389 let dispatcher = state_context.into_dispatcher();
390 dispatcher.push(RpcEffectfulAction::SnarkerJobSpec {
391 rpc_id: *rpc_id,
392 job_id: job_id.clone(),
393 });
394 }
395 RpcAction::SnarkerWorkersGet { rpc_id } => {
396 let (dispatcher, state) = state_context.into_dispatcher_and_state();
397 let snark_worker = state.external_snark_worker.0.clone();
398 dispatcher.push(RpcEffectfulAction::SnarkerWorkersGet {
399 rpc_id: *rpc_id,
400 snark_worker,
401 });
402 }
403 RpcAction::HealthCheck { rpc_id } => {
404 let (dispatcher, state) = state_context.into_dispatcher_and_state();
405
406 let has_peers = state.p2p.ready_peers_iter().map(|(peer_id, _peer)| {
407 openmina_core::log::debug!(meta.time(); "found ready peer: {peer_id}")
408 })
409 .next()
410 .ok_or_else(|| {
411 openmina_core::log::warn!(meta.time(); "no ready peers");
412 String::from("no ready peers")
413 });
414
415 dispatcher.push(RpcEffectfulAction::HealthCheck {
416 rpc_id: *rpc_id,
417 has_peers,
418 });
419 }
420 RpcAction::ReadinessCheck { rpc_id } => {
421 let dispatcher = state_context.into_dispatcher();
422 dispatcher.push(RpcEffectfulAction::ReadinessCheck { rpc_id: *rpc_id });
423 }
424 RpcAction::DiscoveryRoutingTable { rpc_id } => {
425 let (dispatcher, state) = state_context.into_dispatcher_and_state();
426
427 let response = state
428 .p2p
429 .ready()
430 .and_then(|p2p| p2p.network.scheduler.discovery_state())
431 .and_then(|discovery_state| {
432 match (&discovery_state.routing_table).try_into() {
433 Ok(resp) => Some(resp),
434 Err(err) => {
435 bug_condition!(
436 "{:?} error converting routing table into response: {:?}",
437 err,
438 action
439 );
440 None
441 }
442 }
443 });
444
445 dispatcher.push(RpcEffectfulAction::DiscoveryRoutingTable {
446 rpc_id: *rpc_id,
447 response,
448 });
449 }
450 RpcAction::DiscoveryBoostrapStats { rpc_id } => {
451 let (dispatcher, state) = state_context.into_dispatcher_and_state();
452
453 let response = state
454 .p2p
455 .ready()
456 .and_then(|p2p| p2p.network.scheduler.discovery_state())
457 .and_then(|discovery_state: &p2p::P2pNetworkKadState| {
458 discovery_state.bootstrap_stats().cloned()
459 });
460
461 dispatcher.push(RpcEffectfulAction::DiscoveryBoostrapStats {
462 rpc_id: *rpc_id,
463 response,
464 });
465 }
466 RpcAction::Finish { rpc_id } => {
467 state.requests.remove(rpc_id);
468 }
469 RpcAction::TransactionPool { rpc_id } => {
470 let (dispatcher, state) = state_context.into_dispatcher_and_state();
471 let response = state.transaction_pool.get_all_transactions();
472 dispatcher.push(RpcEffectfulAction::TransactionPool {
473 rpc_id: *rpc_id,
474 response,
475 });
476 }
477 RpcAction::LedgerAccountsGetInit {
478 rpc_id,
479 account_query,
480 } => {
481 let rpc_state = RpcRequestState {
482 req: RpcRequest::LedgerAccountsGet(account_query.clone()),
483 status: RpcRequestStatus::Init { time: meta.time() },
484 data: Default::default(),
485 };
486 state.requests.insert(*rpc_id, rpc_state);
487
488 let (dispatcher, state) = state_context.into_dispatcher_and_state();
489 let ledger_hash = if let Some(best_tip) = state.transition_frontier.best_tip() {
490 best_tip.merkle_root_hash()
491 } else {
492 return;
493 };
494
495 dispatcher.push(LedgerReadAction::Init {
496 request: LedgerReadRequest::AccountsForRpc(
497 *rpc_id,
498 ledger_hash.clone(),
499 account_query.clone(),
500 ),
501 callback: LedgerReadInitCallback::RpcLedgerAccountsGetPending {
502 callback: redux::callback!(
503 on_ledger_read_init_rpc_actions_get_init(rpc_id: RequestId<RpcIdType>) -> crate::Action{
504 RpcAction::LedgerAccountsGetPending { rpc_id }
505 }
506 ),
507 args: *rpc_id,
508 },
509 })
510 }
511 RpcAction::LedgerAccountsGetPending { rpc_id } => {
512 let Some(rpc) = state.requests.get_mut(rpc_id) else {
513 return;
514 };
515 rpc.status = RpcRequestStatus::Pending { time: meta.time() };
516 }
517 RpcAction::LedgerAccountsGetSuccess {
518 rpc_id,
519 account_query,
520 accounts,
521 } => {
522 let Some(rpc) = state.requests.get_mut(rpc_id) else {
523 return;
524 };
525 rpc.status = RpcRequestStatus::Success { time: meta.time() };
526
527 let dispatcher = state_context.into_dispatcher();
528 dispatcher.push(RpcEffectfulAction::LedgerAccountsGetSuccess {
529 rpc_id: *rpc_id,
530 account_query: account_query.clone(),
531 accounts: accounts.clone(),
532 });
533 }
534 RpcAction::TransactionInjectInit { rpc_id, commands } => {
535 let rpc_state = RpcRequestState {
536 req: RpcRequest::TransactionInject(commands.clone()),
537 status: RpcRequestStatus::Init { time: meta.time() },
538 data: Default::default(),
539 };
540 state.requests.insert(*rpc_id, rpc_state);
541
542 let commands_with_hash = commands
543 .clone()
544 .into_iter()
545 .filter_map(|cmd| TransactionWithHash::try_new(cmd).ok())
547 .collect();
548
549 let dispatcher = state_context.into_dispatcher();
550 dispatcher.push(RpcAction::TransactionInjectPending { rpc_id: *rpc_id });
551 dispatcher.push(TransactionPoolAction::StartVerify {
552 commands: commands_with_hash,
553 from_source: TransactionPoolMessageSource::rpc(*rpc_id),
554 });
555 }
556 RpcAction::TransactionInjectPending { rpc_id } => {
557 let Some(rpc) = state.requests.get_mut(rpc_id) else {
558 return;
559 };
560 rpc.status = RpcRequestStatus::Pending { time: meta.time() };
561 }
562 RpcAction::TransactionInjectSuccess { rpc_id, response } => {
563 let Some(rpc) = state.requests.get_mut(rpc_id) else {
564 return;
565 };
566 rpc.status = RpcRequestStatus::Success { time: meta.time() };
567
568 let dispatcher = state_context.into_dispatcher();
569 let response = response.clone().into_iter().map(|cmd| cmd.data).collect();
570 dispatcher.push(RpcEffectfulAction::TransactionInjectSuccess {
571 rpc_id: *rpc_id,
572 response,
573 });
574 }
575 RpcAction::TransactionInjectRejected { rpc_id, response } => {
576 let Some(rpc) = state.requests.get_mut(rpc_id) else {
577 return;
578 };
579 rpc.status = RpcRequestStatus::Success { time: meta.time() };
580
581 let dispatcher = state_context.into_dispatcher();
582 let response = response
583 .clone()
584 .into_iter()
585 .map(|(cmd, failure)| (cmd.data, failure))
586 .collect();
587
588 dispatcher.push(RpcEffectfulAction::TransactionInjectRejected {
589 rpc_id: *rpc_id,
590 response,
591 });
592 }
593 RpcAction::TransactionInjectFailure { rpc_id, errors } => {
594 let Some(rpc) = state.requests.get_mut(rpc_id) else {
595 return;
596 };
597 rpc.status = RpcRequestStatus::Error {
598 time: meta.time(),
599 error: "Transaction injection failed".to_string(),
600 };
601
602 let dispatcher = state_context.into_dispatcher();
603 dispatcher.push(RpcEffectfulAction::TransactionInjectFailure {
604 rpc_id: *rpc_id,
605 errors: errors.clone(),
606 });
607 }
608 RpcAction::TransitionFrontierUserCommandsGet { rpc_id } => {
609 let (dispatcher, state) = state_context.into_dispatcher_and_state();
610
611 let commands = state
612 .transition_frontier
613 .best_chain
614 .iter()
615 .flat_map(|block| block.body().commands_iter().map(|v| v.data.clone()))
616 .collect::<Vec<_>>();
617
618 dispatcher.push(RpcEffectfulAction::TransitionFrontierUserCommandsGet {
619 rpc_id: *rpc_id,
620 commands,
621 });
622 }
623 RpcAction::BestChain { rpc_id, max_length } => {
624 let (dispatcher, state) = state_context.into_dispatcher_and_state();
625
626 let best_chain = state
627 .transition_frontier
628 .best_chain
629 .iter()
630 .rev()
631 .take(*max_length as usize)
632 .cloned()
633 .rev()
634 .collect();
635
636 dispatcher.push(RpcEffectfulAction::BestChain {
637 rpc_id: *rpc_id,
638 best_chain,
639 });
640 }
641 RpcAction::ConsensusConstantsGet { rpc_id } => {
642 let (dispatcher, state) = state_context.into_dispatcher_and_state();
643 let response = state.config.consensus_constants.clone();
644 dispatcher.push(RpcEffectfulAction::ConsensusConstantsGet {
645 rpc_id: *rpc_id,
646 response,
647 });
648 }
649 RpcAction::TransactionStatusGet { rpc_id, tx } => {
650 let dispatcher = state_context.into_dispatcher();
651 dispatcher.push(RpcEffectfulAction::TransactionStatusGet {
652 rpc_id: *rpc_id,
653 tx: tx.clone(),
654 });
655 }
656 RpcAction::BlockGet { rpc_id, query } => {
657 let (dispatcher, state) = state_context.into_dispatcher_and_state();
658
659 let find_fn = |block: &&AppliedBlock| match query {
660 GetBlockQuery::Hash(hash) => block.hash() == hash,
661 GetBlockQuery::Height(height) => block.height() == *height,
662 };
663
664 let block = state
665 .transition_frontier
666 .best_chain
667 .iter()
668 .find(find_fn)
669 .cloned();
670
671 dispatcher.push(RpcEffectfulAction::BlockGet {
672 rpc_id: *rpc_id,
673 block,
674 });
675 }
676 RpcAction::P2pConnectionIncomingAnswerReady {
677 rpc_id,
678 answer,
679 peer_id,
680 } => {
681 let dispatcher = state_context.into_dispatcher();
682 dispatcher.push(RpcAction::P2pConnectionIncomingRespond {
683 rpc_id: *rpc_id,
684 response: answer.clone(),
685 });
686 dispatcher
687 .push(P2pConnectionIncomingAction::AnswerSendSuccess { peer_id: *peer_id });
688 }
689 RpcAction::PooledUserCommands { rpc_id, query } => {
690 let (dispatcher, state) = state_context.into_dispatcher_and_state();
691
692 let PooledCommandsQuery {
693 public_key,
694 hashes,
695 ids,
696 } = query;
697
698 let all_transactions = state.transaction_pool.get_all_transactions();
699
700 let mut user_commands: Vec<_> = all_transactions
701 .into_iter()
702 .filter_map(|tx| match tx.data {
703 valid::UserCommand::SignedCommand(signed_command) => Some((
704 tx.hash,
705 MinaBaseSignedCommandStableV2::from(*signed_command),
706 )),
707 valid::UserCommand::ZkAppCommand(_) => None,
708 })
709 .collect();
710
711 if let Some(pk) = public_key {
712 let pk = NonZeroCurvePoint::from(pk.clone());
713 user_commands.retain(|(_, tx)| tx.signer == pk)
714 }
715
716 if let Some(hashes) = hashes {
717 user_commands.retain(|(hash, _)| hashes.contains(hash))
718 }
719
720 if let Some(ids) = ids {
721 user_commands.retain(|(_, tx)| ids.contains(tx))
722 }
723
724 dispatcher.push(RpcEffectfulAction::PooledUserCommands {
725 rpc_id: *rpc_id,
726 user_commands: user_commands.into_iter().map(|(_, tx)| tx).collect(),
727 });
728 }
729 RpcAction::GenesisBlock { rpc_id } => {
730 let (dispatcher, state) = state_context.into_dispatcher_and_state();
731 let genesis_block = state.genesis_block();
732 dispatcher.push(RpcEffectfulAction::GenesisBlock {
733 rpc_id: *rpc_id,
734 genesis_block,
735 });
736 }
737 RpcAction::PooledZkappCommands { rpc_id, query } => {
738 let (dispatcher, state) = state_context.into_dispatcher_and_state();
739
740 let PooledCommandsQuery {
741 public_key,
742 hashes,
743 ids,
744 } = query;
745
746 let all_transactions = state.transaction_pool.get_all_transactions();
747
748 let mut zkapp_commands: Vec<_> = all_transactions
749 .into_iter()
750 .filter_map(|tx| match tx.data {
751 valid::UserCommand::SignedCommand(_) => None,
752 valid::UserCommand::ZkAppCommand(zkapp) => Some((
753 tx.hash,
754 MinaBaseZkappCommandTStableV1WireStableV1::from(&zkapp.zkapp_command),
755 )),
756 })
757 .collect();
758
759 if let Some(pk) = public_key {
760 let pk = NonZeroCurvePoint::from(pk.clone());
761 zkapp_commands.retain(|(_, tx)| tx.fee_payer.body.public_key == pk);
762 }
763
764 if let Some(hashes) = hashes {
765 zkapp_commands.retain(|(hash, _)| hashes.contains(hash));
766 }
767
768 if let Some(ids) = ids {
769 zkapp_commands.retain(|(_, tx)| ids.contains(tx));
770 }
771
772 dispatcher.push(RpcEffectfulAction::PooledZkappCommands {
773 rpc_id: *rpc_id,
774 zkapp_commands: zkapp_commands.into_iter().map(|(_, tx)| tx).collect(),
775 });
776 }
777 RpcAction::ConsensusTimeGet { rpc_id, query } => {
778 let (dispatcher, state) = state_context.into_dispatcher_and_state();
779 let consensus_time = match query {
780 ConsensusTimeQuery::Now => state.consensus_time_now(),
781 ConsensusTimeQuery::BestTip => state.consensus_time_best_tip(),
782 };
783 println!("consensus_time: {:?}", consensus_time);
784 dispatcher.push(RpcEffectfulAction::ConsensusTimeGet {
785 rpc_id: *rpc_id,
786 consensus_time,
787 });
788 }
789 RpcAction::LedgerStatusGetInit {
790 rpc_id,
791 ledger_hash,
792 } => {
793 let rpc_state = RpcRequestState {
794 req: RpcRequest::LedgerStatusGet(ledger_hash.clone()),
795 status: RpcRequestStatus::Init { time: meta.time() },
796 data: Default::default(),
797 };
798 state.requests.insert(*rpc_id, rpc_state);
799
800 let (dispatcher, state) = state_context.into_dispatcher_and_state();
801 let ledger_hash = if let Some(best_tip) = state.transition_frontier.best_tip() {
802 best_tip.merkle_root_hash()
803 } else {
804 return;
805 };
806
807 dispatcher.push(LedgerReadAction::Init {
808 request: LedgerReadRequest::GetLedgerStatus(*rpc_id, ledger_hash.clone()),
809 callback: LedgerReadInitCallback::RpcLedgerStatusGetPending {
810 callback: redux::callback!(
811 on_ledger_read_init_rpc_actions_get_init(rpc_id: RequestId<RpcIdType>) -> crate::Action{
812 RpcAction::LedgerStatusGetPending { rpc_id }
813 }
814 ),
815 args: *rpc_id,
816 },
817 })
818 }
819 RpcAction::LedgerStatusGetPending { rpc_id } => {
820 let Some(rpc) = state.requests.get_mut(rpc_id) else {
821 return;
822 };
823 rpc.status = RpcRequestStatus::Pending { time: meta.time() };
824 }
825 RpcAction::LedgerStatusGetSuccess { rpc_id, response } => {
826 let Some(rpc) = state.requests.get_mut(rpc_id) else {
827 return;
828 };
829 rpc.status = RpcRequestStatus::Success { time: meta.time() };
830
831 let dispatcher = state_context.into_dispatcher();
832 dispatcher.push(RpcEffectfulAction::LedgerStatusGetSuccess {
833 rpc_id: *rpc_id,
834 response: response.clone(),
835 });
836 }
837 RpcAction::LedgerAccountDelegatorsGetInit {
838 rpc_id,
839 ledger_hash,
840 account_id,
841 } => {
842 let rpc_state = RpcRequestState {
843 req: RpcRequest::LedgerAccountDelegatorsGet(
844 ledger_hash.clone(),
845 account_id.clone(),
846 ),
847 status: RpcRequestStatus::Init { time: meta.time() },
848 data: Default::default(),
849 };
850 state.requests.insert(*rpc_id, rpc_state);
851
852 let dispatcher = state_context.into_dispatcher();
853
854 dispatcher.push(LedgerReadAction::Init {
855 request: LedgerReadRequest::GetAccountDelegators(*rpc_id, ledger_hash.clone(), account_id.clone()),
856 callback: LedgerReadInitCallback::RpcLedgerAccountDelegatorsGetPending {
857 callback: redux::callback!(
858 on_ledger_read_init_rpc_actions_get_init(rpc_id: RequestId<RpcIdType>) -> crate::Action{
859 RpcAction::LedgerAccountDelegatorsGetPending { rpc_id }
860 }
861 ),
862 args: *rpc_id,
863 },
864 })
865 }
866 RpcAction::LedgerAccountDelegatorsGetPending { rpc_id } => {
867 let Some(rpc) = state.requests.get_mut(rpc_id) else {
868 return;
869 };
870 rpc.status = RpcRequestStatus::Pending { time: meta.time() };
871 }
872 RpcAction::LedgerAccountDelegatorsGetSuccess { rpc_id, response } => {
873 let Some(rpc) = state.requests.get_mut(rpc_id) else {
874 return;
875 };
876 rpc.status = RpcRequestStatus::Success { time: meta.time() };
877
878 let dispatcher = state_context.into_dispatcher();
879 dispatcher.push(RpcEffectfulAction::LedgerAccountDelegatorsGetSuccess {
880 rpc_id: *rpc_id,
881 response: response.clone(),
882 });
883 }
884 }
885 }
886}
887
888pub fn collect_rpc_peers_info(state: &crate::State) -> Vec<RpcPeerInfo> {
889 state.p2p.ready().map_or_else(Vec::new, |p2p| {
890 p2p.peers
891 .iter()
892 .map(|(peer_id, state)| {
893 let best_tip = state.status.as_ready().and_then(|r| r.best_tip.as_ref());
894 let (connection_status, time, incoming, connecting_details) = match &state.status {
895 p2p::P2pPeerStatus::Connecting(c) => match c {
896 p2p::connection::P2pConnectionState::Outgoing(o) => (
897 PeerConnectionStatus::Connecting,
898 o.time().into(),
899 false,
900 Some(format!("{o:?}")),
901 ),
902 p2p::connection::P2pConnectionState::Incoming(i) => (
903 PeerConnectionStatus::Connecting,
904 i.time().into(),
905 true,
906 Some(format!("{i:?}")),
907 ),
908 },
909 p2p::P2pPeerStatus::Disconnecting { time } => (
910 PeerConnectionStatus::Disconnecting,
911 (*time).into(),
912 false,
913 None,
914 ),
915 p2p::P2pPeerStatus::Disconnected { time } => (
916 PeerConnectionStatus::Disconnected,
917 (*time).into(),
918 false,
919 None,
920 ),
921 p2p::P2pPeerStatus::Ready(r) => (
922 PeerConnectionStatus::Connected,
923 r.connected_since.into(),
924 r.is_incoming,
925 None,
926 ),
927 };
928 RpcPeerInfo {
929 peer_id: *peer_id,
930 connection_status,
931 connecting_details,
932 address: state.dial_opts.as_ref().map(|opts| opts.to_string()),
933 is_libp2p: state.is_libp2p,
934 incoming,
935 best_tip: best_tip.map(|bt| bt.hash.clone()),
936 best_tip_height: best_tip.map(|bt| bt.height()),
937 best_tip_global_slot: best_tip.map(|bt| bt.global_slot_since_genesis()),
938 best_tip_timestamp: best_tip.map(|bt| bt.timestamp().into()),
939 time,
940 }
941 })
942 .collect()
943 })
944}