1use std::{sync::Arc, time::Duration};
2
3use malloc_size_of_derive::MallocSizeOf;
4use mina_core::{
5 block::prevalidate::{prevalidate_block, BlockPrevalidationError},
6 consensus::ConsensusTime,
7 transaction::{TransactionInfo, TransactionWithHash},
8};
9use mina_p2p_messages::v2;
10use p2p::P2pNetworkPubsubMessageCacheId;
11use rand::prelude::*;
12
13use mina_core::{
14 block::{ArcBlockWithHash, BlockWithHash},
15 consensus::ConsensusConstants,
16 constants::constraint_constants,
17 error,
18 requests::RpcId,
19 snark::{Snark, SnarkInfo, SnarkJobCommitment},
20 ChainId,
21};
22use p2p::{
23 bootstrap::P2pNetworkKadBootstrapState,
24 channels::{
25 rpc::{P2pRpcId, P2pRpcRequest, P2pRpcResponse},
26 streaming_rpc::P2pStreamingRpcResponseFull,
27 },
28 connection::{outgoing::P2pConnectionOutgoingError, P2pConnectionResponse},
29 network::identify::P2pNetworkIdentifyState,
30 P2pCallbacks, P2pConfig, P2pNetworkSchedulerState, P2pPeerState, P2pPeerStatusReady, PeerId,
31};
32use redux::{ActionMeta, EnablingCondition, Timestamp};
33use serde::{Deserialize, Serialize};
34use snark::{
35 block_verify::SnarkBlockVerifyState, user_command_verify::SnarkUserCommandVerifyState,
36 work_verify::SnarkWorkVerifyState,
37};
38
39use crate::{
40 block_producer::vrf_evaluator::BlockProducerVrfEvaluatorState,
41 config::GlobalConfig,
42 external_snark_worker::{ExternalSnarkWorker, ExternalSnarkWorkers},
43 ledger::{read::LedgerReadState, write::LedgerWriteState},
44 p2p::callbacks::P2pCallbacksAction,
45 snark_pool::candidate::SnarkPoolCandidateAction,
46 transaction_pool::{
47 candidate::{TransactionPoolCandidateAction, TransactionPoolCandidatesState},
48 TransactionPoolState,
49 },
50 transition_frontier::{
51 candidate::TransitionFrontierCandidateAction,
52 genesis::TransitionFrontierGenesisState,
53 sync::{
54 ledger::{
55 snarked::TransitionFrontierSyncLedgerSnarkedState,
56 staged::TransitionFrontierSyncLedgerStagedState, TransitionFrontierSyncLedgerState,
57 },
58 TransitionFrontierSyncState,
59 },
60 },
61 ActionWithMeta, RpcAction, SnarkPoolAction,
62};
63pub use crate::{
64 block_producer::BlockProducerState,
65 ledger::LedgerState,
66 p2p::P2pState,
67 rpc::RpcState,
68 snark::SnarkState,
69 snark_pool::{candidate::SnarkPoolCandidatesState, SnarkPoolState},
70 transition_frontier::{candidate::TransitionFrontierCandidatesState, TransitionFrontierState},
71 watched_accounts::WatchedAccountsState,
72 Config,
73};
74
/// Top-level state of the node.
///
/// Holds the global configuration, the state of every subsystem, and
/// bookkeeping about the actions that have been applied so far.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct State {
    /// Global configuration (taken from `config.global` in `State::new`).
    pub config: GlobalConfig,

    /// P2P layer; created as `P2p::Pending` and switched to `P2p::Ready` by
    /// `P2p::initialize`.
    pub p2p: P2p,
    pub ledger: LedgerState,
    pub snark: SnarkState,
    pub transition_frontier: TransitionFrontierState,
    pub snark_pool: SnarkPoolState,
    pub external_snark_worker: ExternalSnarkWorkers,
    pub transaction_pool: TransactionPoolState,
    pub block_producer: BlockProducerState,
    pub rpc: RpcState,

    pub watched_accounts: WatchedAccountsState,

    /// Metadata of the most recently applied action; `State::time` reports
    /// the time stored here.
    last_action: ActionMeta,
    /// Number of actions applied so far; incremented by `State::action_applied`.
    applied_actions_count: u64,
}
95
96use mina_core::{bug_condition, impl_substate_access, SubstateAccess};
98
// Substates that live at a plain field path of `State`. The third macro
// argument is that field path; presumably each invocation expands to a
// `SubstateAccess<T> for State` impl that borrows the field (the macro is
// defined in `mina_core` — confirm there).
impl_substate_access!(State, SnarkState, snark);
impl_substate_access!(State, SnarkBlockVerifyState, snark.block_verify);
impl_substate_access!(State, SnarkWorkVerifyState, snark.work_verify);
impl_substate_access!(
    State,
    SnarkUserCommandVerifyState,
    snark.user_command_verify
);
impl_substate_access!(State, TransitionFrontierState, transition_frontier);
impl_substate_access!(
    State,
    TransitionFrontierCandidatesState,
    transition_frontier.candidates
);
impl_substate_access!(State, TransactionPoolState, transaction_pool);
impl_substate_access!(
    State,
    TransactionPoolCandidatesState,
    transaction_pool.candidates
);
impl_substate_access!(
    State,
    TransitionFrontierGenesisState,
    transition_frontier.genesis
);
impl_substate_access!(State, TransitionFrontierSyncState, transition_frontier.sync);
impl_substate_access!(State, SnarkPoolState, snark_pool);
impl_substate_access!(State, SnarkPoolCandidatesState, snark_pool.candidates);
impl_substate_access!(State, ExternalSnarkWorkers, external_snark_worker);
impl_substate_access!(State, BlockProducerState, block_producer);
impl_substate_access!(State, RpcState, rpc);
impl_substate_access!(State, WatchedAccountsState, watched_accounts);
impl_substate_access!(State, ExternalSnarkWorker, external_snark_worker.0);
impl_substate_access!(State, LedgerState, ledger);
impl_substate_access!(State, LedgerReadState, ledger.read);
impl_substate_access!(State, LedgerWriteState, ledger.write);
135
136impl mina_core::SubstateAccess<P2pState> for State {
137 fn substate(&self) -> mina_core::SubstateResult<&P2pState> {
138 self.p2p
139 .ready()
140 .ok_or_else(|| "P2P state unavailable. P2P layer is not ready".to_owned())
141 }
142
143 fn substate_mut(&mut self) -> mina_core::SubstateResult<&mut P2pState> {
144 self.p2p
145 .ready_mut()
146 .ok_or_else(|| "P2P state unavailable. P2P layer is not ready".to_owned())
147 }
148}
149
150impl mina_core::SubstateAccess<TransitionFrontierSyncLedgerState> for State {
151 fn substate(&self) -> mina_core::SubstateResult<&TransitionFrontierSyncLedgerState> {
152 self.transition_frontier
153 .sync
154 .ledger()
155 .ok_or_else(|| "Ledger sync state unavailable".to_owned())
156 }
157
158 fn substate_mut(
159 &mut self,
160 ) -> mina_core::SubstateResult<&mut TransitionFrontierSyncLedgerState> {
161 self.transition_frontier
162 .sync
163 .ledger_mut()
164 .ok_or_else(|| "Ledger sync state unavailable".to_owned())
165 }
166}
167
168impl SubstateAccess<BlockProducerVrfEvaluatorState> for State {
169 fn substate(&self) -> mina_core::SubstateResult<&BlockProducerVrfEvaluatorState> {
170 self.block_producer
171 .as_ref()
172 .map(|state| &state.vrf_evaluator)
173 .ok_or_else(|| "Block producer VRF evaluator state unavailable".to_owned())
174 }
175
176 fn substate_mut(&mut self) -> mina_core::SubstateResult<&mut BlockProducerVrfEvaluatorState> {
177 self.block_producer
178 .as_mut()
179 .map(|state| &mut state.vrf_evaluator)
180 .ok_or_else(|| "Block producer VRF evaluator state unavailable".to_owned())
181 }
182}
183
184impl mina_core::SubstateAccess<TransitionFrontierSyncLedgerSnarkedState> for State {
185 fn substate(&self) -> mina_core::SubstateResult<&TransitionFrontierSyncLedgerSnarkedState> {
186 self.transition_frontier
187 .sync
188 .ledger()
189 .ok_or_else(|| {
190 "Snarked ledger state unavailable. Ledger sync state unavailable".to_owned()
191 })?
192 .snarked()
193 .ok_or_else(|| "Snarked ledger state unavailable".to_owned())
194 }
195
196 fn substate_mut(
197 &mut self,
198 ) -> mina_core::SubstateResult<&mut TransitionFrontierSyncLedgerSnarkedState> {
199 self.transition_frontier
200 .sync
201 .ledger_mut()
202 .ok_or_else(|| {
203 "Snarked ledger state unavailable. Ledger sync state unavailable".to_owned()
204 })?
205 .snarked_mut()
206 .ok_or_else(|| "Snarked ledger state unavailable".to_owned())
207 }
208}
209
210impl mina_core::SubstateAccess<TransitionFrontierSyncLedgerStagedState> for State {
211 fn substate(&self) -> mina_core::SubstateResult<&TransitionFrontierSyncLedgerStagedState> {
212 self.transition_frontier
213 .sync
214 .ledger()
215 .ok_or_else(|| {
216 "Staged ledger state unavailable. Ledger sync state unavailable".to_owned()
217 })?
218 .staged()
219 .ok_or_else(|| "Staged ledger state unavailable".to_owned())
220 }
221
222 fn substate_mut(
223 &mut self,
224 ) -> mina_core::SubstateResult<&mut TransitionFrontierSyncLedgerStagedState> {
225 self.transition_frontier
226 .sync
227 .ledger_mut()
228 .ok_or_else(|| {
229 "Staged ledger state unavailable. Ledger sync state unavailable".to_owned()
230 })?
231 .staged_mut()
232 .ok_or_else(|| "Staged ledger state unavailable".to_owned())
233 }
234}
235
/// Identity access: the whole `State` is trivially a substate of itself, so
/// both accessors always succeed.
impl SubstateAccess<State> for State {
    /// Returns `self` unchanged; never fails.
    fn substate(&self) -> mina_core::SubstateResult<&State> {
        Ok(self)
    }

    /// Returns `self` unchanged; never fails.
    fn substate_mut(&mut self) -> mina_core::SubstateResult<&mut State> {
        Ok(self)
    }
}
245
/// Implements `SubstateAccess<$substate_type>` for `$state` by first
/// resolving the `P2pState` substate of `$state` and then delegating to
/// `P2pState`'s own `SubstateAccess<$substate_type>` implementation.
///
/// Any error from the first step (e.g. P2P not ready) is propagated as is.
macro_rules! impl_p2p_state_access {
    ($state:ty, $substate_type:ty) => {
        impl mina_core::SubstateAccess<$substate_type> for $state {
            fn substate(&self) -> mina_core::SubstateResult<&$substate_type> {
                // Name the trait instantiation explicitly so the first hop
                // is unambiguously the `P2pState` accessor.
                let p2p_state = <Self as mina_core::SubstateAccess<P2pState>>::substate(self)?;
                p2p_state.substate()
            }

            fn substate_mut(&mut self) -> mina_core::SubstateResult<&mut $substate_type> {
                let p2p_state =
                    <Self as mina_core::SubstateAccess<P2pState>>::substate_mut(self)?;
                p2p_state.substate_mut()
            }
        }
    };
}
261
// Substates that are reached through `P2pState`; all of these accessors fail
// with the "P2P layer is not ready" error while `State::p2p` is still pending.
impl_p2p_state_access!(State, P2pNetworkIdentifyState);
impl_p2p_state_access!(State, p2p::P2pNetworkState);
impl_p2p_state_access!(State, P2pNetworkKadBootstrapState);
impl_p2p_state_access!(State, p2p::P2pNetworkKadState);
impl_p2p_state_access!(State, P2pNetworkSchedulerState);
impl_p2p_state_access!(State, p2p::P2pLimits);
impl_p2p_state_access!(State, p2p::P2pNetworkPubsubState);
impl_p2p_state_access!(State, p2p::P2pConfig);
270
// Empty impl: `State` relies entirely on what `p2p::P2pStateTrait` provides by
// default. NOTE(review): presumably the trait's supertrait bounds are the
// `SubstateAccess` impls above — confirm against the `p2p` crate.
impl p2p::P2pStateTrait for State {}
272
/// Shorthand for `mina_core::Substate` fixed to this crate's `Action` and
/// [`State`], leaving only the lifetime and the substate type `S` open.
pub type Substate<'a, S> = mina_core::Substate<'a, crate::Action, State, S>;
274
275impl State {
276 pub fn new(config: Config, constants: &ConsensusConstants, now: Timestamp) -> Self {
277 Self {
278 p2p: P2p::Pending(config.p2p),
279 ledger: LedgerState::new(config.ledger),
280 snark_pool: SnarkPoolState::new(),
281 snark: SnarkState::new(config.snark),
282 transition_frontier: TransitionFrontierState::new(
283 config.transition_frontier,
284 config.archive.is_some(),
285 ),
286 external_snark_worker: ExternalSnarkWorkers::new(now),
287 block_producer: BlockProducerState::new(now, config.block_producer),
288 rpc: RpcState::new(),
289 transaction_pool: TransactionPoolState::new(config.tx_pool, constants),
290
291 watched_accounts: WatchedAccountsState::new(),
292
293 config: config.global,
294 last_action: ActionMeta::zero_custom(now),
295 applied_actions_count: 0,
296 }
297 }
298
299 pub fn last_action(&self) -> &ActionMeta {
300 &self.last_action
301 }
302
303 #[inline(always)]
307 pub fn time(&self) -> Timestamp {
308 self.last_action.time()
309 }
310
311 pub fn pseudo_rng(&self) -> StdRng {
312 crate::core::pseudo_rng(self.time())
313 }
314
315 pub fn action_applied(&mut self, action: &ActionWithMeta) {
318 self.last_action = action.meta().clone();
319 self.applied_actions_count = self.applied_actions_count.checked_add(1).expect("overflow");
320 }
321
322 pub fn genesis_block(&self) -> Option<ArcBlockWithHash> {
323 self.transition_frontier
324 .genesis
325 .block_with_real_or_dummy_proof()
326 }
327
328 fn cur_slot(&self, initial_slot: impl FnOnce(&ArcBlockWithHash) -> u32) -> Option<u32> {
329 let genesis = self.genesis_block()?;
330 let diff_ns = u64::from(self.time()).saturating_sub(u64::from(genesis.timestamp()));
331 let diff_ms = diff_ns / 1_000_000;
332 let slots = diff_ms
333 .checked_div(constraint_constants().block_window_duration_ms)
334 .expect("division by 0");
335 Some(
336 initial_slot(&genesis)
337 .checked_add(slots as u32)
338 .expect("overflow"),
339 )
340 }
341
342 pub fn cur_global_slot(&self) -> Option<u32> {
346 self.cur_slot(|b| b.global_slot())
347 }
348
349 pub fn current_slot(&self) -> Option<u32> {
350 let slots_per_epoch = self.genesis_block()?.constants().slots_per_epoch.as_u32();
351 Some(
352 self.cur_global_slot()?
353 .checked_rem(slots_per_epoch)
354 .expect("division by 0"),
355 )
356 }
357
358 pub fn cur_global_slot_since_genesis(&self) -> Option<u32> {
359 self.cur_slot(|b| b.global_slot_since_genesis())
360 }
361
362 pub fn current_epoch(&self) -> Option<u32> {
363 let slots_per_epoch = self.genesis_block()?.constants().slots_per_epoch.as_u32();
364 Some(
365 self.cur_global_slot()?
366 .checked_div(slots_per_epoch)
367 .expect("division by 0"),
368 )
369 }
370
371 pub fn slot_time(&self, global_slot: u64) -> Option<(Timestamp, Timestamp)> {
372 let genesis_timestamp = self.genesis_block()?.genesis_timestamp();
373 println!("genesis_timestamp: {}", u64::from(genesis_timestamp));
374
375 let start_time = genesis_timestamp.checked_add(
376 global_slot
377 .checked_mul(constraint_constants().block_window_duration_ms)?
378 .checked_mul(1_000_000)?,
379 )?;
380 let end_time = start_time.checked_add(
381 constraint_constants()
382 .block_window_duration_ms
383 .checked_mul(1_000_000)?,
384 )?;
385
386 Some((start_time, end_time))
387 }
388
389 pub fn producing_block_after_genesis(&self) -> bool {
390 #[allow(clippy::arithmetic_side_effects)]
391 let two_mins_in_future = self.time() + Duration::from_secs(2 * 60);
392 self.block_producer.with(false, |bp| {
393 bp.current.won_slot_should_produce(two_mins_in_future)
394 }) && self.genesis_block().is_some_and(|b| {
395 let slot = &b.consensus_state().curr_global_slot_since_hard_fork;
396 let epoch = slot
397 .slot_number
398 .as_u32()
399 .checked_div(slot.slots_per_epoch.as_u32())
400 .expect("division by 0");
401 self.current_epoch() <= Some(epoch)
402 })
403 }
404
405 pub fn prevalidate_block(
406 &self,
407 block: &ArcBlockWithHash,
408 allow_block_too_late: bool,
409 ) -> Result<(), BlockPrevalidationError> {
410 let Some((genesis, cur_global_slot)) =
411 None.or_else(|| Some((self.genesis_block()?, self.cur_global_slot()?)))
412 else {
413 bug_condition!("Tried to prevalidate a block before the genesis block was ready");
418 return Err(BlockPrevalidationError::GenesisNotReady);
419 };
420
421 prevalidate_block(block, &genesis, cur_global_slot, allow_block_too_late)
422 }
423
424 pub fn should_log_node_id(&self) -> bool {
425 self.config.testing_run
426 }
427
428 pub fn consensus_time_now(&self) -> Option<ConsensusTime> {
429 let (start_time, end_time) = self.slot_time(self.cur_global_slot()?.into())?;
430 let epoch = self.current_epoch()?;
431 let global_slot = self.cur_global_slot()?;
432 let slot = self.current_slot()?;
433 Some(ConsensusTime {
434 start_time,
435 end_time,
436 epoch,
437 global_slot,
438 slot,
439 })
440 }
441
442 pub fn consensus_time_best_tip(&self) -> Option<ConsensusTime> {
443 let best_tip = self.transition_frontier.best_tip()?;
444 let global_slot = best_tip
445 .curr_global_slot_since_hard_fork()
446 .slot_number
447 .as_u32();
448 let (start_time, end_time) = self.slot_time(global_slot.into())?;
449 let epoch = best_tip.consensus_state().epoch_count.as_u32();
450 let slot = best_tip.slot();
451 Some(ConsensusTime {
452 start_time,
453 end_time,
454 epoch,
455 global_slot,
456 slot,
457 })
458 }
459}
460
/// State of the P2P layer: either waiting to be initialized or fully set up.
#[serde_with::serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, MallocSizeOf)]
pub enum P2p {
    /// Not initialized yet; holds the configuration that `P2p::initialize`
    /// will use to build the state. Excluded from memory-size accounting.
    Pending(#[ignore_malloc_size_of = "constant"] P2pConfig),
    /// Initialized P2P state.
    Ready(P2pState),
}
467
/// Error returned by `P2p::initialize`.
#[derive(Debug, thiserror::Error)]
pub enum P2pInitializationError {
    /// `initialize` was called on a `P2p` that is not in the `Pending` state.
    #[error("p2p is already initialized")]
    AlreadyInitialized,
}
473
/// Evaluates to the ready P2P state of `$p2p` (the `Some` value of
/// `$p2p.ready()`).
///
/// When the P2P layer is not ready, logs an error at `$time` — including the
/// optional `$reason` — and returns from the enclosing function. Because the
/// expansion contains a bare `return;`, it can only be used inside functions
/// or closures that return `()`.
///
/// Forms:
/// * `p2p_ready!(p2p, time)` — same as passing an empty reason.
/// * `p2p_ready!(p2p, reason, time)`
#[macro_export]
macro_rules! p2p_ready {
    ($p2p:expr, $time:expr) => {
        // `$crate::` keeps the recursive call resolvable when the macro is
        // invoked by path from another crate without importing it by name.
        $crate::p2p_ready!($p2p, "", $time)
    };
    ($p2p:expr, $reason:expr, $time:expr) => {
        match $p2p.ready() {
            Some(v) => v,
            None => {
                mina_core::error!($time; "p2p is not initialized: {}", $reason);
                return;
            }
        }
    };
}
490
491impl P2p {
492 pub fn config(&self) -> &P2pConfig {
493 match self {
494 P2p::Pending(config) => config,
495 P2p::Ready(p2p_state) => &p2p_state.config,
496 }
497 }
498
499 pub fn initialize(&mut self, chain_id: &ChainId) -> Result<(), P2pInitializationError> {
501 let P2p::Pending(config) = self else {
502 return Err(P2pInitializationError::AlreadyInitialized);
503 };
504
505 let callbacks = Self::p2p_callbacks();
506 *self = P2p::Ready(P2pState::new(config.clone(), callbacks, chain_id));
507 Ok(())
508 }
509
510 fn p2p_callbacks() -> P2pCallbacks {
511 P2pCallbacks {
512 on_p2p_channels_transaction_received: Some(redux::callback!(
513 on_p2p_channels_transaction_received((peer_id: PeerId, info: Box<TransactionInfo>)) -> crate::Action {
514 TransactionPoolCandidateAction::InfoReceived {
515 peer_id,
516 info: *info,
517 }
518 }
519 )),
520 on_p2p_channels_transactions_libp2p_received: Some(redux::callback!(
521 on_p2p_channels_transactions_libp2p_received((peer_id: PeerId, transactions: Vec<TransactionWithHash>, message_id: P2pNetworkPubsubMessageCacheId)) -> crate::Action {
522 TransactionPoolCandidateAction::Libp2pTransactionsReceived {
523 message_id,
524 transactions,
525 peer_id
526 }
527 }
528 )),
529 on_p2p_channels_snark_job_commitment_received: Some(redux::callback!(
530 on_p2p_channels_snark_job_commitment_received((peer_id: PeerId, commitment: Box<SnarkJobCommitment>)) -> crate::Action {
531 SnarkPoolAction::CommitmentAdd { commitment: *commitment, sender: peer_id }
532 }
533 )),
534 on_p2p_channels_snark_received: Some(redux::callback!(
535 on_p2p_channels_snark_received((peer_id: PeerId, snark: Box<SnarkInfo>)) -> crate::Action {
536 SnarkPoolCandidateAction::InfoReceived { peer_id, info: *snark }
537 }
538 )),
539 on_p2p_channels_snark_libp2p_received: Some(redux::callback!(
540 on_p2p_channels_snark_libp2p_received((peer_id: PeerId, snark: Box<Snark>)) -> crate::Action {
541 SnarkPoolCandidateAction::WorkFetchSuccess { peer_id, work: *snark }
542 }
543 )),
544 on_p2p_channels_streaming_rpc_ready: Some(redux::callback!(
545 on_p2p_channels_streaming_rpc_ready(_var: ()) -> crate::Action {
546 P2pCallbacksAction::P2pChannelsStreamingRpcReady
547 }
548 )),
549 on_p2p_channels_best_tip_request_received: Some(redux::callback!(
550 on_p2p_channels_best_tip_request_received(peer_id: PeerId) -> crate::Action {
551 P2pCallbacksAction::RpcRespondBestTip { peer_id }
552 }
553 )),
554 on_p2p_disconnection_finish: Some(redux::callback!(
555 on_p2p_disconnection_finish(peer_id: PeerId) -> crate::Action {
556 P2pCallbacksAction::P2pDisconnection { peer_id }
557 }
558 )),
559 on_p2p_connection_outgoing_error: Some(redux::callback!(
560 on_p2p_connection_outgoing_error((rpc_id: RpcId, error: P2pConnectionOutgoingError)) -> crate::Action {
561 RpcAction::P2pConnectionOutgoingError { rpc_id, error }
562 }
563 )),
564 on_p2p_connection_outgoing_success: Some(redux::callback!(
565 on_p2p_connection_outgoing_success(rpc_id: RpcId) -> crate::Action {
566 RpcAction::P2pConnectionOutgoingSuccess { rpc_id }
567 }
568 )),
569 on_p2p_connection_incoming_error: Some(redux::callback!(
570 on_p2p_connection_incoming_error((rpc_id: RpcId, error: String)) -> crate::Action {
571 RpcAction::P2pConnectionIncomingError { rpc_id, error }
572 }
573 )),
574 on_p2p_connection_incoming_success: Some(redux::callback!(
575 on_p2p_connection_incoming_success(rpc_id: RpcId) -> crate::Action {
576 RpcAction::P2pConnectionIncomingSuccess { rpc_id }
577 }
578 )),
579 on_p2p_connection_incoming_answer_ready: Some(redux::callback!(
580 on_p2p_connection_incoming_answer_ready((rpc_id: RpcId, peer_id: PeerId, answer: P2pConnectionResponse)) -> crate::Action {
581 RpcAction::P2pConnectionIncomingAnswerReady { rpc_id, answer, peer_id }
582 }
583 )),
584 on_p2p_peer_best_tip_update: Some(redux::callback!(
585 on_p2p_peer_best_tip_update(best_tip: BlockWithHash<Arc<v2::MinaBlockBlockStableV2>>) -> crate::Action {
586 TransitionFrontierCandidateAction::P2pBestTipUpdate { best_tip }
587 }
588 )),
589 on_p2p_channels_rpc_ready: Some(redux::callback!(
590 on_p2p_channels_rpc_ready(peer_id: PeerId) -> crate::Action {
591 P2pCallbacksAction::P2pChannelsRpcReady { peer_id }
592 }
593 )),
594 on_p2p_channels_rpc_timeout: Some(redux::callback!(
595 on_p2p_channels_rpc_timeout((peer_id: PeerId, id: P2pRpcId)) -> crate::Action {
596 P2pCallbacksAction::P2pChannelsRpcTimeout { peer_id, id }
597 }
598 )),
599 on_p2p_channels_rpc_response_received: Some(redux::callback!(
600 on_p2p_channels_rpc_response_received((peer_id: PeerId, id: P2pRpcId, response: Option<Box<P2pRpcResponse>>)) -> crate::Action {
601 P2pCallbacksAction::P2pChannelsRpcResponseReceived { peer_id, id, response }
602 }
603 )),
604 on_p2p_channels_rpc_request_received: Some(redux::callback!(
605 on_p2p_channels_rpc_request_received((peer_id: PeerId, id: P2pRpcId, request: Box<P2pRpcRequest>)) -> crate::Action {
606 P2pCallbacksAction::P2pChannelsRpcRequestReceived { peer_id, id, request }
607 }
608 )),
609 on_p2p_channels_streaming_rpc_response_received: Some(redux::callback!(
610 on_p2p_channels_streaming_rpc_response_received((peer_id: PeerId, id: P2pRpcId, response: Option<P2pStreamingRpcResponseFull>)) -> crate::Action {
611 P2pCallbacksAction::P2pChannelsStreamingRpcResponseReceived { peer_id, id, response }
612 }
613 )),
614 on_p2p_channels_streaming_rpc_timeout: Some(redux::callback!(
615 on_p2p_channels_streaming_rpc_timeout((peer_id: PeerId, id: P2pRpcId)) -> crate::Action {
616 P2pCallbacksAction::P2pChannelsStreamingRpcTimeout { peer_id, id }
617 }
618 )),
619 on_p2p_pubsub_message_received: Some(redux::callback!(
620 on_p2p_pubsub_message_received((message_id: P2pNetworkPubsubMessageCacheId)) -> crate::Action{
621 P2pCallbacksAction::P2pPubsubValidateMessage { message_id }
622 }
623 )),
624 }
625 }
626
627 pub fn ready(&self) -> Option<&P2pState> {
628 if let P2p::Ready(state) = self {
629 Some(state)
630 } else {
631 None
632 }
633 }
634
635 pub fn ready_mut(&mut self) -> Option<&mut P2pState> {
636 if let P2p::Ready(state) = self {
637 Some(state)
638 } else {
639 None
640 }
641 }
642
643 pub fn unwrap(&self) -> &P2pState {
644 self.ready().expect("p2p is not initialized")
645 }
646
647 pub fn is_enabled<T>(&self, action: &T, time: Timestamp) -> bool
648 where
649 T: EnablingCondition<P2pState>,
650 {
651 match self {
652 P2p::Pending(_) => false,
653 P2p::Ready(p2p_state) => action.is_enabled(p2p_state, time),
654 }
655 }
656
657 pub fn my_id(&self) -> PeerId {
658 match self {
659 P2p::Pending(config) => &config.identity_pub_key,
660 P2p::Ready(state) => &state.config.identity_pub_key,
661 }
662 .peer_id()
663 }
664
665 pub fn get_peer(&self, peer_id: &PeerId) -> Option<&P2pPeerState> {
666 self.ready().and_then(|p2p| p2p.peers.get(peer_id))
667 }
668
669 pub fn get_ready_peer(&self, peer_id: &PeerId) -> Option<&P2pPeerStatusReady> {
670 self.ready().and_then(|p2p| p2p.get_ready_peer(peer_id))
671 }
672
673 pub fn ready_peers(&self) -> Vec<PeerId> {
674 self.ready_peers_iter()
675 .map(|(peer_id, _)| *peer_id)
676 .collect()
677 }
678
679 pub fn ready_peers_iter(&self) -> ReadyPeersIter<'_> {
680 ReadyPeersIter::new(self)
681 }
682}
683
/// Iterator over `(peer id, ready status)` pairs of a [`P2p`].
///
/// Wraps an iterator over the peers map of the ready P2P state; the inner
/// value is `None` when the P2P layer was not ready at construction time, in
/// which case the iterator is empty.
#[derive(Debug, Clone)]
pub struct ReadyPeersIter<'a>(Option<std::collections::btree_map::Iter<'a, PeerId, P2pPeerState>>);
686
687impl<'a> ReadyPeersIter<'a> {
688 fn new(p2p: &'a P2p) -> Self {
689 ReadyPeersIter(p2p.ready().map(|p2p| p2p.peers.iter()))
690 }
691}
692
693impl<'a> Iterator for ReadyPeersIter<'a> {
694 type Item = (&'a PeerId, &'a P2pPeerStatusReady);
695
696 fn next(&mut self) -> Option<Self::Item> {
697 let iter = self.0.as_mut()?;
698 Some(loop {
699 let (peer_id, state) = iter.next()?;
700 if let Some(ready) = state.status.as_ready() {
701 break (peer_id, ready);
702 }
703 })
704 }
705}