node/
state.rs

1use std::{sync::Arc, time::Duration};
2
3use malloc_size_of_derive::MallocSizeOf;
4use mina_core::{
5    block::prevalidate::{prevalidate_block, BlockPrevalidationError},
6    consensus::ConsensusTime,
7    transaction::{TransactionInfo, TransactionWithHash},
8};
9use mina_p2p_messages::v2;
10use p2p::P2pNetworkPubsubMessageCacheId;
11use rand::prelude::*;
12
13use mina_core::{
14    block::{ArcBlockWithHash, BlockWithHash},
15    consensus::ConsensusConstants,
16    constants::constraint_constants,
17    error,
18    requests::RpcId,
19    snark::{Snark, SnarkInfo, SnarkJobCommitment},
20    ChainId,
21};
22use p2p::{
23    bootstrap::P2pNetworkKadBootstrapState,
24    channels::{
25        rpc::{P2pRpcId, P2pRpcRequest, P2pRpcResponse},
26        streaming_rpc::P2pStreamingRpcResponseFull,
27    },
28    connection::{outgoing::P2pConnectionOutgoingError, P2pConnectionResponse},
29    network::identify::P2pNetworkIdentifyState,
30    P2pCallbacks, P2pConfig, P2pNetworkSchedulerState, P2pPeerState, P2pPeerStatusReady, PeerId,
31};
32use redux::{ActionMeta, EnablingCondition, Timestamp};
33use serde::{Deserialize, Serialize};
34use snark::{
35    block_verify::SnarkBlockVerifyState, user_command_verify::SnarkUserCommandVerifyState,
36    work_verify::SnarkWorkVerifyState,
37};
38
39use crate::{
40    block_producer::vrf_evaluator::BlockProducerVrfEvaluatorState,
41    config::GlobalConfig,
42    external_snark_worker::{ExternalSnarkWorker, ExternalSnarkWorkers},
43    ledger::{read::LedgerReadState, write::LedgerWriteState},
44    p2p::callbacks::P2pCallbacksAction,
45    snark_pool::candidate::SnarkPoolCandidateAction,
46    transaction_pool::{
47        candidate::{TransactionPoolCandidateAction, TransactionPoolCandidatesState},
48        TransactionPoolState,
49    },
50    transition_frontier::{
51        candidate::TransitionFrontierCandidateAction,
52        genesis::TransitionFrontierGenesisState,
53        sync::{
54            ledger::{
55                snarked::TransitionFrontierSyncLedgerSnarkedState,
56                staged::TransitionFrontierSyncLedgerStagedState, TransitionFrontierSyncLedgerState,
57            },
58            TransitionFrontierSyncState,
59        },
60    },
61    ActionWithMeta, RpcAction, SnarkPoolAction,
62};
63pub use crate::{
64    block_producer::BlockProducerState,
65    ledger::LedgerState,
66    p2p::P2pState,
67    rpc::RpcState,
68    snark::SnarkState,
69    snark_pool::{candidate::SnarkPoolCandidatesState, SnarkPoolState},
70    transition_frontier::{candidate::TransitionFrontierCandidatesState, TransitionFrontierState},
71    watched_accounts::WatchedAccountsState,
72    Config,
73};
74
/// Root state of the node.
///
/// Groups the global configuration with one substate per subsystem, plus
/// bookkeeping about the most recently applied action.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct State {
    /// Global configuration; set from `Config::global` in [`State::new`].
    pub config: GlobalConfig,

    /// P2P layer; created as [`P2p::Pending`] and switched to [`P2p::Ready`]
    /// by [`P2p::initialize`].
    pub p2p: P2p,
    /// Ledger substate.
    pub ledger: LedgerState,
    /// Snark verification substate.
    pub snark: SnarkState,
    /// Transition frontier substate (also holds the genesis and sync substates).
    pub transition_frontier: TransitionFrontierState,
    /// Snark pool substate.
    pub snark_pool: SnarkPoolState,
    /// External snark worker substate.
    pub external_snark_worker: ExternalSnarkWorkers,
    /// Transaction pool substate.
    pub transaction_pool: TransactionPoolState,
    /// Block producer substate.
    pub block_producer: BlockProducerState,
    /// RPC substate.
    pub rpc: RpcState,

    /// Watched accounts substate.
    pub watched_accounts: WatchedAccountsState,

    // TODO(binier): include action kind in `last_action`.
    /// Metadata of the last action passed to [`State::action_applied`].
    last_action: ActionMeta,
    /// Number of applied actions; incremented by [`State::action_applied`].
    applied_actions_count: u64,
}
95
96// Substate accessors that will be used in reducers
97use mina_core::{bug_condition, impl_substate_access, SubstateAccess};
98
// Each invocation names the root type, the substate type and the field path
// on `State` where that substate lives. The macro comes from `mina_core` and
// is expected to generate the matching `SubstateAccess` impl — confirm there.
impl_substate_access!(State, SnarkState, snark);
impl_substate_access!(State, SnarkBlockVerifyState, snark.block_verify);
impl_substate_access!(State, SnarkWorkVerifyState, snark.work_verify);
impl_substate_access!(
    State,
    SnarkUserCommandVerifyState,
    snark.user_command_verify
);
impl_substate_access!(State, TransitionFrontierState, transition_frontier);
impl_substate_access!(
    State,
    TransitionFrontierCandidatesState,
    transition_frontier.candidates
);
impl_substate_access!(State, TransactionPoolState, transaction_pool);
impl_substate_access!(
    State,
    TransactionPoolCandidatesState,
    transaction_pool.candidates
);
impl_substate_access!(
    State,
    TransitionFrontierGenesisState,
    transition_frontier.genesis
);
impl_substate_access!(State, TransitionFrontierSyncState, transition_frontier.sync);
impl_substate_access!(State, SnarkPoolState, snark_pool);
impl_substate_access!(State, SnarkPoolCandidatesState, snark_pool.candidates);
impl_substate_access!(State, ExternalSnarkWorkers, external_snark_worker);
impl_substate_access!(State, BlockProducerState, block_producer);
impl_substate_access!(State, RpcState, rpc);
impl_substate_access!(State, WatchedAccountsState, watched_accounts);
impl_substate_access!(State, ExternalSnarkWorker, external_snark_worker.0);
impl_substate_access!(State, LedgerState, ledger);
impl_substate_access!(State, LedgerReadState, ledger.read);
impl_substate_access!(State, LedgerWriteState, ledger.write);
135
136impl mina_core::SubstateAccess<P2pState> for State {
137    fn substate(&self) -> mina_core::SubstateResult<&P2pState> {
138        self.p2p
139            .ready()
140            .ok_or_else(|| "P2P state unavailable. P2P layer is not ready".to_owned())
141    }
142
143    fn substate_mut(&mut self) -> mina_core::SubstateResult<&mut P2pState> {
144        self.p2p
145            .ready_mut()
146            .ok_or_else(|| "P2P state unavailable. P2P layer is not ready".to_owned())
147    }
148}
149
150impl mina_core::SubstateAccess<TransitionFrontierSyncLedgerState> for State {
151    fn substate(&self) -> mina_core::SubstateResult<&TransitionFrontierSyncLedgerState> {
152        self.transition_frontier
153            .sync
154            .ledger()
155            .ok_or_else(|| "Ledger sync state unavailable".to_owned())
156    }
157
158    fn substate_mut(
159        &mut self,
160    ) -> mina_core::SubstateResult<&mut TransitionFrontierSyncLedgerState> {
161        self.transition_frontier
162            .sync
163            .ledger_mut()
164            .ok_or_else(|| "Ledger sync state unavailable".to_owned())
165    }
166}
167
168impl SubstateAccess<BlockProducerVrfEvaluatorState> for State {
169    fn substate(&self) -> mina_core::SubstateResult<&BlockProducerVrfEvaluatorState> {
170        self.block_producer
171            .as_ref()
172            .map(|state| &state.vrf_evaluator)
173            .ok_or_else(|| "Block producer VRF evaluator state unavailable".to_owned())
174    }
175
176    fn substate_mut(&mut self) -> mina_core::SubstateResult<&mut BlockProducerVrfEvaluatorState> {
177        self.block_producer
178            .as_mut()
179            .map(|state| &mut state.vrf_evaluator)
180            .ok_or_else(|| "Block producer VRF evaluator state unavailable".to_owned())
181    }
182}
183
184impl mina_core::SubstateAccess<TransitionFrontierSyncLedgerSnarkedState> for State {
185    fn substate(&self) -> mina_core::SubstateResult<&TransitionFrontierSyncLedgerSnarkedState> {
186        self.transition_frontier
187            .sync
188            .ledger()
189            .ok_or_else(|| {
190                "Snarked ledger state unavailable. Ledger sync state unavailable".to_owned()
191            })?
192            .snarked()
193            .ok_or_else(|| "Snarked ledger state unavailable".to_owned())
194    }
195
196    fn substate_mut(
197        &mut self,
198    ) -> mina_core::SubstateResult<&mut TransitionFrontierSyncLedgerSnarkedState> {
199        self.transition_frontier
200            .sync
201            .ledger_mut()
202            .ok_or_else(|| {
203                "Snarked ledger state unavailable. Ledger sync state unavailable".to_owned()
204            })?
205            .snarked_mut()
206            .ok_or_else(|| "Snarked ledger state unavailable".to_owned())
207    }
208}
209
210impl mina_core::SubstateAccess<TransitionFrontierSyncLedgerStagedState> for State {
211    fn substate(&self) -> mina_core::SubstateResult<&TransitionFrontierSyncLedgerStagedState> {
212        self.transition_frontier
213            .sync
214            .ledger()
215            .ok_or_else(|| {
216                "Staged ledger state unavailable. Ledger sync state unavailable".to_owned()
217            })?
218            .staged()
219            .ok_or_else(|| "Staged ledger state unavailable".to_owned())
220    }
221
222    fn substate_mut(
223        &mut self,
224    ) -> mina_core::SubstateResult<&mut TransitionFrontierSyncLedgerStagedState> {
225        self.transition_frontier
226            .sync
227            .ledger_mut()
228            .ok_or_else(|| {
229                "Staged ledger state unavailable. Ledger sync state unavailable".to_owned()
230            })?
231            .staged_mut()
232            .ok_or_else(|| "Staged ledger state unavailable".to_owned())
233    }
234}
235
/// Identity access: the root state is trivially a substate of itself.
impl SubstateAccess<State> for State {
    /// Always succeeds, returning `self`.
    fn substate(&self) -> mina_core::SubstateResult<&State> {
        Ok(self)
    }

    /// Always succeeds, returning `self` mutably.
    fn substate_mut(&mut self) -> mina_core::SubstateResult<&mut State> {
        Ok(self)
    }
}
245
/// Implements `SubstateAccess<$target>` for `$root` by going through the
/// P2P state: first resolve `P2pState` from `$root`, then resolve `$target`
/// from `P2pState`. Either step may fail and the error is propagated.
macro_rules! impl_p2p_state_access {
    ($root:ty, $target:ty) => {
        impl mina_core::SubstateAccess<$target> for $root {
            fn substate(&self) -> mina_core::SubstateResult<&$target> {
                // The annotation selects the `P2pState` accessor on the root.
                let p2p: &P2pState = self.substate()?;
                p2p.substate()
            }

            fn substate_mut(&mut self) -> mina_core::SubstateResult<&mut $target> {
                // The annotation selects the `P2pState` accessor on the root.
                let p2p: &mut P2pState = self.substate_mut()?;
                p2p.substate_mut()
            }
        }
    };
}
261
// P2P-internal substates reachable from the root `State` via `P2pState`.
// All of these fail while the P2P layer is not ready, because the
// `P2pState` lookup itself fails in that case.
impl_p2p_state_access!(State, P2pNetworkIdentifyState);
impl_p2p_state_access!(State, p2p::P2pNetworkState);
impl_p2p_state_access!(State, P2pNetworkKadBootstrapState);
impl_p2p_state_access!(State, p2p::P2pNetworkKadState);
impl_p2p_state_access!(State, P2pNetworkSchedulerState);
impl_p2p_state_access!(State, p2p::P2pLimits);
impl_p2p_state_access!(State, p2p::P2pNetworkPubsubState);
impl_p2p_state_access!(State, p2p::P2pConfig);
270
// `State` opts into `p2p::P2pStateTrait` without overriding any items.
// NOTE(review): the trait's bounds are presumably met by the `SubstateAccess`
// impls for the p2p substates — confirm against the trait definition.
impl p2p::P2pStateTrait for State {}
272
/// `mina_core::Substate` specialised to this crate's `Action` type and the
/// root [`State`]; `S` is the substate being accessed.
pub type Substate<'a, S> = mina_core::Substate<'a, crate::Action, State, S>;
274
275impl State {
276    pub fn new(config: Config, constants: &ConsensusConstants, now: Timestamp) -> Self {
277        Self {
278            p2p: P2p::Pending(config.p2p),
279            ledger: LedgerState::new(config.ledger),
280            snark_pool: SnarkPoolState::new(),
281            snark: SnarkState::new(config.snark),
282            transition_frontier: TransitionFrontierState::new(
283                config.transition_frontier,
284                config.archive.is_some(),
285            ),
286            external_snark_worker: ExternalSnarkWorkers::new(now),
287            block_producer: BlockProducerState::new(now, config.block_producer),
288            rpc: RpcState::new(),
289            transaction_pool: TransactionPoolState::new(config.tx_pool, constants),
290
291            watched_accounts: WatchedAccountsState::new(),
292
293            config: config.global,
294            last_action: ActionMeta::zero_custom(now),
295            applied_actions_count: 0,
296        }
297    }
298
    /// Metadata of the most recently applied action (see [`Self::action_applied`]).
    pub fn last_action(&self) -> &ActionMeta {
        &self.last_action
    }
302
    /// Latest time observed by the state machine.
    ///
    /// This is the timestamp carried by the last applied action, so it is only
    /// updated when an action is dispatched and the reducer is executed.
    #[inline(always)]
    pub fn time(&self) -> Timestamp {
        self.last_action.time()
    }
310
    /// Returns the `StdRng` produced by `crate::core::pseudo_rng` for the
    /// current state time ([`Self::time`]).
    ///
    /// NOTE(review): presumably the timestamp acts as the seed, making the
    /// generator reproducible for a given state — confirm in `pseudo_rng`.
    pub fn pseudo_rng(&self) -> StdRng {
        crate::core::pseudo_rng(self.time())
    }
314
315    /// Must be called in the global reducer as the last thing only once
316    /// and only there!
317    pub fn action_applied(&mut self, action: &ActionWithMeta) {
318        self.last_action = action.meta().clone();
319        self.applied_actions_count = self.applied_actions_count.checked_add(1).expect("overflow");
320    }
321
    /// Genesis block as reported by the genesis substate's
    /// `block_with_real_or_dummy_proof`; `None` when that yields nothing.
    pub fn genesis_block(&self) -> Option<ArcBlockWithHash> {
        self.transition_frontier
            .genesis
            .block_with_real_or_dummy_proof()
    }
327
328    fn cur_slot(&self, initial_slot: impl FnOnce(&ArcBlockWithHash) -> u32) -> Option<u32> {
329        let genesis = self.genesis_block()?;
330        let diff_ns = u64::from(self.time()).saturating_sub(u64::from(genesis.timestamp()));
331        let diff_ms = diff_ns / 1_000_000;
332        let slots = diff_ms
333            .checked_div(constraint_constants().block_window_duration_ms)
334            .expect("division by 0");
335        Some(
336            initial_slot(&genesis)
337                .checked_add(slots as u32)
338                .expect("overflow"),
339        )
340    }
341
    /// Current global slot based on constants and current time.
    ///
    /// It's not equal to global slot of the best tip: the value is derived from
    /// the genesis block's `global_slot()` plus the slots elapsed since genesis.
    ///
    /// Returns `None` when the genesis block is not available.
    pub fn cur_global_slot(&self) -> Option<u32> {
        self.cur_slot(|b| b.global_slot())
    }
348
349    pub fn current_slot(&self) -> Option<u32> {
350        let slots_per_epoch = self.genesis_block()?.constants().slots_per_epoch.as_u32();
351        Some(
352            self.cur_global_slot()?
353                .checked_rem(slots_per_epoch)
354                .expect("division by 0"),
355        )
356    }
357
    /// Like [`Self::cur_global_slot`], but starts counting from the genesis
    /// block's `global_slot_since_genesis()` value.
    ///
    /// Returns `None` when the genesis block is not available.
    pub fn cur_global_slot_since_genesis(&self) -> Option<u32> {
        self.cur_slot(|b| b.global_slot_since_genesis())
    }
361
362    pub fn current_epoch(&self) -> Option<u32> {
363        let slots_per_epoch = self.genesis_block()?.constants().slots_per_epoch.as_u32();
364        Some(
365            self.cur_global_slot()?
366                .checked_div(slots_per_epoch)
367                .expect("division by 0"),
368        )
369    }
370
371    pub fn slot_time(&self, global_slot: u64) -> Option<(Timestamp, Timestamp)> {
372        let genesis_timestamp = self.genesis_block()?.genesis_timestamp();
373        println!("genesis_timestamp: {}", u64::from(genesis_timestamp));
374
375        let start_time = genesis_timestamp.checked_add(
376            global_slot
377                .checked_mul(constraint_constants().block_window_duration_ms)?
378                .checked_mul(1_000_000)?,
379        )?;
380        let end_time = start_time.checked_add(
381            constraint_constants()
382                .block_window_duration_ms
383                .checked_mul(1_000_000)?,
384        )?;
385
386        Some((start_time, end_time))
387    }
388
389    pub fn producing_block_after_genesis(&self) -> bool {
390        #[allow(clippy::arithmetic_side_effects)]
391        let two_mins_in_future = self.time() + Duration::from_secs(2 * 60);
392        self.block_producer.with(false, |bp| {
393            bp.current.won_slot_should_produce(two_mins_in_future)
394        }) && self.genesis_block().is_some_and(|b| {
395            let slot = &b.consensus_state().curr_global_slot_since_hard_fork;
396            let epoch = slot
397                .slot_number
398                .as_u32()
399                .checked_div(slot.slots_per_epoch.as_u32())
400                .expect("division by 0");
401            self.current_epoch() <= Some(epoch)
402        })
403    }
404
405    pub fn prevalidate_block(
406        &self,
407        block: &ArcBlockWithHash,
408        allow_block_too_late: bool,
409    ) -> Result<(), BlockPrevalidationError> {
410        let Some((genesis, cur_global_slot)) =
411            None.or_else(|| Some((self.genesis_block()?, self.cur_global_slot()?)))
412        else {
413            // we don't have genesis block. This should be impossible
414            // because we don't even know chain_id before we have genesis
415            // block, so we can't be connected to any peers from which
416            // we would receive a block.
417            bug_condition!("Tried to prevalidate a block before the genesis block was ready");
418            return Err(BlockPrevalidationError::GenesisNotReady);
419        };
420
421        prevalidate_block(block, &genesis, cur_global_slot, allow_block_too_late)
422    }
423
    /// Returns the `testing_run` flag of the global configuration.
    pub fn should_log_node_id(&self) -> bool {
        self.config.testing_run
    }
427
428    pub fn consensus_time_now(&self) -> Option<ConsensusTime> {
429        let (start_time, end_time) = self.slot_time(self.cur_global_slot()?.into())?;
430        let epoch = self.current_epoch()?;
431        let global_slot = self.cur_global_slot()?;
432        let slot = self.current_slot()?;
433        Some(ConsensusTime {
434            start_time,
435            end_time,
436            epoch,
437            global_slot,
438            slot,
439        })
440    }
441
442    pub fn consensus_time_best_tip(&self) -> Option<ConsensusTime> {
443        let best_tip = self.transition_frontier.best_tip()?;
444        let global_slot = best_tip
445            .curr_global_slot_since_hard_fork()
446            .slot_number
447            .as_u32();
448        let (start_time, end_time) = self.slot_time(global_slot.into())?;
449        let epoch = best_tip.consensus_state().epoch_count.as_u32();
450        let slot = best_tip.slot();
451        Some(ConsensusTime {
452            start_time,
453            end_time,
454            epoch,
455            global_slot,
456            slot,
457        })
458    }
459}
460
/// P2P layer state as seen by the node: either not yet initialized
/// (configuration only) or fully initialized.
#[serde_with::serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, MallocSizeOf)]
pub enum P2p {
    /// Not initialized yet; holds the configuration that [`P2p::initialize`]
    /// will use to build the state.
    Pending(#[ignore_malloc_size_of = "constant"] P2pConfig),
    /// Initialized P2P state.
    Ready(P2pState),
}
467
/// Error returned by [`P2p::initialize`].
#[derive(Debug, thiserror::Error)]
pub enum P2pInitializationError {
    /// `initialize` was called on a `P2p` that is already in the `Ready` state.
    #[error("p2p is already initialized")]
    AlreadyInitialized,
}
473
/// Evaluates to the value returned by `$p2p.ready()` when it is `Some`.
///
/// When it is `None`, logs "p2p is not initialized" (with the optional
/// `$reason`) through `mina_core::error!` at `$time` and returns from the
/// enclosing function, which must therefore return `()`.
///
/// Forms:
/// - `p2p_ready!(p2p, time)` — same as passing an empty reason.
/// - `p2p_ready!(p2p, reason, time)`
#[macro_export]
macro_rules! p2p_ready {
    ($p2p:expr, $time:expr) => {
        // `$crate::` keeps the recursive call resolvable when the macro is
        // invoked by path from another crate without being imported there.
        $crate::p2p_ready!($p2p, "", $time)
    };
    ($p2p:expr, $reason:expr, $time:expr) => {
        match $p2p.ready() {
            Some(v) => v,
            None => {
                mina_core::error!($time; "p2p is not initialized: {}", $reason);
                return;
            }
        }
    };
}
490
491impl P2p {
492    pub fn config(&self) -> &P2pConfig {
493        match self {
494            P2p::Pending(config) => config,
495            P2p::Ready(p2p_state) => &p2p_state.config,
496        }
497    }
498
499    // TODO: add chain id
500    pub fn initialize(&mut self, chain_id: &ChainId) -> Result<(), P2pInitializationError> {
501        let P2p::Pending(config) = self else {
502            return Err(P2pInitializationError::AlreadyInitialized);
503        };
504
505        let callbacks = Self::p2p_callbacks();
506        *self = P2p::Ready(P2pState::new(config.clone(), callbacks, chain_id));
507        Ok(())
508    }
509
    /// Builds the callback table handed to `P2pState::new`.
    ///
    /// Every slot is filled with a `redux::callback!` that takes the payload of
    /// a p2p-layer event and turns it into the corresponding node-level
    /// `crate::Action` (transaction pool, snark pool, RPC, transition frontier
    /// or `P2pCallbacksAction`).
    fn p2p_callbacks() -> P2pCallbacks {
        P2pCallbacks {
            on_p2p_channels_transaction_received: Some(redux::callback!(
                on_p2p_channels_transaction_received((peer_id: PeerId, info: Box<TransactionInfo>)) -> crate::Action {
                    TransactionPoolCandidateAction::InfoReceived {
                        peer_id,
                        info: *info,
                    }
                }
            )),
            on_p2p_channels_transactions_libp2p_received: Some(redux::callback!(
                on_p2p_channels_transactions_libp2p_received((peer_id: PeerId, transactions: Vec<TransactionWithHash>, message_id: P2pNetworkPubsubMessageCacheId)) -> crate::Action {
                    TransactionPoolCandidateAction::Libp2pTransactionsReceived {
                        message_id,
                        transactions,
                        peer_id
                    }
                }
            )),
            on_p2p_channels_snark_job_commitment_received: Some(redux::callback!(
                on_p2p_channels_snark_job_commitment_received((peer_id: PeerId, commitment: Box<SnarkJobCommitment>)) -> crate::Action {
                    SnarkPoolAction::CommitmentAdd { commitment: *commitment, sender: peer_id }
                }
            )),
            on_p2p_channels_snark_received: Some(redux::callback!(
                on_p2p_channels_snark_received((peer_id: PeerId, snark: Box<SnarkInfo>)) -> crate::Action {
                    SnarkPoolCandidateAction::InfoReceived { peer_id, info: *snark }
                }
            )),
            on_p2p_channels_snark_libp2p_received: Some(redux::callback!(
                on_p2p_channels_snark_libp2p_received((peer_id: PeerId, snark: Box<Snark>)) -> crate::Action {
                    SnarkPoolCandidateAction::WorkFetchSuccess { peer_id, work: *snark }
                }
            )),
            on_p2p_channels_streaming_rpc_ready: Some(redux::callback!(
                on_p2p_channels_streaming_rpc_ready(_var: ()) -> crate::Action {
                    P2pCallbacksAction::P2pChannelsStreamingRpcReady
                }
            )),
            on_p2p_channels_best_tip_request_received: Some(redux::callback!(
                on_p2p_channels_best_tip_request_received(peer_id: PeerId) -> crate::Action {
                    P2pCallbacksAction::RpcRespondBestTip { peer_id }
                }
            )),
            on_p2p_disconnection_finish: Some(redux::callback!(
                on_p2p_disconnection_finish(peer_id: PeerId) -> crate::Action {
                    P2pCallbacksAction::P2pDisconnection { peer_id }
                }
            )),
            on_p2p_connection_outgoing_error: Some(redux::callback!(
                on_p2p_connection_outgoing_error((rpc_id: RpcId, error: P2pConnectionOutgoingError)) -> crate::Action {
                    RpcAction::P2pConnectionOutgoingError { rpc_id, error }
                }
            )),
            on_p2p_connection_outgoing_success: Some(redux::callback!(
                on_p2p_connection_outgoing_success(rpc_id: RpcId) -> crate::Action {
                    RpcAction::P2pConnectionOutgoingSuccess { rpc_id }
                }
            )),
            on_p2p_connection_incoming_error: Some(redux::callback!(
                on_p2p_connection_incoming_error((rpc_id: RpcId, error: String)) -> crate::Action {
                    RpcAction::P2pConnectionIncomingError { rpc_id, error }
                }
            )),
            on_p2p_connection_incoming_success: Some(redux::callback!(
                on_p2p_connection_incoming_success(rpc_id: RpcId) -> crate::Action {
                    RpcAction::P2pConnectionIncomingSuccess { rpc_id }
                }
            )),
            on_p2p_connection_incoming_answer_ready: Some(redux::callback!(
                on_p2p_connection_incoming_answer_ready((rpc_id: RpcId, peer_id: PeerId, answer: P2pConnectionResponse)) -> crate::Action {
                    RpcAction::P2pConnectionIncomingAnswerReady { rpc_id, answer, peer_id }
                }
            )),
            on_p2p_peer_best_tip_update: Some(redux::callback!(
                on_p2p_peer_best_tip_update(best_tip: BlockWithHash<Arc<v2::MinaBlockBlockStableV2>>) -> crate::Action {
                    TransitionFrontierCandidateAction::P2pBestTipUpdate { best_tip }
                }
            )),
            on_p2p_channels_rpc_ready: Some(redux::callback!(
                on_p2p_channels_rpc_ready(peer_id: PeerId) -> crate::Action {
                    P2pCallbacksAction::P2pChannelsRpcReady { peer_id }
                }
            )),
            on_p2p_channels_rpc_timeout: Some(redux::callback!(
                on_p2p_channels_rpc_timeout((peer_id: PeerId, id: P2pRpcId)) -> crate::Action {
                    P2pCallbacksAction::P2pChannelsRpcTimeout { peer_id, id }
                }
            )),
            on_p2p_channels_rpc_response_received: Some(redux::callback!(
                on_p2p_channels_rpc_response_received((peer_id: PeerId, id: P2pRpcId, response: Option<Box<P2pRpcResponse>>)) -> crate::Action {
                    P2pCallbacksAction::P2pChannelsRpcResponseReceived { peer_id, id, response }
                }
            )),
            on_p2p_channels_rpc_request_received: Some(redux::callback!(
                on_p2p_channels_rpc_request_received((peer_id: PeerId, id: P2pRpcId, request: Box<P2pRpcRequest>)) -> crate::Action {
                    P2pCallbacksAction::P2pChannelsRpcRequestReceived { peer_id, id, request }
                }
            )),
            on_p2p_channels_streaming_rpc_response_received: Some(redux::callback!(
                on_p2p_channels_streaming_rpc_response_received((peer_id: PeerId, id: P2pRpcId, response: Option<P2pStreamingRpcResponseFull>)) -> crate::Action {
                    P2pCallbacksAction::P2pChannelsStreamingRpcResponseReceived { peer_id, id, response }
                }
            )),
            on_p2p_channels_streaming_rpc_timeout: Some(redux::callback!(
                on_p2p_channels_streaming_rpc_timeout((peer_id: PeerId, id: P2pRpcId)) -> crate::Action {
                    P2pCallbacksAction::P2pChannelsStreamingRpcTimeout { peer_id, id }
                }
            )),
            on_p2p_pubsub_message_received: Some(redux::callback!(
                on_p2p_pubsub_message_received((message_id: P2pNetworkPubsubMessageCacheId)) -> crate::Action{
                    P2pCallbacksAction::P2pPubsubValidateMessage { message_id }
                }
            )),
        }
    }
626
627    pub fn ready(&self) -> Option<&P2pState> {
628        if let P2p::Ready(state) = self {
629            Some(state)
630        } else {
631            None
632        }
633    }
634
635    pub fn ready_mut(&mut self) -> Option<&mut P2pState> {
636        if let P2p::Ready(state) = self {
637            Some(state)
638        } else {
639            None
640        }
641    }
642
    /// Returns the initialized P2P state.
    ///
    /// # Panics
    ///
    /// Panics with "p2p is not initialized" when the state is still `Pending`.
    /// Use [`Self::ready`] for a non-panicking alternative.
    pub fn unwrap(&self) -> &P2pState {
        self.ready().expect("p2p is not initialized")
    }
646
647    pub fn is_enabled<T>(&self, action: &T, time: Timestamp) -> bool
648    where
649        T: EnablingCondition<P2pState>,
650    {
651        match self {
652            P2p::Pending(_) => false,
653            P2p::Ready(p2p_state) => action.is_enabled(p2p_state, time),
654        }
655    }
656
657    pub fn my_id(&self) -> PeerId {
658        match self {
659            P2p::Pending(config) => &config.identity_pub_key,
660            P2p::Ready(state) => &state.config.identity_pub_key,
661        }
662        .peer_id()
663    }
664
665    pub fn get_peer(&self, peer_id: &PeerId) -> Option<&P2pPeerState> {
666        self.ready().and_then(|p2p| p2p.peers.get(peer_id))
667    }
668
669    pub fn get_ready_peer(&self, peer_id: &PeerId) -> Option<&P2pPeerStatusReady> {
670        self.ready().and_then(|p2p| p2p.get_ready_peer(peer_id))
671    }
672
673    pub fn ready_peers(&self) -> Vec<PeerId> {
674        self.ready_peers_iter()
675            .map(|(peer_id, _)| *peer_id)
676            .collect()
677    }
678
    /// Iterator over `(peer id, ready status)` for every peer whose status is
    /// ready; yields nothing while the P2P layer is still pending.
    pub fn ready_peers_iter(&self) -> ReadyPeersIter<'_> {
        ReadyPeersIter::new(self)
    }
682}
683
/// Iterator over the ready peers of a [`P2p`].
///
/// Wraps an iterator over the peers map, or `None` when the P2P layer was not
/// ready at construction time (in which case nothing is yielded).
#[derive(Debug, Clone)]
pub struct ReadyPeersIter<'a>(Option<std::collections::btree_map::Iter<'a, PeerId, P2pPeerState>>);
686
687impl<'a> ReadyPeersIter<'a> {
688    fn new(p2p: &'a P2p) -> Self {
689        ReadyPeersIter(p2p.ready().map(|p2p| p2p.peers.iter()))
690    }
691}
692
693impl<'a> Iterator for ReadyPeersIter<'a> {
694    type Item = (&'a PeerId, &'a P2pPeerStatusReady);
695
696    fn next(&mut self) -> Option<Self::Item> {
697        let iter = self.0.as_mut()?;
698        Some(loop {
699            let (peer_id, state) = iter.next()?;
700            if let Some(ready) = state.status.as_ready() {
701                break (peer_id, ready);
702            }
703        })
704    }
705}