1use std::{sync::Arc, time::Duration};
2
3use malloc_size_of_derive::MallocSizeOf;
4use mina_core::{
5 block::prevalidate::{prevalidate_block, BlockPrevalidationError},
6 consensus::ConsensusTime,
7 transaction::{TransactionInfo, TransactionWithHash},
8};
9use mina_p2p_messages::v2;
10use p2p::P2pNetworkPubsubMessageCacheId;
11use rand::prelude::*;
12
13use mina_core::{
14 block::{ArcBlockWithHash, BlockWithHash},
15 consensus::ConsensusConstants,
16 constants::constraint_constants,
17 requests::RpcId,
18 snark::{Snark, SnarkInfo, SnarkJobCommitment},
19 ChainId,
20};
21use p2p::{
22 bootstrap::P2pNetworkKadBootstrapState,
23 channels::{
24 rpc::{P2pRpcId, P2pRpcRequest, P2pRpcResponse},
25 streaming_rpc::P2pStreamingRpcResponseFull,
26 },
27 connection::{outgoing::P2pConnectionOutgoingError, P2pConnectionResponse},
28 network::identify::P2pNetworkIdentifyState,
29 P2pCallbacks, P2pConfig, P2pNetworkSchedulerState, P2pPeerState, P2pPeerStatusReady, PeerId,
30};
31use redux::{ActionMeta, EnablingCondition, Timestamp};
32use serde::{Deserialize, Serialize};
33use snark::{
34 block_verify::SnarkBlockVerifyState, user_command_verify::SnarkUserCommandVerifyState,
35 work_verify::SnarkWorkVerifyState,
36};
37
38use crate::{
39 block_producer::vrf_evaluator::BlockProducerVrfEvaluatorState,
40 config::GlobalConfig,
41 external_snark_worker::{ExternalSnarkWorker, ExternalSnarkWorkers},
42 ledger::{read::LedgerReadState, write::LedgerWriteState},
43 p2p::callbacks::P2pCallbacksAction,
44 snark_pool::candidate::SnarkPoolCandidateAction,
45 transaction_pool::{
46 candidate::{TransactionPoolCandidateAction, TransactionPoolCandidatesState},
47 TransactionPoolState,
48 },
49 transition_frontier::{
50 candidate::TransitionFrontierCandidateAction,
51 genesis::TransitionFrontierGenesisState,
52 sync::{
53 ledger::{
54 snarked::TransitionFrontierSyncLedgerSnarkedState,
55 staged::TransitionFrontierSyncLedgerStagedState, TransitionFrontierSyncLedgerState,
56 },
57 TransitionFrontierSyncState,
58 },
59 },
60 ActionWithMeta, RpcAction, SnarkPoolAction,
61};
62pub use crate::{
63 block_producer::BlockProducerState,
64 ledger::LedgerState,
65 p2p::P2pState,
66 rpc::RpcState,
67 snark::SnarkState,
68 snark_pool::{candidate::SnarkPoolCandidatesState, SnarkPoolState},
69 transition_frontier::{candidate::TransitionFrontierCandidatesState, TransitionFrontierState},
70 watched_accounts::WatchedAccountsState,
71 Config,
72};
73
/// Root state of the node: one field per subsystem, plus bookkeeping about
/// the most recently applied action.
///
/// The struct is (de)serializable and cloneable as a whole.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct State {
    /// Node-wide configuration (taken from `Config::global` in `State::new`).
    pub config: GlobalConfig,

    /// P2P layer; starts as `P2p::Pending` and becomes `P2p::Ready` after
    /// `P2p::initialize`.
    pub p2p: P2p,
    /// Ledger subsystem state.
    pub ledger: LedgerState,
    /// Snark verification state.
    pub snark: SnarkState,
    /// Transition frontier state (genesis, sync, candidates, best tip).
    pub transition_frontier: TransitionFrontierState,
    /// Snark pool state.
    pub snark_pool: SnarkPoolState,
    /// External snark worker(s) state.
    pub external_snark_worker: ExternalSnarkWorkers,
    /// Transaction pool state.
    pub transaction_pool: TransactionPoolState,
    /// Block producer state.
    pub block_producer: BlockProducerState,
    /// RPC subsystem state.
    pub rpc: RpcState,

    /// Watched accounts state.
    pub watched_accounts: WatchedAccountsState,

    /// Metadata of the most recently applied action; overwritten by
    /// `action_applied` and the source of `State::time`.
    last_action: ActionMeta,
    /// Number of actions applied so far; incremented by `action_applied`.
    applied_actions_count: u64,
}
94
95use mina_core::{bug_condition, impl_substate_access, SubstateAccess};
97
98impl_substate_access!(State, SnarkState, snark);
99impl_substate_access!(State, SnarkBlockVerifyState, snark.block_verify);
100impl_substate_access!(State, SnarkWorkVerifyState, snark.work_verify);
101impl_substate_access!(
102 State,
103 SnarkUserCommandVerifyState,
104 snark.user_command_verify
105);
106impl_substate_access!(State, TransitionFrontierState, transition_frontier);
107impl_substate_access!(
108 State,
109 TransitionFrontierCandidatesState,
110 transition_frontier.candidates
111);
112impl_substate_access!(State, TransactionPoolState, transaction_pool);
113impl_substate_access!(
114 State,
115 TransactionPoolCandidatesState,
116 transaction_pool.candidates
117);
118impl_substate_access!(
119 State,
120 TransitionFrontierGenesisState,
121 transition_frontier.genesis
122);
123impl_substate_access!(State, TransitionFrontierSyncState, transition_frontier.sync);
124impl_substate_access!(State, SnarkPoolState, snark_pool);
125impl_substate_access!(State, SnarkPoolCandidatesState, snark_pool.candidates);
126impl_substate_access!(State, ExternalSnarkWorkers, external_snark_worker);
127impl_substate_access!(State, BlockProducerState, block_producer);
128impl_substate_access!(State, RpcState, rpc);
129impl_substate_access!(State, WatchedAccountsState, watched_accounts);
130impl_substate_access!(State, ExternalSnarkWorker, external_snark_worker.0);
131impl_substate_access!(State, LedgerState, ledger);
132impl_substate_access!(State, LedgerReadState, ledger.read);
133impl_substate_access!(State, LedgerWriteState, ledger.write);
134
135impl mina_core::SubstateAccess<P2pState> for State {
136 fn substate(&self) -> mina_core::SubstateResult<&P2pState> {
137 self.p2p
138 .ready()
139 .ok_or_else(|| "P2P state unavailable. P2P layer is not ready".to_owned())
140 }
141
142 fn substate_mut(&mut self) -> mina_core::SubstateResult<&mut P2pState> {
143 self.p2p
144 .ready_mut()
145 .ok_or_else(|| "P2P state unavailable. P2P layer is not ready".to_owned())
146 }
147}
148
149impl mina_core::SubstateAccess<TransitionFrontierSyncLedgerState> for State {
150 fn substate(&self) -> mina_core::SubstateResult<&TransitionFrontierSyncLedgerState> {
151 self.transition_frontier
152 .sync
153 .ledger()
154 .ok_or_else(|| "Ledger sync state unavailable".to_owned())
155 }
156
157 fn substate_mut(
158 &mut self,
159 ) -> mina_core::SubstateResult<&mut TransitionFrontierSyncLedgerState> {
160 self.transition_frontier
161 .sync
162 .ledger_mut()
163 .ok_or_else(|| "Ledger sync state unavailable".to_owned())
164 }
165}
166
167impl SubstateAccess<BlockProducerVrfEvaluatorState> for State {
168 fn substate(&self) -> mina_core::SubstateResult<&BlockProducerVrfEvaluatorState> {
169 self.block_producer
170 .as_ref()
171 .map(|state| &state.vrf_evaluator)
172 .ok_or_else(|| "Block producer VRF evaluator state unavailable".to_owned())
173 }
174
175 fn substate_mut(&mut self) -> mina_core::SubstateResult<&mut BlockProducerVrfEvaluatorState> {
176 self.block_producer
177 .as_mut()
178 .map(|state| &mut state.vrf_evaluator)
179 .ok_or_else(|| "Block producer VRF evaluator state unavailable".to_owned())
180 }
181}
182
183impl mina_core::SubstateAccess<TransitionFrontierSyncLedgerSnarkedState> for State {
184 fn substate(&self) -> mina_core::SubstateResult<&TransitionFrontierSyncLedgerSnarkedState> {
185 self.transition_frontier
186 .sync
187 .ledger()
188 .ok_or_else(|| {
189 "Snarked ledger state unavailable. Ledger sync state unavailable".to_owned()
190 })?
191 .snarked()
192 .ok_or_else(|| "Snarked ledger state unavailable".to_owned())
193 }
194
195 fn substate_mut(
196 &mut self,
197 ) -> mina_core::SubstateResult<&mut TransitionFrontierSyncLedgerSnarkedState> {
198 self.transition_frontier
199 .sync
200 .ledger_mut()
201 .ok_or_else(|| {
202 "Snarked ledger state unavailable. Ledger sync state unavailable".to_owned()
203 })?
204 .snarked_mut()
205 .ok_or_else(|| "Snarked ledger state unavailable".to_owned())
206 }
207}
208
209impl mina_core::SubstateAccess<TransitionFrontierSyncLedgerStagedState> for State {
210 fn substate(&self) -> mina_core::SubstateResult<&TransitionFrontierSyncLedgerStagedState> {
211 self.transition_frontier
212 .sync
213 .ledger()
214 .ok_or_else(|| {
215 "Staged ledger state unavailable. Ledger sync state unavailable".to_owned()
216 })?
217 .staged()
218 .ok_or_else(|| "Staged ledger state unavailable".to_owned())
219 }
220
221 fn substate_mut(
222 &mut self,
223 ) -> mina_core::SubstateResult<&mut TransitionFrontierSyncLedgerStagedState> {
224 self.transition_frontier
225 .sync
226 .ledger_mut()
227 .ok_or_else(|| {
228 "Staged ledger state unavailable. Ledger sync state unavailable".to_owned()
229 })?
230 .staged_mut()
231 .ok_or_else(|| "Staged ledger state unavailable".to_owned())
232 }
233}
234
235impl SubstateAccess<State> for State {
236 fn substate(&self) -> mina_core::SubstateResult<&State> {
237 Ok(self)
238 }
239
240 fn substate_mut(&mut self) -> mina_core::SubstateResult<&mut State> {
241 Ok(self)
242 }
243}
244
/// Implements `SubstateAccess<$substate_type>` for `$state` by going through
/// `P2pState`: first resolve `$state -> P2pState` (which can fail), then
/// delegate to `P2pState`'s own `SubstateAccess<$substate_type>` impl.
macro_rules! impl_p2p_state_access {
    ($state:ty, $substate_type:ty) => {
        impl mina_core::SubstateAccess<$substate_type> for $state {
            fn substate(&self) -> mina_core::SubstateResult<&$substate_type> {
                mina_core::SubstateAccess::<P2pState>::substate(self)?.substate()
            }

            fn substate_mut(&mut self) -> mina_core::SubstateResult<&mut $substate_type> {
                mina_core::SubstateAccess::<P2pState>::substate_mut(self)?.substate_mut()
            }
        }
    };
}
260
261impl_p2p_state_access!(State, P2pNetworkIdentifyState);
262impl_p2p_state_access!(State, p2p::P2pNetworkState);
263impl_p2p_state_access!(State, P2pNetworkKadBootstrapState);
264impl_p2p_state_access!(State, p2p::P2pNetworkKadState);
265impl_p2p_state_access!(State, P2pNetworkSchedulerState);
266impl_p2p_state_access!(State, p2p::P2pLimits);
267impl_p2p_state_access!(State, p2p::P2pNetworkPubsubState);
268impl_p2p_state_access!(State, p2p::P2pConfig);
269
// Opts `State` into `p2p::P2pStateTrait`. The impl body is empty, so nothing
// is overridden here. NOTE(review): presumably the trait is satisfied by the
// `SubstateAccess` impls in this file; confirm against the `p2p` crate.
impl p2p::P2pStateTrait for State {}
271
/// Shorthand for `mina_core::Substate` fixed to this crate's `Action` type
/// and root `State`; `S` is the substate type being accessed.
pub type Substate<'a, S> = mina_core::Substate<'a, crate::Action, State, S>;
273
274impl State {
275 pub fn new(config: Config, constants: &ConsensusConstants, now: Timestamp) -> Self {
276 Self {
277 p2p: P2p::Pending(config.p2p),
278 ledger: LedgerState::new(config.ledger),
279 snark_pool: SnarkPoolState::new(),
280 snark: SnarkState::new(config.snark),
281 transition_frontier: TransitionFrontierState::new(
282 config.transition_frontier,
283 config.archive.is_some(),
284 ),
285 external_snark_worker: ExternalSnarkWorkers::new(now),
286 block_producer: BlockProducerState::new(now, config.block_producer),
287 rpc: RpcState::new(),
288 transaction_pool: TransactionPoolState::new(config.tx_pool, constants),
289
290 watched_accounts: WatchedAccountsState::new(),
291
292 config: config.global,
293 last_action: ActionMeta::zero_custom(now),
294 applied_actions_count: 0,
295 }
296 }
297
298 pub fn last_action(&self) -> &ActionMeta {
299 &self.last_action
300 }
301
302 #[inline(always)]
306 pub fn time(&self) -> Timestamp {
307 self.last_action.time()
308 }
309
310 pub fn pseudo_rng(&self) -> StdRng {
311 crate::core::pseudo_rng(self.time())
312 }
313
314 pub fn action_applied(&mut self, action: &ActionWithMeta) {
317 self.last_action = action.meta().clone();
318 self.applied_actions_count = self.applied_actions_count.checked_add(1).expect("overflow");
319 }
320
321 pub fn genesis_block(&self) -> Option<ArcBlockWithHash> {
322 self.transition_frontier
323 .genesis
324 .block_with_real_or_dummy_proof()
325 }
326
327 fn cur_slot(&self, initial_slot: impl FnOnce(&ArcBlockWithHash) -> u32) -> Option<u32> {
328 let genesis = self.genesis_block()?;
329 let diff_ns = u64::from(self.time()).saturating_sub(u64::from(genesis.timestamp()));
330 let diff_ms = diff_ns / 1_000_000;
331 let slots = diff_ms
332 .checked_div(constraint_constants().block_window_duration_ms)
333 .expect("division by 0");
334 Some(
335 initial_slot(&genesis)
336 .checked_add(slots as u32)
337 .expect("overflow"),
338 )
339 }
340
341 pub fn cur_global_slot(&self) -> Option<u32> {
345 self.cur_slot(|b| b.global_slot())
346 }
347
348 pub fn current_slot(&self) -> Option<u32> {
349 let slots_per_epoch = self.genesis_block()?.constants().slots_per_epoch.as_u32();
350 Some(
351 self.cur_global_slot()?
352 .checked_rem(slots_per_epoch)
353 .expect("division by 0"),
354 )
355 }
356
357 pub fn cur_global_slot_since_genesis(&self) -> Option<u32> {
358 self.cur_slot(|b| b.global_slot_since_genesis())
359 }
360
361 pub fn current_epoch(&self) -> Option<u32> {
362 let slots_per_epoch = self.genesis_block()?.constants().slots_per_epoch.as_u32();
363 Some(
364 self.cur_global_slot()?
365 .checked_div(slots_per_epoch)
366 .expect("division by 0"),
367 )
368 }
369
370 pub fn slot_time(&self, global_slot: u64) -> Option<(Timestamp, Timestamp)> {
371 let genesis_timestamp = self.genesis_block()?.genesis_timestamp();
372 println!("genesis_timestamp: {}", u64::from(genesis_timestamp));
373
374 let start_time = genesis_timestamp.checked_add(
375 global_slot
376 .checked_mul(constraint_constants().block_window_duration_ms)?
377 .checked_mul(1_000_000)?,
378 )?;
379 let end_time = start_time.checked_add(
380 constraint_constants()
381 .block_window_duration_ms
382 .checked_mul(1_000_000)?,
383 )?;
384
385 Some((start_time, end_time))
386 }
387
388 pub fn producing_block_after_genesis(&self) -> bool {
389 #[allow(clippy::arithmetic_side_effects)]
390 let two_mins_in_future = self.time() + Duration::from_secs(2 * 60);
391 self.block_producer.with(false, |bp| {
392 bp.current.won_slot_should_produce(two_mins_in_future)
393 }) && self.genesis_block().is_some_and(|b| {
394 let slot = &b.consensus_state().curr_global_slot_since_hard_fork;
395 let epoch = slot
396 .slot_number
397 .as_u32()
398 .checked_div(slot.slots_per_epoch.as_u32())
399 .expect("division by 0");
400 self.current_epoch() <= Some(epoch)
401 })
402 }
403
404 pub fn prevalidate_block(
405 &self,
406 block: &ArcBlockWithHash,
407 allow_block_too_late: bool,
408 ) -> Result<(), BlockPrevalidationError> {
409 let Some((genesis, cur_global_slot)) =
410 None.or_else(|| Some((self.genesis_block()?, self.cur_global_slot()?)))
411 else {
412 bug_condition!("Tried to prevalidate a block before the genesis block was ready");
417 return Err(BlockPrevalidationError::GenesisNotReady);
418 };
419
420 prevalidate_block(block, &genesis, cur_global_slot, allow_block_too_late)
421 }
422
423 pub fn should_log_node_id(&self) -> bool {
424 self.config.testing_run
425 }
426
427 pub fn consensus_time_now(&self) -> Option<ConsensusTime> {
428 let (start_time, end_time) = self.slot_time(self.cur_global_slot()?.into())?;
429 let epoch = self.current_epoch()?;
430 let global_slot = self.cur_global_slot()?;
431 let slot = self.current_slot()?;
432 Some(ConsensusTime {
433 start_time,
434 end_time,
435 epoch,
436 global_slot,
437 slot,
438 })
439 }
440
441 pub fn consensus_time_best_tip(&self) -> Option<ConsensusTime> {
442 let best_tip = self.transition_frontier.best_tip()?;
443 let global_slot = best_tip
444 .curr_global_slot_since_hard_fork()
445 .slot_number
446 .as_u32();
447 let (start_time, end_time) = self.slot_time(global_slot.into())?;
448 let epoch = best_tip.consensus_state().epoch_count.as_u32();
449 let slot = best_tip.slot();
450 Some(ConsensusTime {
451 start_time,
452 end_time,
453 epoch,
454 global_slot,
455 slot,
456 })
457 }
458}
459
/// P2P layer of the node, in one of two stages.
#[serde_with::serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, MallocSizeOf)]
pub enum P2p {
    /// Not yet initialized: only the configuration is held. The config is
    /// excluded from malloc-size accounting.
    Pending(#[ignore_malloc_size_of = "constant"] P2pConfig),
    /// Initialized: the full P2P state, created by `P2p::initialize`.
    Ready(P2pState),
}
466
/// Error returned by `P2p::initialize`.
#[derive(Debug, thiserror::Error)]
pub enum P2pInitializationError {
    /// `initialize` was called while the P2P layer was not in the
    /// `P2p::Pending` stage.
    #[error("p2p is already initialized")]
    AlreadyInitialized,
}
472
/// Evaluates to the value inside `$p2p.ready()`, or logs an error and
/// returns from the enclosing function when the P2P layer is not ready.
///
/// Forms:
/// - `p2p_ready!(p2p, time)`: same as the three-argument form with an
///   empty reason.
/// - `p2p_ready!(p2p, reason, time)`: on `None`, logs
///   `"p2p is not initialized: {reason}"` through `mina_core::error!` with
///   `time`, then executes a bare `return;`.
///
/// Because of the bare `return;`, the macro can only be used inside
/// functions (or closures) that return `()`.
#[macro_export]
macro_rules! p2p_ready {
    ($p2p:expr, $time:expr) => {
        // `$crate::` keeps the recursive call resolvable when the macro is
        // invoked by path from another crate.
        $crate::p2p_ready!($p2p, "", $time)
    };
    ($p2p:expr, $reason:expr, $time:expr) => {
        match $p2p.ready() {
            Some(v) => v,
            None => {
                mina_core::error!($time; "p2p is not initialized: {}", $reason);
                return;
            }
        }
    };
}
489
490impl P2p {
491 pub fn config(&self) -> &P2pConfig {
492 match self {
493 P2p::Pending(config) => config,
494 P2p::Ready(p2p_state) => &p2p_state.config,
495 }
496 }
497
498 pub fn initialize(&mut self, chain_id: &ChainId) -> Result<(), P2pInitializationError> {
500 let P2p::Pending(config) = self else {
501 return Err(P2pInitializationError::AlreadyInitialized);
502 };
503
504 let callbacks = Self::p2p_callbacks();
505 *self = P2p::Ready(P2pState::new(config.clone(), callbacks, chain_id));
506 Ok(())
507 }
508
509 fn p2p_callbacks() -> P2pCallbacks {
510 P2pCallbacks {
511 on_p2p_channels_transaction_received: Some(redux::callback!(
512 on_p2p_channels_transaction_received((peer_id: PeerId, info: Box<TransactionInfo>)) -> crate::Action {
513 TransactionPoolCandidateAction::InfoReceived {
514 peer_id,
515 info: *info,
516 }
517 }
518 )),
519 on_p2p_channels_transactions_libp2p_received: Some(redux::callback!(
520 on_p2p_channels_transactions_libp2p_received((peer_id: PeerId, transactions: Vec<TransactionWithHash>, message_id: P2pNetworkPubsubMessageCacheId)) -> crate::Action {
521 TransactionPoolCandidateAction::Libp2pTransactionsReceived {
522 message_id,
523 transactions,
524 peer_id
525 }
526 }
527 )),
528 on_p2p_channels_snark_job_commitment_received: Some(redux::callback!(
529 on_p2p_channels_snark_job_commitment_received((peer_id: PeerId, commitment: Box<SnarkJobCommitment>)) -> crate::Action {
530 SnarkPoolAction::CommitmentAdd { commitment: *commitment, sender: peer_id }
531 }
532 )),
533 on_p2p_channels_snark_received: Some(redux::callback!(
534 on_p2p_channels_snark_received((peer_id: PeerId, snark: Box<SnarkInfo>)) -> crate::Action {
535 SnarkPoolCandidateAction::InfoReceived { peer_id, info: *snark }
536 }
537 )),
538 on_p2p_channels_snark_libp2p_received: Some(redux::callback!(
539 on_p2p_channels_snark_libp2p_received((peer_id: PeerId, snark: Box<Snark>)) -> crate::Action {
540 SnarkPoolCandidateAction::WorkFetchSuccess { peer_id, work: *snark }
541 }
542 )),
543 on_p2p_channels_streaming_rpc_ready: Some(redux::callback!(
544 on_p2p_channels_streaming_rpc_ready(_var: ()) -> crate::Action {
545 P2pCallbacksAction::P2pChannelsStreamingRpcReady
546 }
547 )),
548 on_p2p_channels_best_tip_request_received: Some(redux::callback!(
549 on_p2p_channels_best_tip_request_received(peer_id: PeerId) -> crate::Action {
550 P2pCallbacksAction::RpcRespondBestTip { peer_id }
551 }
552 )),
553 on_p2p_disconnection_finish: Some(redux::callback!(
554 on_p2p_disconnection_finish(peer_id: PeerId) -> crate::Action {
555 P2pCallbacksAction::P2pDisconnection { peer_id }
556 }
557 )),
558 on_p2p_connection_outgoing_error: Some(redux::callback!(
559 on_p2p_connection_outgoing_error((rpc_id: RpcId, error: P2pConnectionOutgoingError)) -> crate::Action {
560 RpcAction::P2pConnectionOutgoingError { rpc_id, error }
561 }
562 )),
563 on_p2p_connection_outgoing_success: Some(redux::callback!(
564 on_p2p_connection_outgoing_success(rpc_id: RpcId) -> crate::Action {
565 RpcAction::P2pConnectionOutgoingSuccess { rpc_id }
566 }
567 )),
568 on_p2p_connection_incoming_error: Some(redux::callback!(
569 on_p2p_connection_incoming_error((rpc_id: RpcId, error: String)) -> crate::Action {
570 RpcAction::P2pConnectionIncomingError { rpc_id, error }
571 }
572 )),
573 on_p2p_connection_incoming_success: Some(redux::callback!(
574 on_p2p_connection_incoming_success(rpc_id: RpcId) -> crate::Action {
575 RpcAction::P2pConnectionIncomingSuccess { rpc_id }
576 }
577 )),
578 on_p2p_connection_incoming_answer_ready: Some(redux::callback!(
579 on_p2p_connection_incoming_answer_ready((rpc_id: RpcId, peer_id: PeerId, answer: P2pConnectionResponse)) -> crate::Action {
580 RpcAction::P2pConnectionIncomingAnswerReady { rpc_id, answer, peer_id }
581 }
582 )),
583 on_p2p_peer_best_tip_update: Some(redux::callback!(
584 on_p2p_peer_best_tip_update(best_tip: BlockWithHash<Arc<v2::MinaBlockBlockStableV2>>) -> crate::Action {
585 TransitionFrontierCandidateAction::P2pBestTipUpdate { best_tip }
586 }
587 )),
588 on_p2p_channels_rpc_ready: Some(redux::callback!(
589 on_p2p_channels_rpc_ready(peer_id: PeerId) -> crate::Action {
590 P2pCallbacksAction::P2pChannelsRpcReady { peer_id }
591 }
592 )),
593 on_p2p_channels_rpc_timeout: Some(redux::callback!(
594 on_p2p_channels_rpc_timeout((peer_id: PeerId, id: P2pRpcId)) -> crate::Action {
595 P2pCallbacksAction::P2pChannelsRpcTimeout { peer_id, id }
596 }
597 )),
598 on_p2p_channels_rpc_response_received: Some(redux::callback!(
599 on_p2p_channels_rpc_response_received((peer_id: PeerId, id: P2pRpcId, response: Option<Box<P2pRpcResponse>>)) -> crate::Action {
600 P2pCallbacksAction::P2pChannelsRpcResponseReceived { peer_id, id, response }
601 }
602 )),
603 on_p2p_channels_rpc_request_received: Some(redux::callback!(
604 on_p2p_channels_rpc_request_received((peer_id: PeerId, id: P2pRpcId, request: Box<P2pRpcRequest>)) -> crate::Action {
605 P2pCallbacksAction::P2pChannelsRpcRequestReceived { peer_id, id, request }
606 }
607 )),
608 on_p2p_channels_streaming_rpc_response_received: Some(redux::callback!(
609 on_p2p_channels_streaming_rpc_response_received((peer_id: PeerId, id: P2pRpcId, response: Option<P2pStreamingRpcResponseFull>)) -> crate::Action {
610 P2pCallbacksAction::P2pChannelsStreamingRpcResponseReceived { peer_id, id, response }
611 }
612 )),
613 on_p2p_channels_streaming_rpc_timeout: Some(redux::callback!(
614 on_p2p_channels_streaming_rpc_timeout((peer_id: PeerId, id: P2pRpcId)) -> crate::Action {
615 P2pCallbacksAction::P2pChannelsStreamingRpcTimeout { peer_id, id }
616 }
617 )),
618 on_p2p_pubsub_message_received: Some(redux::callback!(
619 on_p2p_pubsub_message_received((message_id: P2pNetworkPubsubMessageCacheId)) -> crate::Action{
620 P2pCallbacksAction::P2pPubsubValidateMessage { message_id }
621 }
622 )),
623 }
624 }
625
626 pub fn ready(&self) -> Option<&P2pState> {
627 if let P2p::Ready(state) = self {
628 Some(state)
629 } else {
630 None
631 }
632 }
633
634 pub fn ready_mut(&mut self) -> Option<&mut P2pState> {
635 if let P2p::Ready(state) = self {
636 Some(state)
637 } else {
638 None
639 }
640 }
641
642 pub fn unwrap(&self) -> &P2pState {
643 self.ready().expect("p2p is not initialized")
644 }
645
646 pub fn is_enabled<T>(&self, action: &T, time: Timestamp) -> bool
647 where
648 T: EnablingCondition<P2pState>,
649 {
650 match self {
651 P2p::Pending(_) => false,
652 P2p::Ready(p2p_state) => action.is_enabled(p2p_state, time),
653 }
654 }
655
656 pub fn my_id(&self) -> PeerId {
657 match self {
658 P2p::Pending(config) => &config.identity_pub_key,
659 P2p::Ready(state) => &state.config.identity_pub_key,
660 }
661 .peer_id()
662 }
663
664 pub fn get_peer(&self, peer_id: &PeerId) -> Option<&P2pPeerState> {
665 self.ready().and_then(|p2p| p2p.peers.get(peer_id))
666 }
667
668 pub fn get_ready_peer(&self, peer_id: &PeerId) -> Option<&P2pPeerStatusReady> {
669 self.ready().and_then(|p2p| p2p.get_ready_peer(peer_id))
670 }
671
672 pub fn ready_peers(&self) -> Vec<PeerId> {
673 self.ready_peers_iter()
674 .map(|(peer_id, _)| *peer_id)
675 .collect()
676 }
677
678 pub fn ready_peers_iter(&self) -> ReadyPeersIter<'_> {
679 ReadyPeersIter::new(self)
680 }
681}
682
/// Iterator over the peers that are in ready status.
///
/// Wraps an optional iterator over the peer map: `None` when the P2P layer
/// is not ready, in which case the iterator yields nothing.
#[derive(Debug, Clone)]
pub struct ReadyPeersIter<'a>(Option<std::collections::btree_map::Iter<'a, PeerId, P2pPeerState>>);
685
686impl<'a> ReadyPeersIter<'a> {
687 fn new(p2p: &'a P2p) -> Self {
688 ReadyPeersIter(p2p.ready().map(|p2p| p2p.peers.iter()))
689 }
690}
691
692impl<'a> Iterator for ReadyPeersIter<'a> {
693 type Item = (&'a PeerId, &'a P2pPeerStatusReady);
694
695 fn next(&mut self) -> Option<Self::Item> {
696 let iter = self.0.as_mut()?;
697 Some(loop {
698 let (peer_id, state) = iter.next()?;
699 if let Some(ready) = state.status.as_ready() {
700 break (peer_id, ready);
701 }
702 })
703 }
704}