node/
state.rs

1use std::{sync::Arc, time::Duration};
2
3use malloc_size_of_derive::MallocSizeOf;
4use mina_p2p_messages::v2;
5use openmina_core::{
6    block::prevalidate::{prevalidate_block, BlockPrevalidationError},
7    consensus::ConsensusTime,
8    transaction::{TransactionInfo, TransactionWithHash},
9};
10use p2p::P2pNetworkPubsubMessageCacheId;
11use rand::prelude::*;
12
13use openmina_core::{
14    block::{ArcBlockWithHash, BlockWithHash},
15    consensus::ConsensusConstants,
16    constants::constraint_constants,
17    error,
18    requests::RpcId,
19    snark::{Snark, SnarkInfo, SnarkJobCommitment},
20    ChainId,
21};
22use p2p::{
23    bootstrap::P2pNetworkKadBootstrapState,
24    channels::{
25        rpc::{P2pRpcId, P2pRpcRequest, P2pRpcResponse},
26        streaming_rpc::P2pStreamingRpcResponseFull,
27    },
28    connection::{outgoing::P2pConnectionOutgoingError, P2pConnectionResponse},
29    network::identify::P2pNetworkIdentifyState,
30    P2pCallbacks, P2pConfig, P2pNetworkSchedulerState, P2pPeerState, P2pPeerStatusReady, PeerId,
31};
32use redux::{ActionMeta, EnablingCondition, Timestamp};
33use serde::{Deserialize, Serialize};
34use snark::{
35    block_verify::SnarkBlockVerifyState, user_command_verify::SnarkUserCommandVerifyState,
36    work_verify::SnarkWorkVerifyState,
37};
38
39use crate::{
40    block_producer::vrf_evaluator::BlockProducerVrfEvaluatorState,
41    config::GlobalConfig,
42    external_snark_worker::{ExternalSnarkWorker, ExternalSnarkWorkers},
43    ledger::{read::LedgerReadState, write::LedgerWriteState},
44    p2p::callbacks::P2pCallbacksAction,
45    snark_pool::candidate::SnarkPoolCandidateAction,
46    transaction_pool::{
47        candidate::{TransactionPoolCandidateAction, TransactionPoolCandidatesState},
48        TransactionPoolState,
49    },
50    transition_frontier::{
51        candidate::TransitionFrontierCandidateAction,
52        genesis::TransitionFrontierGenesisState,
53        sync::{
54            ledger::{
55                snarked::TransitionFrontierSyncLedgerSnarkedState,
56                staged::TransitionFrontierSyncLedgerStagedState, TransitionFrontierSyncLedgerState,
57            },
58            TransitionFrontierSyncState,
59        },
60    },
61    ActionWithMeta, RpcAction, SnarkPoolAction,
62};
63pub use crate::{
64    block_producer::BlockProducerState,
65    ledger::LedgerState,
66    p2p::P2pState,
67    rpc::RpcState,
68    snark::SnarkState,
69    snark_pool::{candidate::SnarkPoolCandidatesState, SnarkPoolState},
70    transition_frontier::{candidate::TransitionFrontierCandidatesState, TransitionFrontierState},
71    watched_accounts::WatchedAccountsState,
72    Config,
73};
74
/// Top-level state of the node.
///
/// Holds the global configuration, one field per subsystem, and private
/// bookkeeping about the most recently applied action.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct State {
    /// Global configuration (taken from `Config::global` in [`State::new`]).
    pub config: GlobalConfig,

    /// P2P layer; either pending initialization or ready (see [`P2p`]).
    pub p2p: P2p,
    pub ledger: LedgerState,
    pub snark: SnarkState,
    pub transition_frontier: TransitionFrontierState,
    pub snark_pool: SnarkPoolState,
    pub external_snark_worker: ExternalSnarkWorkers,
    pub transaction_pool: TransactionPoolState,
    pub block_producer: BlockProducerState,
    pub rpc: RpcState,

    pub watched_accounts: WatchedAccountsState,

    // TODO(binier): include action kind in `last_action`.
    /// Metadata of the last applied action; updated by `action_applied`.
    last_action: ActionMeta,
    /// Number of actions applied so far; incremented by `action_applied`.
    applied_actions_count: u64,
}
95
96// Substate accessors that will be used in reducers
97use openmina_core::{bug_condition, impl_substate_access, SubstateAccess};
98
// Each invocation names the parent state, the substate type and the field
// path on `State` that holds it. The macro comes from `openmina_core`; its
// expansion is not visible here — presumably it implements
// `SubstateAccess<T>` for `State` through that path.

// Snark verification.
impl_substate_access!(State, SnarkState, snark);
impl_substate_access!(State, SnarkBlockVerifyState, snark.block_verify);
impl_substate_access!(State, SnarkWorkVerifyState, snark.work_verify);
impl_substate_access!(
    State,
    SnarkUserCommandVerifyState,
    snark.user_command_verify
);
// Transition frontier and transaction pool.
impl_substate_access!(State, TransitionFrontierState, transition_frontier);
impl_substate_access!(
    State,
    TransitionFrontierCandidatesState,
    transition_frontier.candidates
);
impl_substate_access!(State, TransactionPoolState, transaction_pool);
impl_substate_access!(
    State,
    TransactionPoolCandidatesState,
    transaction_pool.candidates
);
impl_substate_access!(
    State,
    TransitionFrontierGenesisState,
    transition_frontier.genesis
);
impl_substate_access!(State, TransitionFrontierSyncState, transition_frontier.sync);
// Snark pool, snark workers, block producer, RPC and watched accounts.
impl_substate_access!(State, SnarkPoolState, snark_pool);
impl_substate_access!(State, SnarkPoolCandidatesState, snark_pool.candidates);
impl_substate_access!(State, ExternalSnarkWorkers, external_snark_worker);
impl_substate_access!(State, BlockProducerState, block_producer);
impl_substate_access!(State, RpcState, rpc);
impl_substate_access!(State, WatchedAccountsState, watched_accounts);
// Tuple field `0` of `ExternalSnarkWorkers`.
impl_substate_access!(State, ExternalSnarkWorker, external_snark_worker.0);
// Ledger.
impl_substate_access!(State, LedgerState, ledger);
impl_substate_access!(State, LedgerReadState, ledger.read);
impl_substate_access!(State, LedgerWriteState, ledger.write);
135
136impl openmina_core::SubstateAccess<P2pState> for State {
137    fn substate(&self) -> openmina_core::SubstateResult<&P2pState> {
138        self.p2p
139            .ready()
140            .ok_or_else(|| "P2P state unavailable. P2P layer is not ready".to_owned())
141    }
142
143    fn substate_mut(&mut self) -> openmina_core::SubstateResult<&mut P2pState> {
144        self.p2p
145            .ready_mut()
146            .ok_or_else(|| "P2P state unavailable. P2P layer is not ready".to_owned())
147    }
148}
149
150impl openmina_core::SubstateAccess<TransitionFrontierSyncLedgerState> for State {
151    fn substate(&self) -> openmina_core::SubstateResult<&TransitionFrontierSyncLedgerState> {
152        self.transition_frontier
153            .sync
154            .ledger()
155            .ok_or_else(|| "Ledger sync state unavailable".to_owned())
156    }
157
158    fn substate_mut(
159        &mut self,
160    ) -> openmina_core::SubstateResult<&mut TransitionFrontierSyncLedgerState> {
161        self.transition_frontier
162            .sync
163            .ledger_mut()
164            .ok_or_else(|| "Ledger sync state unavailable".to_owned())
165    }
166}
167
168impl SubstateAccess<BlockProducerVrfEvaluatorState> for State {
169    fn substate(&self) -> openmina_core::SubstateResult<&BlockProducerVrfEvaluatorState> {
170        self.block_producer
171            .as_ref()
172            .map(|state| &state.vrf_evaluator)
173            .ok_or_else(|| "Block producer VRF evaluator state unavailable".to_owned())
174    }
175
176    fn substate_mut(
177        &mut self,
178    ) -> openmina_core::SubstateResult<&mut BlockProducerVrfEvaluatorState> {
179        self.block_producer
180            .as_mut()
181            .map(|state| &mut state.vrf_evaluator)
182            .ok_or_else(|| "Block producer VRF evaluator state unavailable".to_owned())
183    }
184}
185
186impl openmina_core::SubstateAccess<TransitionFrontierSyncLedgerSnarkedState> for State {
187    fn substate(&self) -> openmina_core::SubstateResult<&TransitionFrontierSyncLedgerSnarkedState> {
188        self.transition_frontier
189            .sync
190            .ledger()
191            .ok_or_else(|| {
192                "Snarked ledger state unavailable. Ledger sync state unavailable".to_owned()
193            })?
194            .snarked()
195            .ok_or_else(|| "Snarked ledger state unavailable".to_owned())
196    }
197
198    fn substate_mut(
199        &mut self,
200    ) -> openmina_core::SubstateResult<&mut TransitionFrontierSyncLedgerSnarkedState> {
201        self.transition_frontier
202            .sync
203            .ledger_mut()
204            .ok_or_else(|| {
205                "Snarked ledger state unavailable. Ledger sync state unavailable".to_owned()
206            })?
207            .snarked_mut()
208            .ok_or_else(|| "Snarked ledger state unavailable".to_owned())
209    }
210}
211
212impl openmina_core::SubstateAccess<TransitionFrontierSyncLedgerStagedState> for State {
213    fn substate(&self) -> openmina_core::SubstateResult<&TransitionFrontierSyncLedgerStagedState> {
214        self.transition_frontier
215            .sync
216            .ledger()
217            .ok_or_else(|| {
218                "Staged ledger state unavailable. Ledger sync state unavailable".to_owned()
219            })?
220            .staged()
221            .ok_or_else(|| "Staged ledger state unavailable".to_owned())
222    }
223
224    fn substate_mut(
225        &mut self,
226    ) -> openmina_core::SubstateResult<&mut TransitionFrontierSyncLedgerStagedState> {
227        self.transition_frontier
228            .sync
229            .ledger_mut()
230            .ok_or_else(|| {
231                "Staged ledger state unavailable. Ledger sync state unavailable".to_owned()
232            })?
233            .staged_mut()
234            .ok_or_else(|| "Staged ledger state unavailable".to_owned())
235    }
236}
237
/// Identity access: `State` is trivially a substate of itself, so both
/// accessors always succeed.
impl SubstateAccess<State> for State {
    /// Always returns `Ok(self)`.
    fn substate(&self) -> openmina_core::SubstateResult<&State> {
        Ok(self)
    }

    /// Always returns `Ok(self)`.
    fn substate_mut(&mut self) -> openmina_core::SubstateResult<&mut State> {
        Ok(self)
    }
}
247
/// Implements `SubstateAccess<$inner>` for `$outer` by first resolving the
/// `P2pState` substate of `$outer` and then delegating to `P2pState`'s own
/// `SubstateAccess<$inner>` implementation. Any error from the first step
/// is propagated unchanged.
macro_rules! impl_p2p_state_access {
    ($outer:ty, $inner:ty) => {
        impl openmina_core::SubstateAccess<$inner> for $outer {
            fn substate(&self) -> openmina_core::SubstateResult<&$inner> {
                <Self as openmina_core::SubstateAccess<P2pState>>::substate(self)?.substate()
            }

            fn substate_mut(&mut self) -> openmina_core::SubstateResult<&mut $inner> {
                <Self as openmina_core::SubstateAccess<P2pState>>::substate_mut(self)?
                    .substate_mut()
            }
        }
    };
}
263
// Substates reached through `P2pState`; every accessor below fails while the
// p2p layer is still pending, because the `P2pState` lookup fails first.
impl_p2p_state_access!(State, P2pNetworkIdentifyState);
impl_p2p_state_access!(State, p2p::P2pNetworkState);
impl_p2p_state_access!(State, P2pNetworkKadBootstrapState);
impl_p2p_state_access!(State, p2p::P2pNetworkKadState);
impl_p2p_state_access!(State, P2pNetworkSchedulerState);
impl_p2p_state_access!(State, p2p::P2pLimits);
impl_p2p_state_access!(State, p2p::P2pNetworkPubsubState);
impl_p2p_state_access!(State, p2p::P2pConfig);
272
// Empty impl: no trait items are overridden here.
impl p2p::P2pStateTrait for State {}
274
/// `openmina_core::Substate` specialised to this crate's `Action` and `State`.
pub type Substate<'a, S> = openmina_core::Substate<'a, crate::Action, State, S>;
276
277impl State {
278    pub fn new(config: Config, constants: &ConsensusConstants, now: Timestamp) -> Self {
279        Self {
280            p2p: P2p::Pending(config.p2p),
281            ledger: LedgerState::new(config.ledger),
282            snark_pool: SnarkPoolState::new(),
283            snark: SnarkState::new(config.snark),
284            transition_frontier: TransitionFrontierState::new(
285                config.transition_frontier,
286                config.archive.is_some(),
287            ),
288            external_snark_worker: ExternalSnarkWorkers::new(now),
289            block_producer: BlockProducerState::new(now, config.block_producer),
290            rpc: RpcState::new(),
291            transaction_pool: TransactionPoolState::new(config.tx_pool, constants),
292
293            watched_accounts: WatchedAccountsState::new(),
294
295            config: config.global,
296            last_action: ActionMeta::zero_custom(now),
297            applied_actions_count: 0,
298        }
299    }
300
    /// Metadata of the most recently applied action.
    ///
    /// Initialized in `new` via `ActionMeta::zero_custom(now)` and overwritten
    /// by `action_applied`.
    pub fn last_action(&self) -> &ActionMeta {
        &self.last_action
    }
304
    /// Latest time observed by the state machine.
    ///
    /// This is the timestamp of `last_action`, so it is only updated when an
    /// action is dispatched and the reducer is executed.
    #[inline(always)]
    pub fn time(&self) -> Timestamp {
        self.last_action.time()
    }
312
313    pub fn pseudo_rng(&self) -> StdRng {
314        crate::core::pseudo_rng(self.time())
315    }
316
317    /// Must be called in the global reducer as the last thing only once
318    /// and only there!
319    pub fn action_applied(&mut self, action: &ActionWithMeta) {
320        self.last_action = action.meta().clone();
321        self.applied_actions_count = self.applied_actions_count.checked_add(1).expect("overflow");
322    }
323
324    pub fn genesis_block(&self) -> Option<ArcBlockWithHash> {
325        self.transition_frontier
326            .genesis
327            .block_with_real_or_dummy_proof()
328    }
329
330    fn cur_slot(&self, initial_slot: impl FnOnce(&ArcBlockWithHash) -> u32) -> Option<u32> {
331        let genesis = self.genesis_block()?;
332        let diff_ns = u64::from(self.time()).saturating_sub(u64::from(genesis.timestamp()));
333        let diff_ms = diff_ns / 1_000_000;
334        let slots = diff_ms
335            .checked_div(constraint_constants().block_window_duration_ms)
336            .expect("division by 0");
337        Some(
338            initial_slot(&genesis)
339                .checked_add(slots as u32)
340                .expect("overflow"),
341        )
342    }
343
344    /// Current global slot based on constants and current time.
345    ///
346    /// It's not equal to global slot of the best tip.
347    pub fn cur_global_slot(&self) -> Option<u32> {
348        self.cur_slot(|b| b.global_slot())
349    }
350
351    pub fn current_slot(&self) -> Option<u32> {
352        let slots_per_epoch = self.genesis_block()?.constants().slots_per_epoch.as_u32();
353        Some(
354            self.cur_global_slot()?
355                .checked_rem(slots_per_epoch)
356                .expect("division by 0"),
357        )
358    }
359
360    pub fn cur_global_slot_since_genesis(&self) -> Option<u32> {
361        self.cur_slot(|b| b.global_slot_since_genesis())
362    }
363
364    pub fn current_epoch(&self) -> Option<u32> {
365        let slots_per_epoch = self.genesis_block()?.constants().slots_per_epoch.as_u32();
366        Some(
367            self.cur_global_slot()?
368                .checked_div(slots_per_epoch)
369                .expect("division by 0"),
370        )
371    }
372
373    pub fn slot_time(&self, global_slot: u64) -> Option<(Timestamp, Timestamp)> {
374        let genesis_timestamp = self.genesis_block()?.genesis_timestamp();
375        println!("genesis_timestamp: {}", u64::from(genesis_timestamp));
376
377        let start_time = genesis_timestamp.checked_add(
378            global_slot
379                .checked_mul(constraint_constants().block_window_duration_ms)?
380                .checked_mul(1_000_000)?,
381        )?;
382        let end_time = start_time.checked_add(
383            constraint_constants()
384                .block_window_duration_ms
385                .checked_mul(1_000_000)?,
386        )?;
387
388        Some((start_time, end_time))
389    }
390
391    pub fn producing_block_after_genesis(&self) -> bool {
392        #[allow(clippy::arithmetic_side_effects)]
393        let two_mins_in_future = self.time() + Duration::from_secs(2 * 60);
394        self.block_producer.with(false, |bp| {
395            bp.current.won_slot_should_produce(two_mins_in_future)
396        }) && self.genesis_block().is_some_and(|b| {
397            let slot = &b.consensus_state().curr_global_slot_since_hard_fork;
398            let epoch = slot
399                .slot_number
400                .as_u32()
401                .checked_div(slot.slots_per_epoch.as_u32())
402                .expect("division by 0");
403            self.current_epoch() <= Some(epoch)
404        })
405    }
406
407    pub fn prevalidate_block(
408        &self,
409        block: &ArcBlockWithHash,
410        allow_block_too_late: bool,
411    ) -> Result<(), BlockPrevalidationError> {
412        let Some((genesis, cur_global_slot)) =
413            None.or_else(|| Some((self.genesis_block()?, self.cur_global_slot()?)))
414        else {
415            // we don't have genesis block. This should be impossible
416            // because we don't even know chain_id before we have genesis
417            // block, so we can't be connected to any peers from which
418            // we would receive a block.
419            bug_condition!("Tried to prevalidate a block before the genesis block was ready");
420            return Err(BlockPrevalidationError::GenesisNotReady);
421        };
422
423        prevalidate_block(block, &genesis, cur_global_slot, allow_block_too_late)
424    }
425
    /// Returns the `testing_run` flag of the global config.
    pub fn should_log_node_id(&self) -> bool {
        self.config.testing_run
    }
429
430    pub fn consensus_time_now(&self) -> Option<ConsensusTime> {
431        let (start_time, end_time) = self.slot_time(self.cur_global_slot()?.into())?;
432        let epoch = self.current_epoch()?;
433        let global_slot = self.cur_global_slot()?;
434        let slot = self.current_slot()?;
435        Some(ConsensusTime {
436            start_time,
437            end_time,
438            epoch,
439            global_slot,
440            slot,
441        })
442    }
443
444    pub fn consensus_time_best_tip(&self) -> Option<ConsensusTime> {
445        let best_tip = self.transition_frontier.best_tip()?;
446        let global_slot = best_tip
447            .curr_global_slot_since_hard_fork()
448            .slot_number
449            .as_u32();
450        let (start_time, end_time) = self.slot_time(global_slot.into())?;
451        let epoch = best_tip.consensus_state().epoch_count.as_u32();
452        let slot = best_tip.slot();
453        Some(ConsensusTime {
454            start_time,
455            end_time,
456            epoch,
457            global_slot,
458            slot,
459        })
460    }
461}
462
/// P2P layer of the node, in one of two phases.
#[serde_with::serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, MallocSizeOf)]
pub enum P2p {
    /// Not yet initialized; only the configuration is held.
    /// `P2p::initialize` turns this into `Ready`.
    Pending(#[ignore_malloc_size_of = "constant"] P2pConfig),
    /// Initialized p2p state.
    Ready(P2pState),
}
469
/// Error returned by `P2p::initialize`.
#[derive(Debug, thiserror::Error)]
pub enum P2pInitializationError {
    /// `initialize` was called while the p2p layer was already `Ready`.
    #[error("p2p is already initialized")]
    AlreadyInitialized,
}
475
/// Evaluates to the value inside `$p2p.ready()` when it is `Some`.
///
/// When it is `None`, logs `"p2p is not initialized: <reason>"` through
/// `openmina_core::error!` at `$time` and `return`s from the enclosing
/// function, so it can only be used in functions returning `()`.
///
/// Forms:
/// - `p2p_ready!(p2p, time)` — uses an empty reason.
/// - `p2p_ready!(p2p, reason, time)`.
#[macro_export]
macro_rules! p2p_ready {
    ($p2p:expr, $time:expr) => {
        // `$crate::` keeps the recursive call resolvable when the macro is
        // invoked by path from a scope that has not imported it.
        $crate::p2p_ready!($p2p, "", $time)
    };
    ($p2p:expr, $reason:expr, $time:expr) => {
        match $p2p.ready() {
            Some(v) => v,
            None => {
                openmina_core::error!($time; "p2p is not initialized: {}", $reason);
                return;
            }
        }
    };
}
492
493impl P2p {
494    pub fn config(&self) -> &P2pConfig {
495        match self {
496            P2p::Pending(config) => config,
497            P2p::Ready(p2p_state) => &p2p_state.config,
498        }
499    }
500
501    // TODO: add chain id
502    pub fn initialize(&mut self, chain_id: &ChainId) -> Result<(), P2pInitializationError> {
503        let P2p::Pending(config) = self else {
504            return Err(P2pInitializationError::AlreadyInitialized);
505        };
506
507        let callbacks = Self::p2p_callbacks();
508        *self = P2p::Ready(P2pState::new(config.clone(), callbacks, chain_id));
509        Ok(())
510    }
511
    /// Builds the callback table passed to `P2pState::new` in `initialize`.
    ///
    /// Every entry is declared with `redux::callback!` and maps the payload
    /// of a p2p event to a node-level `crate::Action` (transaction pool,
    /// snark pool, RPC, transition frontier or `P2pCallbacksAction`).
    fn p2p_callbacks() -> P2pCallbacks {
        P2pCallbacks {
            // Transaction and snark propagation.
            on_p2p_channels_transaction_received: Some(redux::callback!(
                on_p2p_channels_transaction_received((peer_id: PeerId, info: Box<TransactionInfo>)) -> crate::Action {
                    TransactionPoolCandidateAction::InfoReceived {
                        peer_id,
                        info: *info,
                    }
                }
            )),
            on_p2p_channels_transactions_libp2p_received: Some(redux::callback!(
                on_p2p_channels_transactions_libp2p_received((peer_id: PeerId, transactions: Vec<TransactionWithHash>, message_id: P2pNetworkPubsubMessageCacheId)) -> crate::Action {
                    TransactionPoolCandidateAction::Libp2pTransactionsReceived {
                        message_id,
                        transactions,
                        peer_id
                    }
                }
            )),
            on_p2p_channels_snark_job_commitment_received: Some(redux::callback!(
                on_p2p_channels_snark_job_commitment_received((peer_id: PeerId, commitment: Box<SnarkJobCommitment>)) -> crate::Action {
                    SnarkPoolAction::CommitmentAdd { commitment: *commitment, sender: peer_id }
                }
            )),
            on_p2p_channels_snark_received: Some(redux::callback!(
                on_p2p_channels_snark_received((peer_id: PeerId, snark: Box<SnarkInfo>)) -> crate::Action {
                    SnarkPoolCandidateAction::InfoReceived { peer_id, info: *snark }
                }
            )),
            on_p2p_channels_snark_libp2p_received: Some(redux::callback!(
                on_p2p_channels_snark_libp2p_received((peer_id: PeerId, snark: Box<Snark>)) -> crate::Action {
                    SnarkPoolCandidateAction::WorkFetchSuccess { peer_id, work: *snark }
                }
            )),
            on_p2p_channels_streaming_rpc_ready: Some(redux::callback!(
                on_p2p_channels_streaming_rpc_ready(_var: ()) -> crate::Action {
                    P2pCallbacksAction::P2pChannelsStreamingRpcReady
                }
            )),
            on_p2p_channels_best_tip_request_received: Some(redux::callback!(
                on_p2p_channels_best_tip_request_received(peer_id: PeerId) -> crate::Action {
                    P2pCallbacksAction::RpcRespondBestTip { peer_id }
                }
            )),
            // Connection lifecycle; results are routed to RPC actions.
            on_p2p_disconnection_finish: Some(redux::callback!(
                on_p2p_disconnection_finish(peer_id: PeerId) -> crate::Action {
                    P2pCallbacksAction::P2pDisconnection { peer_id }
                }
            )),
            on_p2p_connection_outgoing_error: Some(redux::callback!(
                on_p2p_connection_outgoing_error((rpc_id: RpcId, error: P2pConnectionOutgoingError)) -> crate::Action {
                    RpcAction::P2pConnectionOutgoingError { rpc_id, error }
                }
            )),
            on_p2p_connection_outgoing_success: Some(redux::callback!(
                on_p2p_connection_outgoing_success(rpc_id: RpcId) -> crate::Action {
                    RpcAction::P2pConnectionOutgoingSuccess { rpc_id }
                }
            )),
            on_p2p_connection_incoming_error: Some(redux::callback!(
                on_p2p_connection_incoming_error((rpc_id: RpcId, error: String)) -> crate::Action {
                    RpcAction::P2pConnectionIncomingError { rpc_id, error }
                }
            )),
            on_p2p_connection_incoming_success: Some(redux::callback!(
                on_p2p_connection_incoming_success(rpc_id: RpcId) -> crate::Action {
                    RpcAction::P2pConnectionIncomingSuccess { rpc_id }
                }
            )),
            on_p2p_connection_incoming_answer_ready: Some(redux::callback!(
                on_p2p_connection_incoming_answer_ready((rpc_id: RpcId, peer_id: PeerId, answer: P2pConnectionResponse)) -> crate::Action {
                    RpcAction::P2pConnectionIncomingAnswerReady { rpc_id, answer, peer_id }
                }
            )),
            on_p2p_peer_best_tip_update: Some(redux::callback!(
                on_p2p_peer_best_tip_update(best_tip: BlockWithHash<Arc<v2::MinaBlockBlockStableV2>>) -> crate::Action {
                    TransitionFrontierCandidateAction::P2pBestTipUpdate { best_tip }
                }
            )),
            // RPC and streaming RPC channels.
            on_p2p_channels_rpc_ready: Some(redux::callback!(
                on_p2p_channels_rpc_ready(peer_id: PeerId) -> crate::Action {
                    P2pCallbacksAction::P2pChannelsRpcReady { peer_id }
                }
            )),
            on_p2p_channels_rpc_timeout: Some(redux::callback!(
                on_p2p_channels_rpc_timeout((peer_id: PeerId, id: P2pRpcId)) -> crate::Action {
                    P2pCallbacksAction::P2pChannelsRpcTimeout { peer_id, id }
                }
            )),
            on_p2p_channels_rpc_response_received: Some(redux::callback!(
                on_p2p_channels_rpc_response_received((peer_id: PeerId, id: P2pRpcId, response: Option<Box<P2pRpcResponse>>)) -> crate::Action {
                    P2pCallbacksAction::P2pChannelsRpcResponseReceived { peer_id, id, response }
                }
            )),
            on_p2p_channels_rpc_request_received: Some(redux::callback!(
                on_p2p_channels_rpc_request_received((peer_id: PeerId, id: P2pRpcId, request: Box<P2pRpcRequest>)) -> crate::Action {
                    P2pCallbacksAction::P2pChannelsRpcRequestReceived { peer_id, id, request }
                }
            )),
            on_p2p_channels_streaming_rpc_response_received: Some(redux::callback!(
                on_p2p_channels_streaming_rpc_response_received((peer_id: PeerId, id: P2pRpcId, response: Option<P2pStreamingRpcResponseFull>)) -> crate::Action {
                    P2pCallbacksAction::P2pChannelsStreamingRpcResponseReceived { peer_id, id, response }
                }
            )),
            on_p2p_channels_streaming_rpc_timeout: Some(redux::callback!(
                on_p2p_channels_streaming_rpc_timeout((peer_id: PeerId, id: P2pRpcId)) -> crate::Action {
                    P2pCallbacksAction::P2pChannelsStreamingRpcTimeout { peer_id, id }
                }
            )),
            // Pubsub.
            on_p2p_pubsub_message_received: Some(redux::callback!(
                on_p2p_pubsub_message_received((message_id: P2pNetworkPubsubMessageCacheId)) -> crate::Action{
                    P2pCallbacksAction::P2pPubsubValidateMessage { message_id }
                }
            )),
        }
    }
628
629    pub fn ready(&self) -> Option<&P2pState> {
630        if let P2p::Ready(state) = self {
631            Some(state)
632        } else {
633            None
634        }
635    }
636
637    pub fn ready_mut(&mut self) -> Option<&mut P2pState> {
638        if let P2p::Ready(state) = self {
639            Some(state)
640        } else {
641            None
642        }
643    }
644
645    pub fn unwrap(&self) -> &P2pState {
646        self.ready().expect("p2p is not initialized")
647    }
648
649    pub fn is_enabled<T>(&self, action: &T, time: Timestamp) -> bool
650    where
651        T: EnablingCondition<P2pState>,
652    {
653        match self {
654            P2p::Pending(_) => false,
655            P2p::Ready(p2p_state) => action.is_enabled(p2p_state, time),
656        }
657    }
658
659    pub fn my_id(&self) -> PeerId {
660        match self {
661            P2p::Pending(config) => &config.identity_pub_key,
662            P2p::Ready(state) => &state.config.identity_pub_key,
663        }
664        .peer_id()
665    }
666
667    pub fn get_peer(&self, peer_id: &PeerId) -> Option<&P2pPeerState> {
668        self.ready().and_then(|p2p| p2p.peers.get(peer_id))
669    }
670
671    pub fn get_ready_peer(&self, peer_id: &PeerId) -> Option<&P2pPeerStatusReady> {
672        self.ready().and_then(|p2p| p2p.get_ready_peer(peer_id))
673    }
674
675    pub fn ready_peers(&self) -> Vec<PeerId> {
676        self.ready_peers_iter()
677            .map(|(peer_id, _)| *peer_id)
678            .collect()
679    }
680
    /// Iterator over `(peer id, ready status)` for every peer whose status is
    /// ready; yields nothing while the p2p layer is `Pending`.
    pub fn ready_peers_iter(&self) -> ReadyPeersIter<'_> {
        ReadyPeersIter::new(self)
    }
684}
685
/// Iterator over peers with a ready status.
///
/// Wraps an optional iterator over the peers map; the inner value is `None`
/// when the p2p layer is not ready, in which case nothing is yielded.
#[derive(Debug, Clone)]
pub struct ReadyPeersIter<'a>(Option<std::collections::btree_map::Iter<'a, PeerId, P2pPeerState>>);
688
689impl<'a> ReadyPeersIter<'a> {
690    fn new(p2p: &'a P2p) -> Self {
691        ReadyPeersIter(p2p.ready().map(|p2p| p2p.peers.iter()))
692    }
693}
694
695impl<'a> Iterator for ReadyPeersIter<'a> {
696    type Item = (&'a PeerId, &'a P2pPeerStatusReady);
697
698    fn next(&mut self) -> Option<Self::Item> {
699        let iter = self.0.as_mut()?;
700        Some(loop {
701            let (peer_id, state) = iter.next()?;
702            if let Some(ready) = state.status.as_ready() {
703                break (peer_id, ready);
704            }
705        })
706    }
707}