node/rpc/
rpc_reducer.rs

1use ledger::scan_state::transaction_logic::valid;
2use mina_p2p_messages::v2::{
3    MinaBaseSignedCommandStableV2, MinaBaseZkappCommandTStableV1WireStableV1, NonZeroCurvePoint,
4    TransactionSnarkWorkTStableV2,
5};
6use openmina_core::{
7    block::AppliedBlock,
8    bug_condition,
9    requests::{RequestId, RpcId, RpcIdType},
10    transaction::{TransactionPoolMessageSource, TransactionWithHash},
11};
12use p2p::{
13    connection::{incoming::P2pConnectionIncomingAction, outgoing::P2pConnectionOutgoingAction},
14    webrtc::P2pConnectionResponse,
15    PeerId,
16};
17use redux::ActionWithMeta;
18
19use crate::{
20    ledger::read::{LedgerReadAction, LedgerReadInitCallback, LedgerReadRequest},
21    p2p_ready,
22    rpc::{GetBlockQuery, PooledCommandsQuery},
23    rpc_effectful::RpcEffectfulAction,
24    TransactionPoolAction,
25};
26
27use super::{
28    ConsensusTimeQuery, PeerConnectionStatus, RpcAction, RpcPeerInfo, RpcRequest,
29    RpcRequestExtraData, RpcRequestState, RpcRequestStatus, RpcScanStateSummaryGetQuery,
30    RpcSnarkerConfig, RpcState,
31};
32
33impl RpcState {
34    pub fn reducer(mut state_context: crate::Substate<Self>, action: ActionWithMeta<&RpcAction>) {
35        let Ok(state) = state_context.get_substate_mut() else {
36            return;
37        };
38
39        let (action, meta) = action.split();
40        match action {
41            RpcAction::GlobalStateGet { rpc_id, filter } => {
42                let dispatcher = state_context.into_dispatcher();
43                dispatcher.push(RpcEffectfulAction::GlobalStateGet {
44                    rpc_id: *rpc_id,
45                    filter: filter.clone(),
46                });
47            }
48            RpcAction::StatusGet { rpc_id } => {
49                let dispatcher = state_context.into_dispatcher();
50                dispatcher.push(RpcEffectfulAction::StatusGet { rpc_id: *rpc_id });
51            }
52            RpcAction::HeartbeatGet { rpc_id } => {
53                let dispatcher = state_context.into_dispatcher();
54                dispatcher.push(RpcEffectfulAction::HeartbeatGet { rpc_id: *rpc_id });
55            }
56            RpcAction::ActionStatsGet { rpc_id, query } => {
57                let dispatcher = state_context.into_dispatcher();
58                dispatcher.push(RpcEffectfulAction::ActionStatsGet {
59                    rpc_id: *rpc_id,
60                    query: *query,
61                });
62            }
63            RpcAction::SyncStatsGet { rpc_id, query } => {
64                let dispatcher = state_context.into_dispatcher();
65                dispatcher.push(RpcEffectfulAction::SyncStatsGet {
66                    rpc_id: *rpc_id,
67                    query: *query,
68                });
69            }
70            RpcAction::BlockProducerStatsGet { rpc_id } => {
71                let dispatcher = state_context.into_dispatcher();
72                dispatcher.push(RpcEffectfulAction::BlockProducerStatsGet { rpc_id: *rpc_id });
73            }
74            RpcAction::MessageProgressGet { rpc_id } => {
75                let dispatcher = state_context.into_dispatcher();
76                dispatcher.push(RpcEffectfulAction::MessageProgressGet { rpc_id: *rpc_id });
77            }
78            RpcAction::PeersGet { rpc_id } => {
79                let (dispatcher, state) = state_context.into_dispatcher_and_state();
80                let peers = collect_rpc_peers_info(state);
81                dispatcher.push(RpcEffectfulAction::PeersGet {
82                    rpc_id: *rpc_id,
83                    peers,
84                });
85            }
86            RpcAction::P2pConnectionOutgoingInit { rpc_id, opts } => {
87                let rpc_state = RpcRequestState {
88                    req: RpcRequest::P2pConnectionOutgoing(opts.clone()),
89                    status: RpcRequestStatus::Init { time: meta.time() },
90                    data: Default::default(),
91                };
92                state.requests.insert(*rpc_id, rpc_state);
93
94                let dispatcher = state_context.into_dispatcher();
95
96                dispatcher.push(P2pConnectionOutgoingAction::Init {
97                    opts: opts.clone(),
98                    rpc_id: Some(*rpc_id),
99                    on_success: Some(redux::callback!(
100                        on_p2p_connection_outgoing_rpc_connection_success((peer_id: PeerId, rpc_id: Option<RpcId>)) -> crate::Action {
101                            let Some(rpc_id) = rpc_id else {
102                                unreachable!("RPC ID not provided");
103                            };
104
105                            RpcAction::P2pConnectionOutgoingPending{ rpc_id }
106                        }
107                    )),
108                });
109            }
110            RpcAction::P2pConnectionOutgoingPending { rpc_id } => {
111                let Some(rpc) = state.requests.get_mut(rpc_id) else {
112                    bug_condition!(
113                        "Rpc state not found for RpcAction::P2pConnectionOutgoingPending({})",
114                        rpc_id
115                    );
116                    return;
117                };
118                rpc.status = RpcRequestStatus::Pending { time: meta.time() };
119            }
120            RpcAction::P2pConnectionOutgoingError { rpc_id, error } => {
121                let Some(rpc) = state.requests.get_mut(rpc_id) else {
122                    bug_condition!(
123                        "Rpc state not found for RpcAction::P2pConnectionOutgoingError({})",
124                        rpc_id
125                    );
126                    return;
127                };
128                rpc.status = RpcRequestStatus::Error {
129                    time: meta.time(),
130                    error: format!("{:?}", error),
131                };
132
133                let dispatcher = state_context.into_dispatcher();
134                dispatcher.push(RpcEffectfulAction::P2pConnectionOutgoingError {
135                    rpc_id: *rpc_id,
136                    error: format!("{:?}", error),
137                });
138            }
139            RpcAction::P2pConnectionOutgoingSuccess { rpc_id } => {
140                let Some(rpc) = state.requests.get_mut(rpc_id) else {
141                    bug_condition!(
142                        "Rpc state not found for RpcAction::P2pConnectionOutgoingSuccess({})",
143                        rpc_id
144                    );
145                    return;
146                };
147                rpc.status = RpcRequestStatus::Success { time: meta.time() };
148                let dispatcher = state_context.into_dispatcher();
149                dispatcher
150                    .push(RpcEffectfulAction::P2pConnectionOutgoingSuccess { rpc_id: *rpc_id });
151            }
152            RpcAction::P2pConnectionIncomingInit { rpc_id, opts } => {
153                let rpc_state = RpcRequestState {
154                    req: RpcRequest::P2pConnectionIncoming(opts.clone()),
155                    status: RpcRequestStatus::Init { time: meta.time() },
156                    data: Default::default(),
157                };
158                state.requests.insert(*rpc_id, rpc_state);
159
160                let (dispatcher, state) = state_context.into_dispatcher_and_state();
161                let p2p = p2p_ready!(state.p2p, meta.time());
162
163                match p2p.incoming_accept(opts.peer_id, &opts.offer) {
164                    Ok(_) => {
165                        dispatcher.push(P2pConnectionIncomingAction::Init {
166                            opts: opts.clone(),
167                            rpc_id: Some(*rpc_id),
168                        });
169                        dispatcher
170                            .push(RpcAction::P2pConnectionIncomingPending { rpc_id: *rpc_id });
171                    }
172                    Err(reason) => {
173                        let response = P2pConnectionResponse::Rejected(reason);
174                        dispatcher.push(RpcAction::P2pConnectionIncomingRespond {
175                            rpc_id: *rpc_id,
176                            response,
177                        });
178                    }
179                }
180            }
181            RpcAction::P2pConnectionIncomingPending { rpc_id } => {
182                let Some(rpc) = state.requests.get_mut(rpc_id) else {
183                    bug_condition!(
184                        "Rpc state not found for RpcAction::P2pConnectionIncomingPending({})",
185                        rpc_id
186                    );
187                    return;
188                };
189                rpc.status = RpcRequestStatus::Pending { time: meta.time() };
190            }
191            RpcAction::P2pConnectionIncomingRespond { rpc_id, response } => {
192                let dispatcher = state_context.into_dispatcher();
193                dispatcher.push(RpcEffectfulAction::P2pConnectionIncomingRespond {
194                    rpc_id: *rpc_id,
195                    response: response.clone(),
196                });
197            }
198            RpcAction::P2pConnectionIncomingError { rpc_id, error } => {
199                let Some(rpc) = state.requests.get_mut(rpc_id) else {
200                    bug_condition!(
201                        "Rpc state not found for RpcAction::P2pConnectionIncomingError({})",
202                        rpc_id
203                    );
204                    return;
205                };
206                rpc.status = RpcRequestStatus::Error {
207                    time: meta.time(),
208                    error: format!("{:?}", error),
209                };
210
211                let dispatcher = state_context.into_dispatcher();
212                dispatcher.push(RpcEffectfulAction::P2pConnectionIncomingError {
213                    rpc_id: *rpc_id,
214                    error: error.to_owned(),
215                });
216            }
217            RpcAction::P2pConnectionIncomingSuccess { rpc_id } => {
218                let Some(rpc) = state.requests.get_mut(rpc_id) else {
219                    bug_condition!(
220                        "Rpc state not found for RpcAction::P2pConnectionIncomingSuccess({})",
221                        rpc_id
222                    );
223                    return;
224                };
225                rpc.status = RpcRequestStatus::Success { time: meta.time() };
226                let dispatcher = state_context.into_dispatcher();
227                dispatcher
228                    .push(RpcEffectfulAction::P2pConnectionIncomingSuccess { rpc_id: *rpc_id });
229            }
230            RpcAction::ScanStateSummaryGetInit { rpc_id, query } => {
231                let rpc_state = RpcRequestState {
232                    req: RpcRequest::ScanStateSummaryGet(query.clone()),
233                    status: RpcRequestStatus::Init { time: meta.time() },
234                    data: Default::default(),
235                };
236                state.requests.insert(*rpc_id, rpc_state);
237
238                let dispatcher = state_context.into_dispatcher();
239                dispatcher.push(RpcAction::ScanStateSummaryLedgerGetInit { rpc_id: *rpc_id });
240            }
241            RpcAction::ScanStateSummaryLedgerGetInit { rpc_id } => {
242                let (dispatcher, state) = state_context.into_dispatcher_and_state();
243                let transition_frontier = &state.transition_frontier;
244
245                let Some(query) = None.or_else(|| {
246                    let req = state.rpc.requests.get(rpc_id)?;
247                    match &req.req {
248                        RpcRequest::ScanStateSummaryGet(query) => Some(query),
249                        _ => None,
250                    }
251                }) else {
252                    return;
253                };
254
255                let block = match query {
256                    RpcScanStateSummaryGetQuery::ForBestTip => {
257                        transition_frontier.best_tip_breadcrumb()
258                    }
259                    RpcScanStateSummaryGetQuery::ForBlockWithHash(hash) => transition_frontier
260                        .best_chain
261                        .iter()
262                        .rev()
263                        .find(|b| b.hash() == hash),
264                    RpcScanStateSummaryGetQuery::ForBlockWithHeight(height) => transition_frontier
265                        .best_chain
266                        .iter()
267                        .rev()
268                        .find(|b| b.height() == *height),
269                };
270                let block = match block {
271                    Some(v) => v.clone(),
272                    None => {
273                        dispatcher.push(RpcAction::ScanStateSummaryGetPending {
274                            rpc_id: *rpc_id,
275                            block: None,
276                        });
277                        dispatcher.push(RpcAction::ScanStateSummaryGetSuccess {
278                            rpc_id: *rpc_id,
279                            scan_state: Ok(Vec::new()),
280                        });
281                        return;
282                    }
283                };
284
285                dispatcher.push(LedgerReadAction::Init {
286                    request: LedgerReadRequest::ScanStateSummary(
287                        block.staged_ledger_hashes().clone(),
288                    ),
289                    callback: LedgerReadInitCallback::RpcScanStateSummaryGetPending {
290                        callback: redux::callback!(
291                            on_ledger_read_init_rpc_scan_state_summary_get_pending((rpc_id: RequestId<RpcIdType>, block: AppliedBlock)) -> crate::Action{
292                                RpcAction::ScanStateSummaryGetPending { rpc_id, block: Some(block) }
293                            }
294                        ),
295                        args: (*rpc_id, block),
296                    },
297                });
298            }
299            RpcAction::ScanStateSummaryGetPending { rpc_id, block } => {
300                let Some(rpc) = state.requests.get_mut(rpc_id) else {
301                    bug_condition!(
302                        "Rpc state not found for RpcAction::ScanStateSummaryGetPending({})",
303                        rpc_id
304                    );
305                    return;
306                };
307                rpc.status = RpcRequestStatus::Pending { time: meta.time() };
308                rpc.data = RpcRequestExtraData::FullBlockOpt(block.clone());
309            }
310            RpcAction::ScanStateSummaryGetSuccess { rpc_id, scan_state } => {
311                let Some(rpc) = state.requests.get_mut(rpc_id) else {
312                    bug_condition!(
313                        "Rpc state not found for RpcAction::ScanStateSummaryGetSuccess({})",
314                        rpc_id
315                    );
316                    return;
317                };
318                rpc.status = RpcRequestStatus::Success { time: meta.time() };
319                let dispatcher = state_context.into_dispatcher();
320                dispatcher.push(RpcEffectfulAction::ScanStateSummaryGetSuccess {
321                    rpc_id: *rpc_id,
322                    scan_state: scan_state.clone(),
323                });
324            }
325            RpcAction::SnarkPoolAvailableJobsGet { rpc_id } => {
326                let dispatcher = state_context.into_dispatcher();
327                dispatcher.push(RpcEffectfulAction::SnarkPoolAvailableJobsGet { rpc_id: *rpc_id });
328            }
329            RpcAction::SnarkPoolJobGet { rpc_id, job_id } => {
330                let dispatcher = state_context.into_dispatcher();
331                dispatcher.push(RpcEffectfulAction::SnarkPoolJobGet {
332                    rpc_id: *rpc_id,
333                    job_id: job_id.clone(),
334                });
335            }
336            RpcAction::SnarkPoolCompletedJobsGet { rpc_id } => {
337                let (dispatcher, state) = state_context.into_dispatcher_and_state();
338
339                let jobs = state
340                    .snark_pool
341                    .completed_snarks_iter()
342                    .map(|s| TransactionSnarkWorkTStableV2::from(s.clone()))
343                    .collect::<Vec<_>>();
344
345                dispatcher.push(RpcEffectfulAction::SnarkPoolCompletedJobsGet {
346                    rpc_id: *rpc_id,
347                    jobs,
348                });
349            }
350            RpcAction::SnarkPoolPendingJobsGet { rpc_id } => {
351                let (dispatcher, state) = state_context.into_dispatcher_and_state();
352
353                let jobs = state
354                    .snark_pool
355                    .available_jobs_iter()
356                    .cloned()
357                    .collect::<Vec<_>>();
358
359                dispatcher.push(RpcEffectfulAction::SnarkPoolPendingJobsGet {
360                    rpc_id: *rpc_id,
361                    jobs,
362                })
363            }
364            RpcAction::SnarkerConfigGet { rpc_id } => {
365                let (dispatcher, state) = state_context.into_dispatcher_and_state();
366
367                let config = state
368                    .config
369                    .snarker
370                    .as_ref()
371                    .map(|config| RpcSnarkerConfig {
372                        public_key: config.public_key.as_ref().clone(),
373                        fee: config.fee.clone(),
374                    });
375
376                dispatcher.push(RpcEffectfulAction::SnarkerConfigGet {
377                    rpc_id: *rpc_id,
378                    config,
379                });
380            }
381            RpcAction::SnarkerJobCommit { rpc_id, job_id } => {
382                let dispatcher = state_context.into_dispatcher();
383                dispatcher.push(RpcEffectfulAction::SnarkerJobCommit {
384                    rpc_id: *rpc_id,
385                    job_id: job_id.clone(),
386                });
387            }
388            RpcAction::SnarkerJobSpec { rpc_id, job_id } => {
389                let dispatcher = state_context.into_dispatcher();
390                dispatcher.push(RpcEffectfulAction::SnarkerJobSpec {
391                    rpc_id: *rpc_id,
392                    job_id: job_id.clone(),
393                });
394            }
395            RpcAction::SnarkerWorkersGet { rpc_id } => {
396                let (dispatcher, state) = state_context.into_dispatcher_and_state();
397                let snark_worker = state.external_snark_worker.0.clone();
398                dispatcher.push(RpcEffectfulAction::SnarkerWorkersGet {
399                    rpc_id: *rpc_id,
400                    snark_worker,
401                });
402            }
403            RpcAction::HealthCheck { rpc_id } => {
404                let (dispatcher, state) = state_context.into_dispatcher_and_state();
405
406                let has_peers = state.p2p.ready_peers_iter().map(|(peer_id, _peer)| {
407                    openmina_core::log::debug!(meta.time(); "found ready peer: {peer_id}")
408                })
409                .next()
410                .ok_or_else(|| {
411                    openmina_core::log::warn!(meta.time(); "no ready peers");
412                    String::from("no ready peers")
413                });
414
415                dispatcher.push(RpcEffectfulAction::HealthCheck {
416                    rpc_id: *rpc_id,
417                    has_peers,
418                });
419            }
420            RpcAction::ReadinessCheck { rpc_id } => {
421                let dispatcher = state_context.into_dispatcher();
422                dispatcher.push(RpcEffectfulAction::ReadinessCheck { rpc_id: *rpc_id });
423            }
424            RpcAction::DiscoveryRoutingTable { rpc_id } => {
425                let (dispatcher, state) = state_context.into_dispatcher_and_state();
426
427                let response = state
428                    .p2p
429                    .ready()
430                    .and_then(|p2p| p2p.network.scheduler.discovery_state())
431                    .and_then(|discovery_state| {
432                        match (&discovery_state.routing_table).try_into() {
433                            Ok(resp) => Some(resp),
434                            Err(err) => {
435                                bug_condition!(
436                                    "{:?} error converting routing table into response: {:?}",
437                                    err,
438                                    action
439                                );
440                                None
441                            }
442                        }
443                    });
444
445                dispatcher.push(RpcEffectfulAction::DiscoveryRoutingTable {
446                    rpc_id: *rpc_id,
447                    response,
448                });
449            }
450            RpcAction::DiscoveryBoostrapStats { rpc_id } => {
451                let (dispatcher, state) = state_context.into_dispatcher_and_state();
452
453                let response = state
454                    .p2p
455                    .ready()
456                    .and_then(|p2p| p2p.network.scheduler.discovery_state())
457                    .and_then(|discovery_state: &p2p::P2pNetworkKadState| {
458                        discovery_state.bootstrap_stats().cloned()
459                    });
460
461                dispatcher.push(RpcEffectfulAction::DiscoveryBoostrapStats {
462                    rpc_id: *rpc_id,
463                    response,
464                });
465            }
466            RpcAction::Finish { rpc_id } => {
467                state.requests.remove(rpc_id);
468            }
469            RpcAction::TransactionPool { rpc_id } => {
470                let (dispatcher, state) = state_context.into_dispatcher_and_state();
471                let response = state.transaction_pool.get_all_transactions();
472                dispatcher.push(RpcEffectfulAction::TransactionPool {
473                    rpc_id: *rpc_id,
474                    response,
475                });
476            }
477            RpcAction::LedgerAccountsGetInit {
478                rpc_id,
479                account_query,
480            } => {
481                let rpc_state = RpcRequestState {
482                    req: RpcRequest::LedgerAccountsGet(account_query.clone()),
483                    status: RpcRequestStatus::Init { time: meta.time() },
484                    data: Default::default(),
485                };
486                state.requests.insert(*rpc_id, rpc_state);
487
488                let (dispatcher, state) = state_context.into_dispatcher_and_state();
489                let ledger_hash = if let Some(best_tip) = state.transition_frontier.best_tip() {
490                    best_tip.merkle_root_hash()
491                } else {
492                    return;
493                };
494
495                dispatcher.push(LedgerReadAction::Init {
496                    request: LedgerReadRequest::AccountsForRpc(
497                        *rpc_id,
498                        ledger_hash.clone(),
499                        account_query.clone(),
500                    ),
501                    callback: LedgerReadInitCallback::RpcLedgerAccountsGetPending {
502                        callback: redux::callback!(
503                            on_ledger_read_init_rpc_actions_get_init(rpc_id: RequestId<RpcIdType>) -> crate::Action{
504                                RpcAction::LedgerAccountsGetPending { rpc_id }
505                            }
506                        ),
507                        args: *rpc_id,
508                    },
509                })
510            }
511            RpcAction::LedgerAccountsGetPending { rpc_id } => {
512                let Some(rpc) = state.requests.get_mut(rpc_id) else {
513                    return;
514                };
515                rpc.status = RpcRequestStatus::Pending { time: meta.time() };
516            }
517            RpcAction::LedgerAccountsGetSuccess {
518                rpc_id,
519                account_query,
520                accounts,
521            } => {
522                let Some(rpc) = state.requests.get_mut(rpc_id) else {
523                    return;
524                };
525                rpc.status = RpcRequestStatus::Success { time: meta.time() };
526
527                let dispatcher = state_context.into_dispatcher();
528                dispatcher.push(RpcEffectfulAction::LedgerAccountsGetSuccess {
529                    rpc_id: *rpc_id,
530                    account_query: account_query.clone(),
531                    accounts: accounts.clone(),
532                });
533            }
534            RpcAction::TransactionInjectInit { rpc_id, commands } => {
535                let rpc_state = RpcRequestState {
536                    req: RpcRequest::TransactionInject(commands.clone()),
537                    status: RpcRequestStatus::Init { time: meta.time() },
538                    data: Default::default(),
539                };
540                state.requests.insert(*rpc_id, rpc_state);
541
542                let commands_with_hash = commands
543                    .clone()
544                    .into_iter()
545                    // TODO: do something it it cannot be hashed?
546                    .filter_map(|cmd| TransactionWithHash::try_new(cmd).ok())
547                    .collect();
548
549                let dispatcher = state_context.into_dispatcher();
550                dispatcher.push(RpcAction::TransactionInjectPending { rpc_id: *rpc_id });
551                dispatcher.push(TransactionPoolAction::StartVerify {
552                    commands: commands_with_hash,
553                    from_source: TransactionPoolMessageSource::rpc(*rpc_id),
554                });
555            }
556            RpcAction::TransactionInjectPending { rpc_id } => {
557                let Some(rpc) = state.requests.get_mut(rpc_id) else {
558                    return;
559                };
560                rpc.status = RpcRequestStatus::Pending { time: meta.time() };
561            }
562            RpcAction::TransactionInjectSuccess { rpc_id, response } => {
563                let Some(rpc) = state.requests.get_mut(rpc_id) else {
564                    return;
565                };
566                rpc.status = RpcRequestStatus::Success { time: meta.time() };
567
568                let dispatcher = state_context.into_dispatcher();
569                let response = response.clone().into_iter().map(|cmd| cmd.data).collect();
570                dispatcher.push(RpcEffectfulAction::TransactionInjectSuccess {
571                    rpc_id: *rpc_id,
572                    response,
573                });
574            }
575            RpcAction::TransactionInjectRejected { rpc_id, response } => {
576                let Some(rpc) = state.requests.get_mut(rpc_id) else {
577                    return;
578                };
579                rpc.status = RpcRequestStatus::Success { time: meta.time() };
580
581                let dispatcher = state_context.into_dispatcher();
582                let response = response
583                    .clone()
584                    .into_iter()
585                    .map(|(cmd, failure)| (cmd.data, failure))
586                    .collect();
587
588                dispatcher.push(RpcEffectfulAction::TransactionInjectRejected {
589                    rpc_id: *rpc_id,
590                    response,
591                });
592            }
593            RpcAction::TransactionInjectFailure { rpc_id, errors } => {
594                let Some(rpc) = state.requests.get_mut(rpc_id) else {
595                    return;
596                };
597                rpc.status = RpcRequestStatus::Error {
598                    time: meta.time(),
599                    error: "Transaction injection failed".to_string(),
600                };
601
602                let dispatcher = state_context.into_dispatcher();
603                dispatcher.push(RpcEffectfulAction::TransactionInjectFailure {
604                    rpc_id: *rpc_id,
605                    errors: errors.clone(),
606                });
607            }
608            RpcAction::TransitionFrontierUserCommandsGet { rpc_id } => {
609                let (dispatcher, state) = state_context.into_dispatcher_and_state();
610
611                let commands = state
612                    .transition_frontier
613                    .best_chain
614                    .iter()
615                    .flat_map(|block| block.body().commands_iter().map(|v| v.data.clone()))
616                    .collect::<Vec<_>>();
617
618                dispatcher.push(RpcEffectfulAction::TransitionFrontierUserCommandsGet {
619                    rpc_id: *rpc_id,
620                    commands,
621                });
622            }
623            RpcAction::BestChain { rpc_id, max_length } => {
624                let (dispatcher, state) = state_context.into_dispatcher_and_state();
625
626                let best_chain = state
627                    .transition_frontier
628                    .best_chain
629                    .iter()
630                    .rev()
631                    .take(*max_length as usize)
632                    .cloned()
633                    .rev()
634                    .collect();
635
636                dispatcher.push(RpcEffectfulAction::BestChain {
637                    rpc_id: *rpc_id,
638                    best_chain,
639                });
640            }
641            RpcAction::ConsensusConstantsGet { rpc_id } => {
642                let (dispatcher, state) = state_context.into_dispatcher_and_state();
643                let response = state.config.consensus_constants.clone();
644                dispatcher.push(RpcEffectfulAction::ConsensusConstantsGet {
645                    rpc_id: *rpc_id,
646                    response,
647                });
648            }
649            RpcAction::TransactionStatusGet { rpc_id, tx } => {
650                let dispatcher = state_context.into_dispatcher();
651                dispatcher.push(RpcEffectfulAction::TransactionStatusGet {
652                    rpc_id: *rpc_id,
653                    tx: tx.clone(),
654                });
655            }
656            RpcAction::BlockGet { rpc_id, query } => {
657                let (dispatcher, state) = state_context.into_dispatcher_and_state();
658
659                let find_fn = |block: &&AppliedBlock| match query {
660                    GetBlockQuery::Hash(hash) => block.hash() == hash,
661                    GetBlockQuery::Height(height) => block.height() == *height,
662                };
663
664                let block = state
665                    .transition_frontier
666                    .best_chain
667                    .iter()
668                    .find(find_fn)
669                    .cloned();
670
671                dispatcher.push(RpcEffectfulAction::BlockGet {
672                    rpc_id: *rpc_id,
673                    block,
674                });
675            }
676            RpcAction::P2pConnectionIncomingAnswerReady {
677                rpc_id,
678                answer,
679                peer_id,
680            } => {
681                let dispatcher = state_context.into_dispatcher();
682                dispatcher.push(RpcAction::P2pConnectionIncomingRespond {
683                    rpc_id: *rpc_id,
684                    response: answer.clone(),
685                });
686                dispatcher
687                    .push(P2pConnectionIncomingAction::AnswerSendSuccess { peer_id: *peer_id });
688            }
689            RpcAction::PooledUserCommands { rpc_id, query } => {
690                let (dispatcher, state) = state_context.into_dispatcher_and_state();
691
692                let PooledCommandsQuery {
693                    public_key,
694                    hashes,
695                    ids,
696                } = query;
697
698                let all_transactions = state.transaction_pool.get_all_transactions();
699
700                let mut user_commands: Vec<_> = all_transactions
701                    .into_iter()
702                    .filter_map(|tx| match tx.data {
703                        valid::UserCommand::SignedCommand(signed_command) => Some((
704                            tx.hash,
705                            MinaBaseSignedCommandStableV2::from(*signed_command),
706                        )),
707                        valid::UserCommand::ZkAppCommand(_) => None,
708                    })
709                    .collect();
710
711                if let Some(pk) = public_key {
712                    let pk = NonZeroCurvePoint::from(pk.clone());
713                    user_commands.retain(|(_, tx)| tx.signer == pk)
714                }
715
716                if let Some(hashes) = hashes {
717                    user_commands.retain(|(hash, _)| hashes.contains(hash))
718                }
719
720                if let Some(ids) = ids {
721                    user_commands.retain(|(_, tx)| ids.contains(tx))
722                }
723
724                dispatcher.push(RpcEffectfulAction::PooledUserCommands {
725                    rpc_id: *rpc_id,
726                    user_commands: user_commands.into_iter().map(|(_, tx)| tx).collect(),
727                });
728            }
729            RpcAction::GenesisBlock { rpc_id } => {
730                let (dispatcher, state) = state_context.into_dispatcher_and_state();
731                let genesis_block = state.genesis_block();
732                dispatcher.push(RpcEffectfulAction::GenesisBlock {
733                    rpc_id: *rpc_id,
734                    genesis_block,
735                });
736            }
737            RpcAction::PooledZkappCommands { rpc_id, query } => {
738                let (dispatcher, state) = state_context.into_dispatcher_and_state();
739
740                let PooledCommandsQuery {
741                    public_key,
742                    hashes,
743                    ids,
744                } = query;
745
746                let all_transactions = state.transaction_pool.get_all_transactions();
747
748                let mut zkapp_commands: Vec<_> = all_transactions
749                    .into_iter()
750                    .filter_map(|tx| match tx.data {
751                        valid::UserCommand::SignedCommand(_) => None,
752                        valid::UserCommand::ZkAppCommand(zkapp) => Some((
753                            tx.hash,
754                            MinaBaseZkappCommandTStableV1WireStableV1::from(&zkapp.zkapp_command),
755                        )),
756                    })
757                    .collect();
758
759                if let Some(pk) = public_key {
760                    let pk = NonZeroCurvePoint::from(pk.clone());
761                    zkapp_commands.retain(|(_, tx)| tx.fee_payer.body.public_key == pk);
762                }
763
764                if let Some(hashes) = hashes {
765                    zkapp_commands.retain(|(hash, _)| hashes.contains(hash));
766                }
767
768                if let Some(ids) = ids {
769                    zkapp_commands.retain(|(_, tx)| ids.contains(tx));
770                }
771
772                dispatcher.push(RpcEffectfulAction::PooledZkappCommands {
773                    rpc_id: *rpc_id,
774                    zkapp_commands: zkapp_commands.into_iter().map(|(_, tx)| tx).collect(),
775                });
776            }
777            RpcAction::ConsensusTimeGet { rpc_id, query } => {
778                let (dispatcher, state) = state_context.into_dispatcher_and_state();
779                let consensus_time = match query {
780                    ConsensusTimeQuery::Now => state.consensus_time_now(),
781                    ConsensusTimeQuery::BestTip => state.consensus_time_best_tip(),
782                };
783                println!("consensus_time: {:?}", consensus_time);
784                dispatcher.push(RpcEffectfulAction::ConsensusTimeGet {
785                    rpc_id: *rpc_id,
786                    consensus_time,
787                });
788            }
789            RpcAction::LedgerStatusGetInit {
790                rpc_id,
791                ledger_hash,
792            } => {
793                let rpc_state = RpcRequestState {
794                    req: RpcRequest::LedgerStatusGet(ledger_hash.clone()),
795                    status: RpcRequestStatus::Init { time: meta.time() },
796                    data: Default::default(),
797                };
798                state.requests.insert(*rpc_id, rpc_state);
799
800                let (dispatcher, state) = state_context.into_dispatcher_and_state();
801                let ledger_hash = if let Some(best_tip) = state.transition_frontier.best_tip() {
802                    best_tip.merkle_root_hash()
803                } else {
804                    return;
805                };
806
807                dispatcher.push(LedgerReadAction::Init {
808                    request: LedgerReadRequest::GetLedgerStatus(*rpc_id, ledger_hash.clone()),
809                    callback: LedgerReadInitCallback::RpcLedgerStatusGetPending {
810                        callback: redux::callback!(
811                            on_ledger_read_init_rpc_actions_get_init(rpc_id: RequestId<RpcIdType>) -> crate::Action{
812                                RpcAction::LedgerStatusGetPending { rpc_id }
813                            }
814                        ),
815                        args: *rpc_id,
816                    },
817                })
818            }
819            RpcAction::LedgerStatusGetPending { rpc_id } => {
820                let Some(rpc) = state.requests.get_mut(rpc_id) else {
821                    return;
822                };
823                rpc.status = RpcRequestStatus::Pending { time: meta.time() };
824            }
825            RpcAction::LedgerStatusGetSuccess { rpc_id, response } => {
826                let Some(rpc) = state.requests.get_mut(rpc_id) else {
827                    return;
828                };
829                rpc.status = RpcRequestStatus::Success { time: meta.time() };
830
831                let dispatcher = state_context.into_dispatcher();
832                dispatcher.push(RpcEffectfulAction::LedgerStatusGetSuccess {
833                    rpc_id: *rpc_id,
834                    response: response.clone(),
835                });
836            }
837            RpcAction::LedgerAccountDelegatorsGetInit {
838                rpc_id,
839                ledger_hash,
840                account_id,
841            } => {
842                let rpc_state = RpcRequestState {
843                    req: RpcRequest::LedgerAccountDelegatorsGet(
844                        ledger_hash.clone(),
845                        account_id.clone(),
846                    ),
847                    status: RpcRequestStatus::Init { time: meta.time() },
848                    data: Default::default(),
849                };
850                state.requests.insert(*rpc_id, rpc_state);
851
852                let dispatcher = state_context.into_dispatcher();
853
854                dispatcher.push(LedgerReadAction::Init {
855                    request: LedgerReadRequest::GetAccountDelegators(*rpc_id, ledger_hash.clone(), account_id.clone()),
856                    callback: LedgerReadInitCallback::RpcLedgerAccountDelegatorsGetPending {
857                        callback: redux::callback!(
858                            on_ledger_read_init_rpc_actions_get_init(rpc_id: RequestId<RpcIdType>) -> crate::Action{
859                                RpcAction::LedgerAccountDelegatorsGetPending { rpc_id }
860                            }
861                        ),
862                        args: *rpc_id,
863                    },
864                })
865            }
866            RpcAction::LedgerAccountDelegatorsGetPending { rpc_id } => {
867                let Some(rpc) = state.requests.get_mut(rpc_id) else {
868                    return;
869                };
870                rpc.status = RpcRequestStatus::Pending { time: meta.time() };
871            }
872            RpcAction::LedgerAccountDelegatorsGetSuccess { rpc_id, response } => {
873                let Some(rpc) = state.requests.get_mut(rpc_id) else {
874                    return;
875                };
876                rpc.status = RpcRequestStatus::Success { time: meta.time() };
877
878                let dispatcher = state_context.into_dispatcher();
879                dispatcher.push(RpcEffectfulAction::LedgerAccountDelegatorsGetSuccess {
880                    rpc_id: *rpc_id,
881                    response: response.clone(),
882                });
883            }
884        }
885    }
886}
887
888pub fn collect_rpc_peers_info(state: &crate::State) -> Vec<RpcPeerInfo> {
889    state.p2p.ready().map_or_else(Vec::new, |p2p| {
890        p2p.peers
891            .iter()
892            .map(|(peer_id, state)| {
893                let best_tip = state.status.as_ready().and_then(|r| r.best_tip.as_ref());
894                let (connection_status, time, incoming, connecting_details) = match &state.status {
895                    p2p::P2pPeerStatus::Connecting(c) => match c {
896                        p2p::connection::P2pConnectionState::Outgoing(o) => (
897                            PeerConnectionStatus::Connecting,
898                            o.time().into(),
899                            false,
900                            Some(format!("{o:?}")),
901                        ),
902                        p2p::connection::P2pConnectionState::Incoming(i) => (
903                            PeerConnectionStatus::Connecting,
904                            i.time().into(),
905                            true,
906                            Some(format!("{i:?}")),
907                        ),
908                    },
909                    p2p::P2pPeerStatus::Disconnecting { time } => (
910                        PeerConnectionStatus::Disconnecting,
911                        (*time).into(),
912                        false,
913                        None,
914                    ),
915                    p2p::P2pPeerStatus::Disconnected { time } => (
916                        PeerConnectionStatus::Disconnected,
917                        (*time).into(),
918                        false,
919                        None,
920                    ),
921                    p2p::P2pPeerStatus::Ready(r) => (
922                        PeerConnectionStatus::Connected,
923                        r.connected_since.into(),
924                        r.is_incoming,
925                        None,
926                    ),
927                };
928                RpcPeerInfo {
929                    peer_id: *peer_id,
930                    connection_status,
931                    connecting_details,
932                    address: state.dial_opts.as_ref().map(|opts| opts.to_string()),
933                    is_libp2p: state.is_libp2p,
934                    incoming,
935                    best_tip: best_tip.map(|bt| bt.hash.clone()),
936                    best_tip_height: best_tip.map(|bt| bt.height()),
937                    best_tip_global_slot: best_tip.map(|bt| bt.global_slot_since_genesis()),
938                    best_tip_timestamp: best_tip.map(|bt| bt.timestamp().into()),
939                    time,
940                }
941            })
942            .collect()
943    })
944}