1use std::{sync::Arc, time::Duration};
2
3use malloc_size_of_derive::MallocSizeOf;
4use mina_p2p_messages::v2;
5use openmina_core::{
6 block::prevalidate::{prevalidate_block, BlockPrevalidationError},
7 consensus::ConsensusTime,
8 transaction::{TransactionInfo, TransactionWithHash},
9};
10use p2p::P2pNetworkPubsubMessageCacheId;
11use rand::prelude::*;
12
13use openmina_core::{
14 block::{ArcBlockWithHash, BlockWithHash},
15 consensus::ConsensusConstants,
16 constants::constraint_constants,
17 error,
18 requests::RpcId,
19 snark::{Snark, SnarkInfo, SnarkJobCommitment},
20 ChainId,
21};
22use p2p::{
23 bootstrap::P2pNetworkKadBootstrapState,
24 channels::{
25 rpc::{P2pRpcId, P2pRpcRequest, P2pRpcResponse},
26 streaming_rpc::P2pStreamingRpcResponseFull,
27 },
28 connection::{outgoing::P2pConnectionOutgoingError, P2pConnectionResponse},
29 network::identify::P2pNetworkIdentifyState,
30 P2pCallbacks, P2pConfig, P2pNetworkSchedulerState, P2pPeerState, P2pPeerStatusReady, PeerId,
31};
32use redux::{ActionMeta, EnablingCondition, Timestamp};
33use serde::{Deserialize, Serialize};
34use snark::{
35 block_verify::SnarkBlockVerifyState, user_command_verify::SnarkUserCommandVerifyState,
36 work_verify::SnarkWorkVerifyState,
37};
38
39use crate::{
40 block_producer::vrf_evaluator::BlockProducerVrfEvaluatorState,
41 config::GlobalConfig,
42 external_snark_worker::{ExternalSnarkWorker, ExternalSnarkWorkers},
43 ledger::{read::LedgerReadState, write::LedgerWriteState},
44 p2p::callbacks::P2pCallbacksAction,
45 snark_pool::candidate::SnarkPoolCandidateAction,
46 transaction_pool::{
47 candidate::{TransactionPoolCandidateAction, TransactionPoolCandidatesState},
48 TransactionPoolState,
49 },
50 transition_frontier::{
51 candidate::TransitionFrontierCandidateAction,
52 genesis::TransitionFrontierGenesisState,
53 sync::{
54 ledger::{
55 snarked::TransitionFrontierSyncLedgerSnarkedState,
56 staged::TransitionFrontierSyncLedgerStagedState, TransitionFrontierSyncLedgerState,
57 },
58 TransitionFrontierSyncState,
59 },
60 },
61 ActionWithMeta, RpcAction, SnarkPoolAction,
62};
63pub use crate::{
64 block_producer::BlockProducerState,
65 ledger::LedgerState,
66 p2p::P2pState,
67 rpc::RpcState,
68 snark::SnarkState,
69 snark_pool::{candidate::SnarkPoolCandidatesState, SnarkPoolState},
70 transition_frontier::{candidate::TransitionFrontierCandidatesState, TransitionFrontierState},
71 watched_accounts::WatchedAccountsState,
72 Config,
73};
74
75#[derive(Serialize, Deserialize, Debug, Clone)]
76pub struct State {
77 pub config: GlobalConfig,
78
79 pub p2p: P2p,
80 pub ledger: LedgerState,
81 pub snark: SnarkState,
82 pub transition_frontier: TransitionFrontierState,
83 pub snark_pool: SnarkPoolState,
84 pub external_snark_worker: ExternalSnarkWorkers,
85 pub transaction_pool: TransactionPoolState,
86 pub block_producer: BlockProducerState,
87 pub rpc: RpcState,
88
89 pub watched_accounts: WatchedAccountsState,
90
91 last_action: ActionMeta,
93 applied_actions_count: u64,
94}
95
96use openmina_core::{bug_condition, impl_substate_access, SubstateAccess};
98
99impl_substate_access!(State, SnarkState, snark);
100impl_substate_access!(State, SnarkBlockVerifyState, snark.block_verify);
101impl_substate_access!(State, SnarkWorkVerifyState, snark.work_verify);
102impl_substate_access!(
103 State,
104 SnarkUserCommandVerifyState,
105 snark.user_command_verify
106);
107impl_substate_access!(State, TransitionFrontierState, transition_frontier);
108impl_substate_access!(
109 State,
110 TransitionFrontierCandidatesState,
111 transition_frontier.candidates
112);
113impl_substate_access!(State, TransactionPoolState, transaction_pool);
114impl_substate_access!(
115 State,
116 TransactionPoolCandidatesState,
117 transaction_pool.candidates
118);
119impl_substate_access!(
120 State,
121 TransitionFrontierGenesisState,
122 transition_frontier.genesis
123);
124impl_substate_access!(State, TransitionFrontierSyncState, transition_frontier.sync);
125impl_substate_access!(State, SnarkPoolState, snark_pool);
126impl_substate_access!(State, SnarkPoolCandidatesState, snark_pool.candidates);
127impl_substate_access!(State, ExternalSnarkWorkers, external_snark_worker);
128impl_substate_access!(State, BlockProducerState, block_producer);
129impl_substate_access!(State, RpcState, rpc);
130impl_substate_access!(State, WatchedAccountsState, watched_accounts);
131impl_substate_access!(State, ExternalSnarkWorker, external_snark_worker.0);
132impl_substate_access!(State, LedgerState, ledger);
133impl_substate_access!(State, LedgerReadState, ledger.read);
134impl_substate_access!(State, LedgerWriteState, ledger.write);
135
136impl openmina_core::SubstateAccess<P2pState> for State {
137 fn substate(&self) -> openmina_core::SubstateResult<&P2pState> {
138 self.p2p
139 .ready()
140 .ok_or_else(|| "P2P state unavailable. P2P layer is not ready".to_owned())
141 }
142
143 fn substate_mut(&mut self) -> openmina_core::SubstateResult<&mut P2pState> {
144 self.p2p
145 .ready_mut()
146 .ok_or_else(|| "P2P state unavailable. P2P layer is not ready".to_owned())
147 }
148}
149
150impl openmina_core::SubstateAccess<TransitionFrontierSyncLedgerState> for State {
151 fn substate(&self) -> openmina_core::SubstateResult<&TransitionFrontierSyncLedgerState> {
152 self.transition_frontier
153 .sync
154 .ledger()
155 .ok_or_else(|| "Ledger sync state unavailable".to_owned())
156 }
157
158 fn substate_mut(
159 &mut self,
160 ) -> openmina_core::SubstateResult<&mut TransitionFrontierSyncLedgerState> {
161 self.transition_frontier
162 .sync
163 .ledger_mut()
164 .ok_or_else(|| "Ledger sync state unavailable".to_owned())
165 }
166}
167
168impl SubstateAccess<BlockProducerVrfEvaluatorState> for State {
169 fn substate(&self) -> openmina_core::SubstateResult<&BlockProducerVrfEvaluatorState> {
170 self.block_producer
171 .as_ref()
172 .map(|state| &state.vrf_evaluator)
173 .ok_or_else(|| "Block producer VRF evaluator state unavailable".to_owned())
174 }
175
176 fn substate_mut(
177 &mut self,
178 ) -> openmina_core::SubstateResult<&mut BlockProducerVrfEvaluatorState> {
179 self.block_producer
180 .as_mut()
181 .map(|state| &mut state.vrf_evaluator)
182 .ok_or_else(|| "Block producer VRF evaluator state unavailable".to_owned())
183 }
184}
185
186impl openmina_core::SubstateAccess<TransitionFrontierSyncLedgerSnarkedState> for State {
187 fn substate(&self) -> openmina_core::SubstateResult<&TransitionFrontierSyncLedgerSnarkedState> {
188 self.transition_frontier
189 .sync
190 .ledger()
191 .ok_or_else(|| {
192 "Snarked ledger state unavailable. Ledger sync state unavailable".to_owned()
193 })?
194 .snarked()
195 .ok_or_else(|| "Snarked ledger state unavailable".to_owned())
196 }
197
198 fn substate_mut(
199 &mut self,
200 ) -> openmina_core::SubstateResult<&mut TransitionFrontierSyncLedgerSnarkedState> {
201 self.transition_frontier
202 .sync
203 .ledger_mut()
204 .ok_or_else(|| {
205 "Snarked ledger state unavailable. Ledger sync state unavailable".to_owned()
206 })?
207 .snarked_mut()
208 .ok_or_else(|| "Snarked ledger state unavailable".to_owned())
209 }
210}
211
212impl openmina_core::SubstateAccess<TransitionFrontierSyncLedgerStagedState> for State {
213 fn substate(&self) -> openmina_core::SubstateResult<&TransitionFrontierSyncLedgerStagedState> {
214 self.transition_frontier
215 .sync
216 .ledger()
217 .ok_or_else(|| {
218 "Staged ledger state unavailable. Ledger sync state unavailable".to_owned()
219 })?
220 .staged()
221 .ok_or_else(|| "Staged ledger state unavailable".to_owned())
222 }
223
224 fn substate_mut(
225 &mut self,
226 ) -> openmina_core::SubstateResult<&mut TransitionFrontierSyncLedgerStagedState> {
227 self.transition_frontier
228 .sync
229 .ledger_mut()
230 .ok_or_else(|| {
231 "Staged ledger state unavailable. Ledger sync state unavailable".to_owned()
232 })?
233 .staged_mut()
234 .ok_or_else(|| "Staged ledger state unavailable".to_owned())
235 }
236}
237
238impl SubstateAccess<State> for State {
239 fn substate(&self) -> openmina_core::SubstateResult<&State> {
240 Ok(self)
241 }
242
243 fn substate_mut(&mut self) -> openmina_core::SubstateResult<&mut State> {
244 Ok(self)
245 }
246}
247
248macro_rules! impl_p2p_state_access {
249 ($state:ty, $substate_type:ty) => {
250 impl openmina_core::SubstateAccess<$substate_type> for $state {
251 fn substate(&self) -> openmina_core::SubstateResult<&$substate_type> {
252 let substate: &P2pState = self.substate()?;
253 substate.substate()
254 }
255
256 fn substate_mut(&mut self) -> openmina_core::SubstateResult<&mut $substate_type> {
257 let substate: &mut P2pState = self.substate_mut()?;
258 substate.substate_mut()
259 }
260 }
261 };
262}
263
264impl_p2p_state_access!(State, P2pNetworkIdentifyState);
265impl_p2p_state_access!(State, p2p::P2pNetworkState);
266impl_p2p_state_access!(State, P2pNetworkKadBootstrapState);
267impl_p2p_state_access!(State, p2p::P2pNetworkKadState);
268impl_p2p_state_access!(State, P2pNetworkSchedulerState);
269impl_p2p_state_access!(State, p2p::P2pLimits);
270impl_p2p_state_access!(State, p2p::P2pNetworkPubsubState);
271impl_p2p_state_access!(State, p2p::P2pConfig);
272
273impl p2p::P2pStateTrait for State {}
274
275pub type Substate<'a, S> = openmina_core::Substate<'a, crate::Action, State, S>;
276
277impl State {
278 pub fn new(config: Config, constants: &ConsensusConstants, now: Timestamp) -> Self {
279 Self {
280 p2p: P2p::Pending(config.p2p),
281 ledger: LedgerState::new(config.ledger),
282 snark_pool: SnarkPoolState::new(),
283 snark: SnarkState::new(config.snark),
284 transition_frontier: TransitionFrontierState::new(
285 config.transition_frontier,
286 config.archive.is_some(),
287 ),
288 external_snark_worker: ExternalSnarkWorkers::new(now),
289 block_producer: BlockProducerState::new(now, config.block_producer),
290 rpc: RpcState::new(),
291 transaction_pool: TransactionPoolState::new(config.tx_pool, constants),
292
293 watched_accounts: WatchedAccountsState::new(),
294
295 config: config.global,
296 last_action: ActionMeta::zero_custom(now),
297 applied_actions_count: 0,
298 }
299 }
300
301 pub fn last_action(&self) -> &ActionMeta {
302 &self.last_action
303 }
304
305 #[inline(always)]
309 pub fn time(&self) -> Timestamp {
310 self.last_action.time()
311 }
312
313 pub fn pseudo_rng(&self) -> StdRng {
314 crate::core::pseudo_rng(self.time())
315 }
316
317 pub fn action_applied(&mut self, action: &ActionWithMeta) {
320 self.last_action = action.meta().clone();
321 self.applied_actions_count = self.applied_actions_count.checked_add(1).expect("overflow");
322 }
323
324 pub fn genesis_block(&self) -> Option<ArcBlockWithHash> {
325 self.transition_frontier
326 .genesis
327 .block_with_real_or_dummy_proof()
328 }
329
330 fn cur_slot(&self, initial_slot: impl FnOnce(&ArcBlockWithHash) -> u32) -> Option<u32> {
331 let genesis = self.genesis_block()?;
332 let diff_ns = u64::from(self.time()).saturating_sub(u64::from(genesis.timestamp()));
333 let diff_ms = diff_ns / 1_000_000;
334 let slots = diff_ms
335 .checked_div(constraint_constants().block_window_duration_ms)
336 .expect("division by 0");
337 Some(
338 initial_slot(&genesis)
339 .checked_add(slots as u32)
340 .expect("overflow"),
341 )
342 }
343
344 pub fn cur_global_slot(&self) -> Option<u32> {
348 self.cur_slot(|b| b.global_slot())
349 }
350
351 pub fn current_slot(&self) -> Option<u32> {
352 let slots_per_epoch = self.genesis_block()?.constants().slots_per_epoch.as_u32();
353 Some(
354 self.cur_global_slot()?
355 .checked_rem(slots_per_epoch)
356 .expect("division by 0"),
357 )
358 }
359
360 pub fn cur_global_slot_since_genesis(&self) -> Option<u32> {
361 self.cur_slot(|b| b.global_slot_since_genesis())
362 }
363
364 pub fn current_epoch(&self) -> Option<u32> {
365 let slots_per_epoch = self.genesis_block()?.constants().slots_per_epoch.as_u32();
366 Some(
367 self.cur_global_slot()?
368 .checked_div(slots_per_epoch)
369 .expect("division by 0"),
370 )
371 }
372
373 pub fn slot_time(&self, global_slot: u64) -> Option<(Timestamp, Timestamp)> {
374 let genesis_timestamp = self.genesis_block()?.genesis_timestamp();
375 println!("genesis_timestamp: {}", u64::from(genesis_timestamp));
376
377 let start_time = genesis_timestamp.checked_add(
378 global_slot
379 .checked_mul(constraint_constants().block_window_duration_ms)?
380 .checked_mul(1_000_000)?,
381 )?;
382 let end_time = start_time.checked_add(
383 constraint_constants()
384 .block_window_duration_ms
385 .checked_mul(1_000_000)?,
386 )?;
387
388 Some((start_time, end_time))
389 }
390
391 pub fn producing_block_after_genesis(&self) -> bool {
392 #[allow(clippy::arithmetic_side_effects)]
393 let two_mins_in_future = self.time() + Duration::from_secs(2 * 60);
394 self.block_producer.with(false, |bp| {
395 bp.current.won_slot_should_produce(two_mins_in_future)
396 }) && self.genesis_block().is_some_and(|b| {
397 let slot = &b.consensus_state().curr_global_slot_since_hard_fork;
398 let epoch = slot
399 .slot_number
400 .as_u32()
401 .checked_div(slot.slots_per_epoch.as_u32())
402 .expect("division by 0");
403 self.current_epoch() <= Some(epoch)
404 })
405 }
406
407 pub fn prevalidate_block(
408 &self,
409 block: &ArcBlockWithHash,
410 allow_block_too_late: bool,
411 ) -> Result<(), BlockPrevalidationError> {
412 let Some((genesis, cur_global_slot)) =
413 None.or_else(|| Some((self.genesis_block()?, self.cur_global_slot()?)))
414 else {
415 bug_condition!("Tried to prevalidate a block before the genesis block was ready");
420 return Err(BlockPrevalidationError::GenesisNotReady);
421 };
422
423 prevalidate_block(block, &genesis, cur_global_slot, allow_block_too_late)
424 }
425
426 pub fn should_log_node_id(&self) -> bool {
427 self.config.testing_run
428 }
429
430 pub fn consensus_time_now(&self) -> Option<ConsensusTime> {
431 let (start_time, end_time) = self.slot_time(self.cur_global_slot()?.into())?;
432 let epoch = self.current_epoch()?;
433 let global_slot = self.cur_global_slot()?;
434 let slot = self.current_slot()?;
435 Some(ConsensusTime {
436 start_time,
437 end_time,
438 epoch,
439 global_slot,
440 slot,
441 })
442 }
443
444 pub fn consensus_time_best_tip(&self) -> Option<ConsensusTime> {
445 let best_tip = self.transition_frontier.best_tip()?;
446 let global_slot = best_tip
447 .curr_global_slot_since_hard_fork()
448 .slot_number
449 .as_u32();
450 let (start_time, end_time) = self.slot_time(global_slot.into())?;
451 let epoch = best_tip.consensus_state().epoch_count.as_u32();
452 let slot = best_tip.slot();
453 Some(ConsensusTime {
454 start_time,
455 end_time,
456 epoch,
457 global_slot,
458 slot,
459 })
460 }
461}
462
463#[serde_with::serde_as]
464#[derive(Debug, Clone, Serialize, Deserialize, MallocSizeOf)]
465pub enum P2p {
466 Pending(#[ignore_malloc_size_of = "constant"] P2pConfig),
467 Ready(P2pState),
468}
469
470#[derive(Debug, thiserror::Error)]
471pub enum P2pInitializationError {
472 #[error("p2p is already initialized")]
473 AlreadyInitialized,
474}
475
476#[macro_export]
477macro_rules! p2p_ready {
478 ($p2p:expr, $time:expr) => {
479 p2p_ready!($p2p, "", $time)
480 };
481 ($p2p:expr, $reason:expr, $time:expr) => {
482 match $p2p.ready() {
483 Some(v) => v,
484 None => {
485 openmina_core::error!($time; "p2p is not initialized: {}", $reason);
487 return;
488 }
489 }
490 };
491}
492
493impl P2p {
494 pub fn config(&self) -> &P2pConfig {
495 match self {
496 P2p::Pending(config) => config,
497 P2p::Ready(p2p_state) => &p2p_state.config,
498 }
499 }
500
501 pub fn initialize(&mut self, chain_id: &ChainId) -> Result<(), P2pInitializationError> {
503 let P2p::Pending(config) = self else {
504 return Err(P2pInitializationError::AlreadyInitialized);
505 };
506
507 let callbacks = Self::p2p_callbacks();
508 *self = P2p::Ready(P2pState::new(config.clone(), callbacks, chain_id));
509 Ok(())
510 }
511
512 fn p2p_callbacks() -> P2pCallbacks {
513 P2pCallbacks {
514 on_p2p_channels_transaction_received: Some(redux::callback!(
515 on_p2p_channels_transaction_received((peer_id: PeerId, info: Box<TransactionInfo>)) -> crate::Action {
516 TransactionPoolCandidateAction::InfoReceived {
517 peer_id,
518 info: *info,
519 }
520 }
521 )),
522 on_p2p_channels_transactions_libp2p_received: Some(redux::callback!(
523 on_p2p_channels_transactions_libp2p_received((peer_id: PeerId, transactions: Vec<TransactionWithHash>, message_id: P2pNetworkPubsubMessageCacheId)) -> crate::Action {
524 TransactionPoolCandidateAction::Libp2pTransactionsReceived {
525 message_id,
526 transactions,
527 peer_id
528 }
529 }
530 )),
531 on_p2p_channels_snark_job_commitment_received: Some(redux::callback!(
532 on_p2p_channels_snark_job_commitment_received((peer_id: PeerId, commitment: Box<SnarkJobCommitment>)) -> crate::Action {
533 SnarkPoolAction::CommitmentAdd { commitment: *commitment, sender: peer_id }
534 }
535 )),
536 on_p2p_channels_snark_received: Some(redux::callback!(
537 on_p2p_channels_snark_received((peer_id: PeerId, snark: Box<SnarkInfo>)) -> crate::Action {
538 SnarkPoolCandidateAction::InfoReceived { peer_id, info: *snark }
539 }
540 )),
541 on_p2p_channels_snark_libp2p_received: Some(redux::callback!(
542 on_p2p_channels_snark_libp2p_received((peer_id: PeerId, snark: Box<Snark>)) -> crate::Action {
543 SnarkPoolCandidateAction::WorkFetchSuccess { peer_id, work: *snark }
544 }
545 )),
546 on_p2p_channels_streaming_rpc_ready: Some(redux::callback!(
547 on_p2p_channels_streaming_rpc_ready(_var: ()) -> crate::Action {
548 P2pCallbacksAction::P2pChannelsStreamingRpcReady
549 }
550 )),
551 on_p2p_channels_best_tip_request_received: Some(redux::callback!(
552 on_p2p_channels_best_tip_request_received(peer_id: PeerId) -> crate::Action {
553 P2pCallbacksAction::RpcRespondBestTip { peer_id }
554 }
555 )),
556 on_p2p_disconnection_finish: Some(redux::callback!(
557 on_p2p_disconnection_finish(peer_id: PeerId) -> crate::Action {
558 P2pCallbacksAction::P2pDisconnection { peer_id }
559 }
560 )),
561 on_p2p_connection_outgoing_error: Some(redux::callback!(
562 on_p2p_connection_outgoing_error((rpc_id: RpcId, error: P2pConnectionOutgoingError)) -> crate::Action {
563 RpcAction::P2pConnectionOutgoingError { rpc_id, error }
564 }
565 )),
566 on_p2p_connection_outgoing_success: Some(redux::callback!(
567 on_p2p_connection_outgoing_success(rpc_id: RpcId) -> crate::Action {
568 RpcAction::P2pConnectionOutgoingSuccess { rpc_id }
569 }
570 )),
571 on_p2p_connection_incoming_error: Some(redux::callback!(
572 on_p2p_connection_incoming_error((rpc_id: RpcId, error: String)) -> crate::Action {
573 RpcAction::P2pConnectionIncomingError { rpc_id, error }
574 }
575 )),
576 on_p2p_connection_incoming_success: Some(redux::callback!(
577 on_p2p_connection_incoming_success(rpc_id: RpcId) -> crate::Action {
578 RpcAction::P2pConnectionIncomingSuccess { rpc_id }
579 }
580 )),
581 on_p2p_connection_incoming_answer_ready: Some(redux::callback!(
582 on_p2p_connection_incoming_answer_ready((rpc_id: RpcId, peer_id: PeerId, answer: P2pConnectionResponse)) -> crate::Action {
583 RpcAction::P2pConnectionIncomingAnswerReady { rpc_id, answer, peer_id }
584 }
585 )),
586 on_p2p_peer_best_tip_update: Some(redux::callback!(
587 on_p2p_peer_best_tip_update(best_tip: BlockWithHash<Arc<v2::MinaBlockBlockStableV2>>) -> crate::Action {
588 TransitionFrontierCandidateAction::P2pBestTipUpdate { best_tip }
589 }
590 )),
591 on_p2p_channels_rpc_ready: Some(redux::callback!(
592 on_p2p_channels_rpc_ready(peer_id: PeerId) -> crate::Action {
593 P2pCallbacksAction::P2pChannelsRpcReady { peer_id }
594 }
595 )),
596 on_p2p_channels_rpc_timeout: Some(redux::callback!(
597 on_p2p_channels_rpc_timeout((peer_id: PeerId, id: P2pRpcId)) -> crate::Action {
598 P2pCallbacksAction::P2pChannelsRpcTimeout { peer_id, id }
599 }
600 )),
601 on_p2p_channels_rpc_response_received: Some(redux::callback!(
602 on_p2p_channels_rpc_response_received((peer_id: PeerId, id: P2pRpcId, response: Option<Box<P2pRpcResponse>>)) -> crate::Action {
603 P2pCallbacksAction::P2pChannelsRpcResponseReceived { peer_id, id, response }
604 }
605 )),
606 on_p2p_channels_rpc_request_received: Some(redux::callback!(
607 on_p2p_channels_rpc_request_received((peer_id: PeerId, id: P2pRpcId, request: Box<P2pRpcRequest>)) -> crate::Action {
608 P2pCallbacksAction::P2pChannelsRpcRequestReceived { peer_id, id, request }
609 }
610 )),
611 on_p2p_channels_streaming_rpc_response_received: Some(redux::callback!(
612 on_p2p_channels_streaming_rpc_response_received((peer_id: PeerId, id: P2pRpcId, response: Option<P2pStreamingRpcResponseFull>)) -> crate::Action {
613 P2pCallbacksAction::P2pChannelsStreamingRpcResponseReceived { peer_id, id, response }
614 }
615 )),
616 on_p2p_channels_streaming_rpc_timeout: Some(redux::callback!(
617 on_p2p_channels_streaming_rpc_timeout((peer_id: PeerId, id: P2pRpcId)) -> crate::Action {
618 P2pCallbacksAction::P2pChannelsStreamingRpcTimeout { peer_id, id }
619 }
620 )),
621 on_p2p_pubsub_message_received: Some(redux::callback!(
622 on_p2p_pubsub_message_received((message_id: P2pNetworkPubsubMessageCacheId)) -> crate::Action{
623 P2pCallbacksAction::P2pPubsubValidateMessage { message_id }
624 }
625 )),
626 }
627 }
628
629 pub fn ready(&self) -> Option<&P2pState> {
630 if let P2p::Ready(state) = self {
631 Some(state)
632 } else {
633 None
634 }
635 }
636
637 pub fn ready_mut(&mut self) -> Option<&mut P2pState> {
638 if let P2p::Ready(state) = self {
639 Some(state)
640 } else {
641 None
642 }
643 }
644
645 pub fn unwrap(&self) -> &P2pState {
646 self.ready().expect("p2p is not initialized")
647 }
648
649 pub fn is_enabled<T>(&self, action: &T, time: Timestamp) -> bool
650 where
651 T: EnablingCondition<P2pState>,
652 {
653 match self {
654 P2p::Pending(_) => false,
655 P2p::Ready(p2p_state) => action.is_enabled(p2p_state, time),
656 }
657 }
658
659 pub fn my_id(&self) -> PeerId {
660 match self {
661 P2p::Pending(config) => &config.identity_pub_key,
662 P2p::Ready(state) => &state.config.identity_pub_key,
663 }
664 .peer_id()
665 }
666
667 pub fn get_peer(&self, peer_id: &PeerId) -> Option<&P2pPeerState> {
668 self.ready().and_then(|p2p| p2p.peers.get(peer_id))
669 }
670
671 pub fn get_ready_peer(&self, peer_id: &PeerId) -> Option<&P2pPeerStatusReady> {
672 self.ready().and_then(|p2p| p2p.get_ready_peer(peer_id))
673 }
674
675 pub fn ready_peers(&self) -> Vec<PeerId> {
676 self.ready_peers_iter()
677 .map(|(peer_id, _)| *peer_id)
678 .collect()
679 }
680
681 pub fn ready_peers_iter(&self) -> ReadyPeersIter<'_> {
682 ReadyPeersIter::new(self)
683 }
684}
685
686#[derive(Debug, Clone)]
687pub struct ReadyPeersIter<'a>(Option<std::collections::btree_map::Iter<'a, PeerId, P2pPeerState>>);
688
689impl<'a> ReadyPeersIter<'a> {
690 fn new(p2p: &'a P2p) -> Self {
691 ReadyPeersIter(p2p.ready().map(|p2p| p2p.peers.iter()))
692 }
693}
694
695impl<'a> Iterator for ReadyPeersIter<'a> {
696 type Item = (&'a PeerId, &'a P2pPeerStatusReady);
697
698 fn next(&mut self) -> Option<Self::Item> {
699 let iter = self.0.as_mut()?;
700 Some(loop {
701 let (peer_id, state) = iter.next()?;
702 if let Some(ready) = state.status.as_ready() {
703 break (peer_id, ready);
704 }
705 })
706 }
707}