1use std::{
2 collections::{BTreeMap, BTreeSet},
3 sync::Arc,
4 time::Duration,
5};
6
7use openmina_core::{
8 block::{ArcBlockWithHash, BlockWithHash},
9 impl_substate_access,
10 requests::RpcId,
11 snark::{Snark, SnarkInfo, SnarkJobCommitment},
12 transaction::{TransactionInfo, TransactionWithHash},
13 ChainId, SubstateAccess,
14};
15
16use malloc_size_of_derive::MallocSizeOf;
17use redux::{Callback, Timestamp};
18use serde::{Deserialize, Serialize};
19
20use crate::{
21 bootstrap::P2pNetworkKadBootstrapState,
22 channels::{
23 rpc::{P2pRpcId, P2pRpcRequest, P2pRpcResponse},
24 streaming_rpc::{P2pStreamingRpcId, P2pStreamingRpcResponseFull},
25 ChannelId, P2pChannelsState,
26 },
27 connection::{
28 incoming::P2pConnectionIncomingState,
29 outgoing::{
30 P2pConnectionOutgoingError, P2pConnectionOutgoingInitOpts, P2pConnectionOutgoingState,
31 },
32 P2pConnectionResponse, P2pConnectionState,
33 },
34 is_time_passed,
35 network::{
36 identify::{P2pNetworkIdentify, P2pNetworkIdentifyState},
37 P2pNetworkState,
38 },
39 Limit, P2pConfig, P2pLimits, P2pNetworkKadState, P2pNetworkPubsubMessageCacheId,
40 P2pNetworkPubsubState, P2pNetworkSchedulerState, P2pTimeouts, PeerId,
41};
42use mina_p2p_messages::v2;
43
44#[derive(Serialize, Deserialize, Debug, Clone)]
45pub struct P2pState {
46 pub chain_id: ChainId,
47 pub config: P2pConfig,
48 pub network: P2pNetworkState,
49 pub peers: BTreeMap<PeerId, P2pPeerState>,
50
51 pub last_random_disconnection_try: redux::Timestamp,
52
53 pub callbacks: P2pCallbacks,
54}
55
56impl P2pState {
57 pub fn new(config: P2pConfig, callbacks: P2pCallbacks, chain_id: &ChainId) -> Self {
58 let addrs = if cfg!(feature = "p2p-libp2p") {
59 config
60 .libp2p_port
61 .map(|port| multiaddr::multiaddr!(Ip4([127, 0, 0, 1]), Tcp((port))))
62 .into_iter()
63 .collect()
64 } else {
65 Vec::new()
66 };
67
68 let my_id = config.identity_pub_key.peer_id();
69
70 #[cfg(not(target_arch = "wasm32"))]
71 {
72 let peer_id_str = my_id.to_libp2p_string();
73
74 openmina_core::log::info!(
75 openmina_core::log::system_time();
76 kind = "P2pState new",
77 summary = format!("Current node's id: {peer_id_str}"),
78 peer_id_str = peer_id_str,
79 );
80 }
81
82 let known_peers = if cfg!(feature = "p2p-libp2p") {
83 config
84 .initial_peers
85 .iter()
86 .filter_map(|peer| {
87 if let P2pConnectionOutgoingInitOpts::LibP2P(peer) = peer {
88 Some(peer.into())
89 } else {
90 None
91 }
92 })
93 .collect()
94 } else {
95 Vec::new()
96 };
97
98 let network = P2pNetworkState::new(
99 config.identity_pub_key.clone(),
100 addrs,
101 known_peers,
102 chain_id,
103 config.peer_discovery,
104 );
105 Self {
106 chain_id: chain_id.clone(),
107 config,
108 network,
109 peers: Default::default(),
110
111 last_random_disconnection_try: redux::Timestamp::ZERO,
112
113 callbacks,
114 }
115 }
116
117 pub fn my_id(&self) -> PeerId {
118 self.config.identity_pub_key.peer_id()
119 }
120
121 pub fn peer_connection_rpc_id(&self, peer_id: &PeerId) -> Option<RpcId> {
122 self.peers.get(peer_id)?.connection_rpc_id()
123 }
124
125 pub fn get_ready_peer(&self, peer_id: &PeerId) -> Option<&P2pPeerStatusReady> {
128 self.peers.get(peer_id)?.status.as_ready()
129 }
130
131 pub fn get_ready_peer_mut(&mut self, peer_id: &PeerId) -> Option<&mut P2pPeerStatusReady> {
134 self.peers.get_mut(peer_id)?.status.as_ready_mut()
135 }
136
137 pub fn any_ready_peers(&self) -> bool {
138 self.peers
139 .iter()
140 .any(|(_, p)| p.status.as_ready().is_some())
141 }
142
143 pub fn disconnected_peers(&self) -> impl '_ + Iterator<Item = P2pConnectionOutgoingInitOpts> {
144 self.peers.iter().filter_map(|(_, state)| {
145 if let P2pPeerState {
146 status: P2pPeerStatus::Disconnected { .. },
147 dial_opts: Some(opts),
148 ..
149 } = state
150 {
151 Some(opts.clone())
152 } else {
153 None
154 }
155 })
156 }
157
158 pub fn ready_peers_iter(&self) -> impl Iterator<Item = (&PeerId, &P2pPeerStatusReady)> {
159 self.peers
160 .iter()
161 .filter_map(|(id, p)| Some((id, p.status.as_ready()?)))
162 }
163
164 pub fn ready_rpc_peers_iter(
165 &self,
166 ) -> impl '_ + Iterator<Item = (&PeerId, &P2pPeerStatusReady)> {
167 self.ready_peers_iter()
168 .filter(|(_, p)| p.channels.rpc.can_send_request())
169 }
170
171 pub fn ready_peers(&self) -> Vec<PeerId> {
172 self.peers
173 .iter()
174 .filter(|(_, p)| p.status.as_ready().is_some())
175 .map(|(id, _)| *id)
176 .collect()
177 }
178
179 pub fn connected_or_connecting_peers_count(&self) -> usize {
180 self.peers
181 .iter()
182 .filter(|(_, p)| p.status.is_connected_or_connecting())
183 .count()
184 }
185
186 pub fn is_peer_connecting(&self, peer_id: &PeerId) -> bool {
187 self.peers
188 .get(peer_id)
189 .and_then(|p| p.status.as_connecting())
190 .is_some_and(|p| !p.is_error())
191 }
192
193 pub fn is_peer_connected_or_connecting(&self, peer_id: &PeerId) -> bool {
194 self.peers
195 .get(peer_id)
196 .is_some_and(|p| p.status.is_connected_or_connecting())
197 }
198
199 pub fn is_libp2p_peer(&self, peer_id: &PeerId) -> bool {
200 self.peers.get(peer_id).is_some_and(|p| p.is_libp2p())
201 }
202
203 pub fn is_peer_rpc_timed_out(
204 &self,
205 peer_id: &PeerId,
206 rpc_id: P2pRpcId,
207 now: redux::Timestamp,
208 ) -> bool {
209 self.get_ready_peer(peer_id).is_some_and(|p| {
210 p.channels
211 .rpc
212 .is_timed_out(rpc_id, now, &self.config.timeouts)
213 })
214 }
215
216 pub fn is_peer_streaming_rpc_timed_out(
217 &self,
218 peer_id: &PeerId,
219 rpc_id: P2pStreamingRpcId,
220 now: redux::Timestamp,
221 ) -> bool {
222 self.get_ready_peer(peer_id).is_some_and(|p| {
223 p.channels
224 .streaming_rpc
225 .is_timed_out(rpc_id, now, &self.config.timeouts)
226 })
227 }
228
229 pub fn peer_rpc_timeouts(&self, now: redux::Timestamp) -> Vec<(PeerId, P2pRpcId, bool)> {
230 let config = &self.config.timeouts;
231 self.ready_peers_iter()
232 .flat_map(|(peer_id, s)| {
233 let pending_rpc = s.channels.rpc.pending_local_rpc_id();
234 let pending_streaming_rpc = s.channels.streaming_rpc.pending_local_rpc_id();
235 IntoIterator::into_iter([
236 pending_rpc
237 .filter(|id| s.channels.rpc.is_timed_out(*id, now, config))
238 .map(|id| (*peer_id, id, false)),
239 pending_streaming_rpc
240 .filter(|id| s.channels.streaming_rpc.is_timed_out(*id, now, config))
241 .map(|id| (*peer_id, id, true)),
242 ])
243 .flatten()
244 })
245 .collect()
246 }
247
248 pub fn peer_streaming_rpc_timeouts(
249 &self,
250 now: redux::Timestamp,
251 ) -> Vec<(PeerId, P2pStreamingRpcId)> {
252 self.ready_peers_iter()
253 .filter_map(|(peer_id, s)| {
254 let rpc_id = s.channels.streaming_rpc.pending_local_rpc_id()?;
255 if !s
256 .channels
257 .streaming_rpc
258 .is_timed_out(rpc_id, now, &self.config.timeouts)
259 {
260 return None;
261 }
262
263 Some((*peer_id, rpc_id))
264 })
265 .collect()
266 }
267
268 pub fn already_has_min_peers(&self) -> bool {
269 self.connected_or_connecting_peers_count() >= self.config.limits.min_peers()
270 }
271
272 pub fn already_has_max_peers(&self) -> bool {
273 self.connected_or_connecting_peers_count() >= self.config.limits.max_peers()
274 }
275
276 pub fn already_has_max_ready_peers(&self) -> bool {
278 self.ready_peers_iter().count() >= self.config.limits.max_peers()
279 }
280
281 pub fn min_peers(&self) -> Limit<usize> {
283 self.config.limits.min_peers()
284 }
285
286 #[cfg(feature = "p2p-libp2p")]
288 pub fn peer_with_connection(
289 &self,
290 conn_id: crate::ConnectionAddr,
291 ) -> Option<(PeerId, &P2pPeerState)> {
292 let result = if let crate::ConnectionAddr {
293 sock_addr,
294 incoming: false,
295 } = conn_id
296 {
297 self.peers
298 .iter()
299 .find(|(_, peer_state)| match &peer_state.dial_opts {
300 Some(P2pConnectionOutgoingInitOpts::LibP2P(libp2p_opts)) => {
301 libp2p_opts.matches_socket_addr(sock_addr)
302 }
303 _ => false,
304 })
305 } else {
306 None
307 };
308
309 result
310 .or_else(|| {
311 self.network
312 .scheduler
313 .connections
314 .get(&conn_id)
315 .and_then(|state| {
316 state
317 .peer_id()
318 .and_then(|peer_id| self.peers.iter().find(|(id, _)| *id == peer_id))
319 })
320 })
321 .map(|(peer_id, peer_state)| (*peer_id, peer_state))
322 }
323
324 pub fn incoming_peer_connection_mut(
325 &mut self,
326 peer_id: &PeerId,
327 ) -> Option<&mut P2pConnectionIncomingState> {
328 let peer_state = self.peers.get_mut(peer_id)?;
329
330 match &mut peer_state.status {
331 P2pPeerStatus::Connecting(P2pConnectionState::Incoming(state)) => Some(state),
332 _ => None,
333 }
334 }
335
336 pub fn outgoing_peer_connection_mut(
337 &mut self,
338 peer_id: &PeerId,
339 ) -> Option<&mut P2pConnectionOutgoingState> {
340 let peer_state = self.peers.get_mut(peer_id)?;
341
342 match &mut peer_state.status {
343 P2pPeerStatus::Connecting(P2pConnectionState::Outgoing(state)) => Some(state),
344 _ => None,
345 }
346 }
347}
348
349#[derive(Serialize, Deserialize, Debug, Clone, MallocSizeOf)]
350pub struct P2pPeerState {
351 pub is_libp2p: bool,
352 pub dial_opts: Option<P2pConnectionOutgoingInitOpts>,
353 pub status: P2pPeerStatus,
354 pub identify: Option<P2pNetworkIdentify>,
355}
356
357impl P2pPeerState {
358 pub fn is_libp2p(&self) -> bool {
359 self.is_libp2p
360 }
361
362 pub fn connection_rpc_id(&self) -> Option<RpcId> {
363 match &self.status {
364 P2pPeerStatus::Connecting(v) => v.rpc_id(),
365 _ => None,
366 }
367 }
368
369 pub fn can_reconnect(&self, now: Timestamp, timeouts: &P2pTimeouts) -> bool {
373 self.dial_opts.is_some()
374 && match &self.status {
375 P2pPeerStatus::Connecting(P2pConnectionState::Incoming(
376 P2pConnectionIncomingState::Error { time, .. },
377 )) => is_time_passed(now, *time, timeouts.incoming_error_reconnect_timeout),
378 P2pPeerStatus::Connecting(P2pConnectionState::Outgoing(
379 P2pConnectionOutgoingState::Error { time, .. },
380 )) => is_time_passed(now, *time, timeouts.outgoing_error_reconnect_timeout),
381 P2pPeerStatus::Disconnected { time } => {
382 *time == Timestamp::ZERO
383 || is_time_passed(now, *time, timeouts.reconnect_timeout)
384 }
385 P2pPeerStatus::Disconnecting { time } => {
386 if !is_time_passed(now, *time, timeouts.reconnect_timeout) {
387 false
388 } else {
389 #[cfg(not(test))]
390 openmina_core::bug_condition!(
391 "peer stuck in `P2pPeerStatus::Disconnecting` state?"
392 );
393 #[cfg(test)]
394 openmina_core::warn!(*time; "peer stuck in `P2pPeerStatus::Disconnecting` state?");
397 true
398 }
399 }
400 _ => false,
401 }
402 }
403}
404
405#[derive(Serialize, Deserialize, Debug, Clone, MallocSizeOf)]
406#[serde(tag = "state")]
407pub enum P2pPeerStatus {
408 Connecting(P2pConnectionState),
409 Disconnecting {
410 #[ignore_malloc_size_of = "doesn't allocate"]
411 time: redux::Timestamp,
412 },
413 Disconnected {
414 #[ignore_malloc_size_of = "doesn't allocate"]
415 time: redux::Timestamp,
416 },
417
418 Ready(P2pPeerStatusReady),
419}
420
421impl P2pPeerStatus {
422 pub fn is_connecting_success(&self) -> bool {
425 match self {
426 Self::Connecting(v) => v.is_success(),
427 _ => false,
428 }
429 }
430
431 pub fn is_connected_or_connecting(&self) -> bool {
432 match self {
433 Self::Connecting(s) => !s.is_error(),
434 Self::Ready(_) => true,
435 Self::Disconnecting { .. } => false,
436 Self::Disconnected { .. } => false,
437 }
438 }
439
440 pub fn is_disconnected_or_disconnecting(&self) -> bool {
441 self.disconnected_or_disconnecting_time().is_some()
442 }
443
444 pub fn disconnected_or_disconnecting_time(&self) -> Option<redux::Timestamp> {
445 match self {
446 Self::Disconnecting { time } | Self::Disconnected { time } => Some(*time),
447 _ => None,
448 }
449 }
450
451 pub fn as_connecting(&self) -> Option<&P2pConnectionState> {
452 match self {
453 Self::Connecting(v) => Some(v),
454 _ => None,
455 }
456 }
457
458 pub fn as_ready(&self) -> Option<&P2pPeerStatusReady> {
459 match self {
460 Self::Ready(v) => Some(v),
461 _ => None,
462 }
463 }
464
465 pub fn as_ready_mut(&mut self) -> Option<&mut P2pPeerStatusReady> {
466 match self {
467 Self::Ready(v) => Some(v),
468 _ => None,
469 }
470 }
471
472 pub fn is_error(&self) -> bool {
473 matches!(self, P2pPeerStatus::Connecting(s) if s.is_error())
474 }
475}
476
477#[derive(Serialize, Deserialize, Debug, Clone)]
478pub struct P2pPeerStatusReady {
479 pub is_incoming: bool,
480 pub connected_since: redux::Timestamp,
481 pub channels: P2pChannelsState,
482 pub best_tip: Option<ArcBlockWithHash>,
483}
484
485impl P2pPeerStatusReady {
486 pub fn new(
487 is_incoming: bool,
488 time: redux::Timestamp,
489 enabled_channels: &BTreeSet<ChannelId>,
490 ) -> Self {
491 Self {
492 is_incoming,
493 connected_since: time,
494 channels: P2pChannelsState::new(enabled_channels),
495 best_tip: None,
496 }
497 }
498
499 pub fn connected_for(&self, now: redux::Timestamp) -> Duration {
500 now.checked_sub(self.connected_since).unwrap_or_default()
501 }
502}
503
504impl SubstateAccess<P2pState> for P2pState {
505 fn substate(&self) -> openmina_core::SubstateResult<&P2pState> {
506 Ok(self)
507 }
508
509 fn substate_mut(&mut self) -> openmina_core::SubstateResult<&mut P2pState> {
510 Ok(self)
511 }
512}
513
514type OptionalCallback<T> = Option<Callback<T>>;
515
516#[derive(Serialize, Deserialize, Debug, Clone, Default)]
517pub struct P2pCallbacks {
518 pub on_p2p_channels_transaction_received: OptionalCallback<(PeerId, Box<TransactionInfo>)>,
520 pub on_p2p_channels_transactions_libp2p_received: OptionalCallback<(
522 PeerId,
523 Vec<TransactionWithHash>,
524 P2pNetworkPubsubMessageCacheId,
525 )>,
526 pub on_p2p_channels_snark_job_commitment_received:
528 OptionalCallback<(PeerId, Box<SnarkJobCommitment>)>,
529
530 pub on_p2p_channels_snark_received: OptionalCallback<(PeerId, Box<SnarkInfo>)>,
532 pub on_p2p_channels_snark_libp2p_received: OptionalCallback<(PeerId, Box<Snark>)>,
534
535 pub on_p2p_channels_best_tip_request_received: OptionalCallback<PeerId>,
537
538 pub on_p2p_disconnection_finish: OptionalCallback<PeerId>,
540
541 pub on_p2p_connection_outgoing_error: OptionalCallback<(RpcId, P2pConnectionOutgoingError)>,
544 pub on_p2p_connection_outgoing_success: OptionalCallback<RpcId>,
546
547 pub on_p2p_connection_incoming_error: OptionalCallback<(RpcId, String)>,
550 pub on_p2p_connection_incoming_success: OptionalCallback<RpcId>,
552 pub on_p2p_connection_incoming_answer_ready:
554 OptionalCallback<(RpcId, PeerId, P2pConnectionResponse)>,
555
556 pub on_p2p_peer_best_tip_update:
558 OptionalCallback<BlockWithHash<Arc<v2::MinaBlockBlockStableV2>>>,
559
560 pub on_p2p_channels_rpc_ready: OptionalCallback<PeerId>,
562 pub on_p2p_channels_rpc_timeout: OptionalCallback<(PeerId, P2pRpcId)>,
564 pub on_p2p_channels_rpc_response_received:
566 OptionalCallback<(PeerId, P2pRpcId, Option<Box<P2pRpcResponse>>)>,
567 pub on_p2p_channels_rpc_request_received:
569 OptionalCallback<(PeerId, P2pRpcId, Box<P2pRpcRequest>)>,
570
571 pub on_p2p_channels_streaming_rpc_ready: OptionalCallback<()>,
573 pub on_p2p_channels_streaming_rpc_timeout: OptionalCallback<(PeerId, P2pRpcId)>,
575 pub on_p2p_channels_streaming_rpc_response_received:
577 OptionalCallback<(PeerId, P2pRpcId, Option<P2pStreamingRpcResponseFull>)>,
578
579 pub on_p2p_pubsub_message_received: OptionalCallback<P2pNetworkPubsubMessageCacheId>,
581}
582
583impl_substate_access!(P2pState, P2pNetworkState, network);
584impl_substate_access!(P2pState, P2pNetworkSchedulerState, network.scheduler);
585impl_substate_access!(P2pState, P2pLimits, config.limits);
586
587impl SubstateAccess<P2pNetworkKadState> for P2pState {
588 fn substate(&self) -> openmina_core::SubstateResult<&P2pNetworkKadState> {
589 self.network
590 .scheduler
591 .discovery_state()
592 .ok_or_else(|| "kademlia state is unavailable".to_owned())
593 }
594
595 fn substate_mut(&mut self) -> openmina_core::SubstateResult<&mut P2pNetworkKadState> {
596 self.network
597 .scheduler
598 .discovery_state
599 .as_mut()
600 .ok_or_else(|| "kademlia state is unavailable".to_owned())
601 }
602}
603
604impl SubstateAccess<P2pNetworkKadBootstrapState> for P2pState {
605 fn substate(&self) -> openmina_core::SubstateResult<&P2pNetworkKadBootstrapState> {
606 let kad_state: &P2pNetworkKadState = self.substate()?;
607 kad_state
608 .bootstrap_state()
609 .ok_or_else(|| "bootstrap state is unavailable".to_owned())
610 }
611
612 fn substate_mut(&mut self) -> openmina_core::SubstateResult<&mut P2pNetworkKadBootstrapState> {
613 let kad_state: &mut P2pNetworkKadState = self.substate_mut()?;
614 kad_state
615 .bootstrap_state_mut()
616 .ok_or_else(|| "bootstrap state is unavailable".to_owned())
617 }
618}
619
620impl_substate_access!(
621 P2pState,
622 P2pNetworkIdentifyState,
623 network.scheduler.identify_state
624);
625impl_substate_access!(
626 P2pState,
627 P2pNetworkPubsubState,
628 network.scheduler.broadcast_state
629);
630impl_substate_access!(P2pState, P2pConfig, config);
631
632mod measurement {
633 use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
634
635 use super::*;
636
637 impl MallocSizeOf for P2pState {
638 fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
639 self.peers.values().map(|v| v.size_of(ops)).sum::<usize>()
640 + self.network.scheduler.size_of(ops)
641 }
642 }
643
644 impl MallocSizeOf for P2pPeerStatusReady {
645 fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
646 self.best_tip
647 .as_ref()
648 .map(|v| {
649 usize::from(!ops.have_seen_ptr(Arc::as_ptr(&v.block)))
650 * (size_of::<v2::MinaBlockBlockStableV2>() + v.block.size_of(ops))
651 })
652 .unwrap_or_default()
653 }
655 }
656}