node/
state.rs

1use std::{sync::Arc, time::Duration};
2
3use malloc_size_of_derive::MallocSizeOf;
4use mina_core::{
5    block::prevalidate::{prevalidate_block, BlockPrevalidationError},
6    consensus::ConsensusTime,
7    transaction::{TransactionInfo, TransactionWithHash},
8};
9use mina_p2p_messages::v2;
10use p2p::P2pNetworkPubsubMessageCacheId;
11use rand::prelude::*;
12
13use mina_core::{
14    block::{ArcBlockWithHash, BlockWithHash},
15    consensus::ConsensusConstants,
16    constants::constraint_constants,
17    requests::RpcId,
18    snark::{Snark, SnarkInfo, SnarkJobCommitment},
19    ChainId,
20};
21use p2p::{
22    bootstrap::P2pNetworkKadBootstrapState,
23    channels::{
24        rpc::{P2pRpcId, P2pRpcRequest, P2pRpcResponse},
25        streaming_rpc::P2pStreamingRpcResponseFull,
26    },
27    connection::{outgoing::P2pConnectionOutgoingError, P2pConnectionResponse},
28    network::identify::P2pNetworkIdentifyState,
29    P2pCallbacks, P2pConfig, P2pNetworkSchedulerState, P2pPeerState, P2pPeerStatusReady, PeerId,
30};
31use redux::{ActionMeta, EnablingCondition, Timestamp};
32use serde::{Deserialize, Serialize};
33use snark::{
34    block_verify::SnarkBlockVerifyState, user_command_verify::SnarkUserCommandVerifyState,
35    work_verify::SnarkWorkVerifyState,
36};
37
38use crate::{
39    block_producer::vrf_evaluator::BlockProducerVrfEvaluatorState,
40    config::GlobalConfig,
41    external_snark_worker::{ExternalSnarkWorker, ExternalSnarkWorkers},
42    ledger::{read::LedgerReadState, write::LedgerWriteState},
43    p2p::callbacks::P2pCallbacksAction,
44    snark_pool::candidate::SnarkPoolCandidateAction,
45    transaction_pool::{
46        candidate::{TransactionPoolCandidateAction, TransactionPoolCandidatesState},
47        TransactionPoolState,
48    },
49    transition_frontier::{
50        candidate::TransitionFrontierCandidateAction,
51        genesis::TransitionFrontierGenesisState,
52        sync::{
53            ledger::{
54                snarked::TransitionFrontierSyncLedgerSnarkedState,
55                staged::TransitionFrontierSyncLedgerStagedState, TransitionFrontierSyncLedgerState,
56            },
57            TransitionFrontierSyncState,
58        },
59    },
60    ActionWithMeta, RpcAction, SnarkPoolAction,
61};
62pub use crate::{
63    block_producer::BlockProducerState,
64    ledger::LedgerState,
65    p2p::P2pState,
66    rpc::RpcState,
67    snark::SnarkState,
68    snark_pool::{candidate::SnarkPoolCandidatesState, SnarkPoolState},
69    transition_frontier::{candidate::TransitionFrontierCandidatesState, TransitionFrontierState},
70    watched_accounts::WatchedAccountsState,
71    Config,
72};
73
74#[derive(Serialize, Deserialize, Debug, Clone)]
75pub struct State {
76    pub config: GlobalConfig,
77
78    pub p2p: P2p,
79    pub ledger: LedgerState,
80    pub snark: SnarkState,
81    pub transition_frontier: TransitionFrontierState,
82    pub snark_pool: SnarkPoolState,
83    pub external_snark_worker: ExternalSnarkWorkers,
84    pub transaction_pool: TransactionPoolState,
85    pub block_producer: BlockProducerState,
86    pub rpc: RpcState,
87
88    pub watched_accounts: WatchedAccountsState,
89
90    // TODO(binier): include action kind in `last_action`.
91    last_action: ActionMeta,
92    applied_actions_count: u64,
93}
94
95// Substate accessors that will be used in reducers
96use mina_core::{bug_condition, impl_substate_access, SubstateAccess};
97
98impl_substate_access!(State, SnarkState, snark);
99impl_substate_access!(State, SnarkBlockVerifyState, snark.block_verify);
100impl_substate_access!(State, SnarkWorkVerifyState, snark.work_verify);
101impl_substate_access!(
102    State,
103    SnarkUserCommandVerifyState,
104    snark.user_command_verify
105);
106impl_substate_access!(State, TransitionFrontierState, transition_frontier);
107impl_substate_access!(
108    State,
109    TransitionFrontierCandidatesState,
110    transition_frontier.candidates
111);
112impl_substate_access!(State, TransactionPoolState, transaction_pool);
113impl_substate_access!(
114    State,
115    TransactionPoolCandidatesState,
116    transaction_pool.candidates
117);
118impl_substate_access!(
119    State,
120    TransitionFrontierGenesisState,
121    transition_frontier.genesis
122);
123impl_substate_access!(State, TransitionFrontierSyncState, transition_frontier.sync);
124impl_substate_access!(State, SnarkPoolState, snark_pool);
125impl_substate_access!(State, SnarkPoolCandidatesState, snark_pool.candidates);
126impl_substate_access!(State, ExternalSnarkWorkers, external_snark_worker);
127impl_substate_access!(State, BlockProducerState, block_producer);
128impl_substate_access!(State, RpcState, rpc);
129impl_substate_access!(State, WatchedAccountsState, watched_accounts);
130impl_substate_access!(State, ExternalSnarkWorker, external_snark_worker.0);
131impl_substate_access!(State, LedgerState, ledger);
132impl_substate_access!(State, LedgerReadState, ledger.read);
133impl_substate_access!(State, LedgerWriteState, ledger.write);
134
135impl mina_core::SubstateAccess<P2pState> for State {
136    fn substate(&self) -> mina_core::SubstateResult<&P2pState> {
137        self.p2p
138            .ready()
139            .ok_or_else(|| "P2P state unavailable. P2P layer is not ready".to_owned())
140    }
141
142    fn substate_mut(&mut self) -> mina_core::SubstateResult<&mut P2pState> {
143        self.p2p
144            .ready_mut()
145            .ok_or_else(|| "P2P state unavailable. P2P layer is not ready".to_owned())
146    }
147}
148
149impl mina_core::SubstateAccess<TransitionFrontierSyncLedgerState> for State {
150    fn substate(&self) -> mina_core::SubstateResult<&TransitionFrontierSyncLedgerState> {
151        self.transition_frontier
152            .sync
153            .ledger()
154            .ok_or_else(|| "Ledger sync state unavailable".to_owned())
155    }
156
157    fn substate_mut(
158        &mut self,
159    ) -> mina_core::SubstateResult<&mut TransitionFrontierSyncLedgerState> {
160        self.transition_frontier
161            .sync
162            .ledger_mut()
163            .ok_or_else(|| "Ledger sync state unavailable".to_owned())
164    }
165}
166
167impl SubstateAccess<BlockProducerVrfEvaluatorState> for State {
168    fn substate(&self) -> mina_core::SubstateResult<&BlockProducerVrfEvaluatorState> {
169        self.block_producer
170            .as_ref()
171            .map(|state| &state.vrf_evaluator)
172            .ok_or_else(|| "Block producer VRF evaluator state unavailable".to_owned())
173    }
174
175    fn substate_mut(&mut self) -> mina_core::SubstateResult<&mut BlockProducerVrfEvaluatorState> {
176        self.block_producer
177            .as_mut()
178            .map(|state| &mut state.vrf_evaluator)
179            .ok_or_else(|| "Block producer VRF evaluator state unavailable".to_owned())
180    }
181}
182
183impl mina_core::SubstateAccess<TransitionFrontierSyncLedgerSnarkedState> for State {
184    fn substate(&self) -> mina_core::SubstateResult<&TransitionFrontierSyncLedgerSnarkedState> {
185        self.transition_frontier
186            .sync
187            .ledger()
188            .ok_or_else(|| {
189                "Snarked ledger state unavailable. Ledger sync state unavailable".to_owned()
190            })?
191            .snarked()
192            .ok_or_else(|| "Snarked ledger state unavailable".to_owned())
193    }
194
195    fn substate_mut(
196        &mut self,
197    ) -> mina_core::SubstateResult<&mut TransitionFrontierSyncLedgerSnarkedState> {
198        self.transition_frontier
199            .sync
200            .ledger_mut()
201            .ok_or_else(|| {
202                "Snarked ledger state unavailable. Ledger sync state unavailable".to_owned()
203            })?
204            .snarked_mut()
205            .ok_or_else(|| "Snarked ledger state unavailable".to_owned())
206    }
207}
208
209impl mina_core::SubstateAccess<TransitionFrontierSyncLedgerStagedState> for State {
210    fn substate(&self) -> mina_core::SubstateResult<&TransitionFrontierSyncLedgerStagedState> {
211        self.transition_frontier
212            .sync
213            .ledger()
214            .ok_or_else(|| {
215                "Staged ledger state unavailable. Ledger sync state unavailable".to_owned()
216            })?
217            .staged()
218            .ok_or_else(|| "Staged ledger state unavailable".to_owned())
219    }
220
221    fn substate_mut(
222        &mut self,
223    ) -> mina_core::SubstateResult<&mut TransitionFrontierSyncLedgerStagedState> {
224        self.transition_frontier
225            .sync
226            .ledger_mut()
227            .ok_or_else(|| {
228                "Staged ledger state unavailable. Ledger sync state unavailable".to_owned()
229            })?
230            .staged_mut()
231            .ok_or_else(|| "Staged ledger state unavailable".to_owned())
232    }
233}
234
235impl SubstateAccess<State> for State {
236    fn substate(&self) -> mina_core::SubstateResult<&State> {
237        Ok(self)
238    }
239
240    fn substate_mut(&mut self) -> mina_core::SubstateResult<&mut State> {
241        Ok(self)
242    }
243}
244
245macro_rules! impl_p2p_state_access {
246    ($state:ty, $substate_type:ty) => {
247        impl mina_core::SubstateAccess<$substate_type> for $state {
248            fn substate(&self) -> mina_core::SubstateResult<&$substate_type> {
249                let substate: &P2pState = self.substate()?;
250                substate.substate()
251            }
252
253            fn substate_mut(&mut self) -> mina_core::SubstateResult<&mut $substate_type> {
254                let substate: &mut P2pState = self.substate_mut()?;
255                substate.substate_mut()
256            }
257        }
258    };
259}
260
261impl_p2p_state_access!(State, P2pNetworkIdentifyState);
262impl_p2p_state_access!(State, p2p::P2pNetworkState);
263impl_p2p_state_access!(State, P2pNetworkKadBootstrapState);
264impl_p2p_state_access!(State, p2p::P2pNetworkKadState);
265impl_p2p_state_access!(State, P2pNetworkSchedulerState);
266impl_p2p_state_access!(State, p2p::P2pLimits);
267impl_p2p_state_access!(State, p2p::P2pNetworkPubsubState);
268impl_p2p_state_access!(State, p2p::P2pConfig);
269
270impl p2p::P2pStateTrait for State {}
271
272pub type Substate<'a, S> = mina_core::Substate<'a, crate::Action, State, S>;
273
274impl State {
275    pub fn new(config: Config, constants: &ConsensusConstants, now: Timestamp) -> Self {
276        Self {
277            p2p: P2p::Pending(config.p2p),
278            ledger: LedgerState::new(config.ledger),
279            snark_pool: SnarkPoolState::new(),
280            snark: SnarkState::new(config.snark),
281            transition_frontier: TransitionFrontierState::new(
282                config.transition_frontier,
283                config.archive.is_some(),
284            ),
285            external_snark_worker: ExternalSnarkWorkers::new(now),
286            block_producer: BlockProducerState::new(now, config.block_producer),
287            rpc: RpcState::new(),
288            transaction_pool: TransactionPoolState::new(config.tx_pool, constants),
289
290            watched_accounts: WatchedAccountsState::new(),
291
292            config: config.global,
293            last_action: ActionMeta::zero_custom(now),
294            applied_actions_count: 0,
295        }
296    }
297
298    pub fn last_action(&self) -> &ActionMeta {
299        &self.last_action
300    }
301
302    /// Latest time observed by the state machine.
303    ///
304    /// Only updated when action is dispatched and reducer is executed.
305    #[inline(always)]
306    pub fn time(&self) -> Timestamp {
307        self.last_action.time()
308    }
309
310    pub fn pseudo_rng(&self) -> StdRng {
311        crate::core::pseudo_rng(self.time())
312    }
313
314    /// Must be called in the global reducer as the last thing only once
315    /// and only there!
316    pub fn action_applied(&mut self, action: &ActionWithMeta) {
317        self.last_action = action.meta().clone();
318        self.applied_actions_count = self.applied_actions_count.checked_add(1).expect("overflow");
319    }
320
321    pub fn genesis_block(&self) -> Option<ArcBlockWithHash> {
322        self.transition_frontier
323            .genesis
324            .block_with_real_or_dummy_proof()
325    }
326
327    fn cur_slot(&self, initial_slot: impl FnOnce(&ArcBlockWithHash) -> u32) -> Option<u32> {
328        let genesis = self.genesis_block()?;
329        let diff_ns = u64::from(self.time()).saturating_sub(u64::from(genesis.timestamp()));
330        let diff_ms = diff_ns / 1_000_000;
331        let slots = diff_ms
332            .checked_div(constraint_constants().block_window_duration_ms)
333            .expect("division by 0");
334        Some(
335            initial_slot(&genesis)
336                .checked_add(slots as u32)
337                .expect("overflow"),
338        )
339    }
340
341    /// Current global slot based on constants and current time.
342    ///
343    /// It's not equal to global slot of the best tip.
344    pub fn cur_global_slot(&self) -> Option<u32> {
345        self.cur_slot(|b| b.global_slot())
346    }
347
348    pub fn current_slot(&self) -> Option<u32> {
349        let slots_per_epoch = self.genesis_block()?.constants().slots_per_epoch.as_u32();
350        Some(
351            self.cur_global_slot()?
352                .checked_rem(slots_per_epoch)
353                .expect("division by 0"),
354        )
355    }
356
357    pub fn cur_global_slot_since_genesis(&self) -> Option<u32> {
358        self.cur_slot(|b| b.global_slot_since_genesis())
359    }
360
361    pub fn current_epoch(&self) -> Option<u32> {
362        let slots_per_epoch = self.genesis_block()?.constants().slots_per_epoch.as_u32();
363        Some(
364            self.cur_global_slot()?
365                .checked_div(slots_per_epoch)
366                .expect("division by 0"),
367        )
368    }
369
370    pub fn slot_time(&self, global_slot: u64) -> Option<(Timestamp, Timestamp)> {
371        let genesis_timestamp = self.genesis_block()?.genesis_timestamp();
372        println!("genesis_timestamp: {}", u64::from(genesis_timestamp));
373
374        let start_time = genesis_timestamp.checked_add(
375            global_slot
376                .checked_mul(constraint_constants().block_window_duration_ms)?
377                .checked_mul(1_000_000)?,
378        )?;
379        let end_time = start_time.checked_add(
380            constraint_constants()
381                .block_window_duration_ms
382                .checked_mul(1_000_000)?,
383        )?;
384
385        Some((start_time, end_time))
386    }
387
388    pub fn producing_block_after_genesis(&self) -> bool {
389        #[allow(clippy::arithmetic_side_effects)]
390        let two_mins_in_future = self.time() + Duration::from_secs(2 * 60);
391        self.block_producer.with(false, |bp| {
392            bp.current.won_slot_should_produce(two_mins_in_future)
393        }) && self.genesis_block().is_some_and(|b| {
394            let slot = &b.consensus_state().curr_global_slot_since_hard_fork;
395            let epoch = slot
396                .slot_number
397                .as_u32()
398                .checked_div(slot.slots_per_epoch.as_u32())
399                .expect("division by 0");
400            self.current_epoch() <= Some(epoch)
401        })
402    }
403
404    pub fn prevalidate_block(
405        &self,
406        block: &ArcBlockWithHash,
407        allow_block_too_late: bool,
408    ) -> Result<(), BlockPrevalidationError> {
409        let Some((genesis, cur_global_slot)) =
410            None.or_else(|| Some((self.genesis_block()?, self.cur_global_slot()?)))
411        else {
412            // we don't have genesis block. This should be impossible
413            // because we don't even know chain_id before we have genesis
414            // block, so we can't be connected to any peers from which
415            // we would receive a block.
416            bug_condition!("Tried to prevalidate a block before the genesis block was ready");
417            return Err(BlockPrevalidationError::GenesisNotReady);
418        };
419
420        prevalidate_block(block, &genesis, cur_global_slot, allow_block_too_late)
421    }
422
423    pub fn should_log_node_id(&self) -> bool {
424        self.config.testing_run
425    }
426
427    pub fn consensus_time_now(&self) -> Option<ConsensusTime> {
428        let (start_time, end_time) = self.slot_time(self.cur_global_slot()?.into())?;
429        let epoch = self.current_epoch()?;
430        let global_slot = self.cur_global_slot()?;
431        let slot = self.current_slot()?;
432        Some(ConsensusTime {
433            start_time,
434            end_time,
435            epoch,
436            global_slot,
437            slot,
438        })
439    }
440
441    pub fn consensus_time_best_tip(&self) -> Option<ConsensusTime> {
442        let best_tip = self.transition_frontier.best_tip()?;
443        let global_slot = best_tip
444            .curr_global_slot_since_hard_fork()
445            .slot_number
446            .as_u32();
447        let (start_time, end_time) = self.slot_time(global_slot.into())?;
448        let epoch = best_tip.consensus_state().epoch_count.as_u32();
449        let slot = best_tip.slot();
450        Some(ConsensusTime {
451            start_time,
452            end_time,
453            epoch,
454            global_slot,
455            slot,
456        })
457    }
458}
459
460#[serde_with::serde_as]
461#[derive(Debug, Clone, Serialize, Deserialize, MallocSizeOf)]
462pub enum P2p {
463    Pending(#[ignore_malloc_size_of = "constant"] P2pConfig),
464    Ready(P2pState),
465}
466
467#[derive(Debug, thiserror::Error)]
468pub enum P2pInitializationError {
469    #[error("p2p is already initialized")]
470    AlreadyInitialized,
471}
472
473#[macro_export]
474macro_rules! p2p_ready {
475    ($p2p:expr, $time:expr) => {
476        p2p_ready!($p2p, "", $time)
477    };
478    ($p2p:expr, $reason:expr, $time:expr) => {
479        match $p2p.ready() {
480            Some(v) => v,
481            None => {
482                //panic!("p2p is not ready: {:?}\nline: {}", $reason, line!());
483                mina_core::error!($time; "p2p is not initialized: {}", $reason);
484                return;
485            }
486        }
487    };
488}
489
490impl P2p {
491    pub fn config(&self) -> &P2pConfig {
492        match self {
493            P2p::Pending(config) => config,
494            P2p::Ready(p2p_state) => &p2p_state.config,
495        }
496    }
497
498    // TODO: add chain id
499    pub fn initialize(&mut self, chain_id: &ChainId) -> Result<(), P2pInitializationError> {
500        let P2p::Pending(config) = self else {
501            return Err(P2pInitializationError::AlreadyInitialized);
502        };
503
504        let callbacks = Self::p2p_callbacks();
505        *self = P2p::Ready(P2pState::new(config.clone(), callbacks, chain_id));
506        Ok(())
507    }
508
509    fn p2p_callbacks() -> P2pCallbacks {
510        P2pCallbacks {
511            on_p2p_channels_transaction_received: Some(redux::callback!(
512                on_p2p_channels_transaction_received((peer_id: PeerId, info: Box<TransactionInfo>)) -> crate::Action {
513                    TransactionPoolCandidateAction::InfoReceived {
514                        peer_id,
515                        info: *info,
516                    }
517                }
518            )),
519            on_p2p_channels_transactions_libp2p_received: Some(redux::callback!(
520                on_p2p_channels_transactions_libp2p_received((peer_id: PeerId, transactions: Vec<TransactionWithHash>, message_id: P2pNetworkPubsubMessageCacheId)) -> crate::Action {
521                    TransactionPoolCandidateAction::Libp2pTransactionsReceived {
522                        message_id,
523                        transactions,
524                        peer_id
525                    }
526                }
527            )),
528            on_p2p_channels_snark_job_commitment_received: Some(redux::callback!(
529                on_p2p_channels_snark_job_commitment_received((peer_id: PeerId, commitment: Box<SnarkJobCommitment>)) -> crate::Action {
530                    SnarkPoolAction::CommitmentAdd { commitment: *commitment, sender: peer_id }
531                }
532            )),
533            on_p2p_channels_snark_received: Some(redux::callback!(
534                on_p2p_channels_snark_received((peer_id: PeerId, snark: Box<SnarkInfo>)) -> crate::Action {
535                    SnarkPoolCandidateAction::InfoReceived { peer_id, info: *snark }
536                }
537            )),
538            on_p2p_channels_snark_libp2p_received: Some(redux::callback!(
539                on_p2p_channels_snark_libp2p_received((peer_id: PeerId, snark: Box<Snark>)) -> crate::Action {
540                    SnarkPoolCandidateAction::WorkFetchSuccess { peer_id, work: *snark }
541                }
542            )),
543            on_p2p_channels_streaming_rpc_ready: Some(redux::callback!(
544                on_p2p_channels_streaming_rpc_ready(_var: ()) -> crate::Action {
545                    P2pCallbacksAction::P2pChannelsStreamingRpcReady
546                }
547            )),
548            on_p2p_channels_best_tip_request_received: Some(redux::callback!(
549                on_p2p_channels_best_tip_request_received(peer_id: PeerId) -> crate::Action {
550                    P2pCallbacksAction::RpcRespondBestTip { peer_id }
551                }
552            )),
553            on_p2p_disconnection_finish: Some(redux::callback!(
554                on_p2p_disconnection_finish(peer_id: PeerId) -> crate::Action {
555                    P2pCallbacksAction::P2pDisconnection { peer_id }
556                }
557            )),
558            on_p2p_connection_outgoing_error: Some(redux::callback!(
559                on_p2p_connection_outgoing_error((rpc_id: RpcId, error: P2pConnectionOutgoingError)) -> crate::Action {
560                    RpcAction::P2pConnectionOutgoingError { rpc_id, error }
561                }
562            )),
563            on_p2p_connection_outgoing_success: Some(redux::callback!(
564                on_p2p_connection_outgoing_success(rpc_id: RpcId) -> crate::Action {
565                    RpcAction::P2pConnectionOutgoingSuccess { rpc_id }
566                }
567            )),
568            on_p2p_connection_incoming_error: Some(redux::callback!(
569                on_p2p_connection_incoming_error((rpc_id: RpcId, error: String)) -> crate::Action {
570                    RpcAction::P2pConnectionIncomingError { rpc_id, error }
571                }
572            )),
573            on_p2p_connection_incoming_success: Some(redux::callback!(
574                on_p2p_connection_incoming_success(rpc_id: RpcId) -> crate::Action {
575                    RpcAction::P2pConnectionIncomingSuccess { rpc_id }
576                }
577            )),
578            on_p2p_connection_incoming_answer_ready: Some(redux::callback!(
579                on_p2p_connection_incoming_answer_ready((rpc_id: RpcId, peer_id: PeerId, answer: P2pConnectionResponse)) -> crate::Action {
580                    RpcAction::P2pConnectionIncomingAnswerReady { rpc_id, answer, peer_id }
581                }
582            )),
583            on_p2p_peer_best_tip_update: Some(redux::callback!(
584                on_p2p_peer_best_tip_update(best_tip: BlockWithHash<Arc<v2::MinaBlockBlockStableV2>>) -> crate::Action {
585                    TransitionFrontierCandidateAction::P2pBestTipUpdate { best_tip }
586                }
587            )),
588            on_p2p_channels_rpc_ready: Some(redux::callback!(
589                on_p2p_channels_rpc_ready(peer_id: PeerId) -> crate::Action {
590                    P2pCallbacksAction::P2pChannelsRpcReady { peer_id }
591                }
592            )),
593            on_p2p_channels_rpc_timeout: Some(redux::callback!(
594                on_p2p_channels_rpc_timeout((peer_id: PeerId, id: P2pRpcId)) -> crate::Action {
595                    P2pCallbacksAction::P2pChannelsRpcTimeout { peer_id, id }
596                }
597            )),
598            on_p2p_channels_rpc_response_received: Some(redux::callback!(
599                on_p2p_channels_rpc_response_received((peer_id: PeerId, id: P2pRpcId, response: Option<Box<P2pRpcResponse>>)) -> crate::Action {
600                    P2pCallbacksAction::P2pChannelsRpcResponseReceived { peer_id, id, response }
601                }
602            )),
603            on_p2p_channels_rpc_request_received: Some(redux::callback!(
604                on_p2p_channels_rpc_request_received((peer_id: PeerId, id: P2pRpcId, request: Box<P2pRpcRequest>)) -> crate::Action {
605                    P2pCallbacksAction::P2pChannelsRpcRequestReceived { peer_id, id, request }
606                }
607            )),
608            on_p2p_channels_streaming_rpc_response_received: Some(redux::callback!(
609                on_p2p_channels_streaming_rpc_response_received((peer_id: PeerId, id: P2pRpcId, response: Option<P2pStreamingRpcResponseFull>)) -> crate::Action {
610                    P2pCallbacksAction::P2pChannelsStreamingRpcResponseReceived { peer_id, id, response }
611                }
612            )),
613            on_p2p_channels_streaming_rpc_timeout: Some(redux::callback!(
614                on_p2p_channels_streaming_rpc_timeout((peer_id: PeerId, id: P2pRpcId)) -> crate::Action {
615                    P2pCallbacksAction::P2pChannelsStreamingRpcTimeout { peer_id, id }
616                }
617            )),
618            on_p2p_pubsub_message_received: Some(redux::callback!(
619                on_p2p_pubsub_message_received((message_id: P2pNetworkPubsubMessageCacheId)) -> crate::Action{
620                    P2pCallbacksAction::P2pPubsubValidateMessage { message_id }
621                }
622            )),
623        }
624    }
625
626    pub fn ready(&self) -> Option<&P2pState> {
627        if let P2p::Ready(state) = self {
628            Some(state)
629        } else {
630            None
631        }
632    }
633
634    pub fn ready_mut(&mut self) -> Option<&mut P2pState> {
635        if let P2p::Ready(state) = self {
636            Some(state)
637        } else {
638            None
639        }
640    }
641
642    pub fn unwrap(&self) -> &P2pState {
643        self.ready().expect("p2p is not initialized")
644    }
645
646    pub fn is_enabled<T>(&self, action: &T, time: Timestamp) -> bool
647    where
648        T: EnablingCondition<P2pState>,
649    {
650        match self {
651            P2p::Pending(_) => false,
652            P2p::Ready(p2p_state) => action.is_enabled(p2p_state, time),
653        }
654    }
655
656    pub fn my_id(&self) -> PeerId {
657        match self {
658            P2p::Pending(config) => &config.identity_pub_key,
659            P2p::Ready(state) => &state.config.identity_pub_key,
660        }
661        .peer_id()
662    }
663
664    pub fn get_peer(&self, peer_id: &PeerId) -> Option<&P2pPeerState> {
665        self.ready().and_then(|p2p| p2p.peers.get(peer_id))
666    }
667
668    pub fn get_ready_peer(&self, peer_id: &PeerId) -> Option<&P2pPeerStatusReady> {
669        self.ready().and_then(|p2p| p2p.get_ready_peer(peer_id))
670    }
671
672    pub fn ready_peers(&self) -> Vec<PeerId> {
673        self.ready_peers_iter()
674            .map(|(peer_id, _)| *peer_id)
675            .collect()
676    }
677
678    pub fn ready_peers_iter(&self) -> ReadyPeersIter<'_> {
679        ReadyPeersIter::new(self)
680    }
681}
682
683#[derive(Debug, Clone)]
684pub struct ReadyPeersIter<'a>(Option<std::collections::btree_map::Iter<'a, PeerId, P2pPeerState>>);
685
686impl<'a> ReadyPeersIter<'a> {
687    fn new(p2p: &'a P2p) -> Self {
688        ReadyPeersIter(p2p.ready().map(|p2p| p2p.peers.iter()))
689    }
690}
691
692impl<'a> Iterator for ReadyPeersIter<'a> {
693    type Item = (&'a PeerId, &'a P2pPeerStatusReady);
694
695    fn next(&mut self) -> Option<Self::Item> {
696        let iter = self.0.as_mut()?;
697        Some(loop {
698            let (peer_id, state) = iter.next()?;
699            if let Some(ready) = state.status.as_ready() {
700                break (peer_id, ready);
701            }
702        })
703    }
704}