p2p/network/
p2p_network_state.rs1use multiaddr::Multiaddr;
2use openmina_core::ChainId;
3use serde::{Deserialize, Serialize};
4
5use crate::{identity::PublicKey, PeerId};
6
7use super::*;
8
9#[derive(Serialize, Deserialize, Debug, Clone)]
10pub struct P2pNetworkState {
11 pub scheduler: P2pNetworkSchedulerState,
12}
13
14impl P2pNetworkState {
15 pub fn new(
16 identity: PublicKey,
17 addrs: Vec<Multiaddr>,
18 known_peers: Vec<(PeerId, Multiaddr)>,
19 chain_id: &ChainId,
20 discovery: bool,
21 ) -> Self {
22 let peer_id = identity.peer_id();
23 let pnet_key = chain_id.preshared_key();
24 let discovery_state = discovery.then(|| {
25 let mut routing_table = P2pNetworkKadRoutingTable::new(
26 P2pNetworkKadEntry::new(peer_id, addrs).expect("valid peer_id"),
27 );
28 routing_table.extend(known_peers.into_iter().map(|(peer_id, maddr)| {
29 P2pNetworkKadEntry::new(peer_id, vec![maddr]).expect("valid known peer")
30 }));
31 P2pNetworkKadState {
32 routing_table,
33 ..Default::default()
34 }
35 });
36
37 P2pNetworkState {
38 scheduler: P2pNetworkSchedulerState {
39 interfaces: Default::default(),
40 listeners: Default::default(),
41 local_pk: identity,
42 pnet_key,
43 connections: Default::default(),
44 broadcast_state: Default::default(),
45 identify_state: Default::default(),
46 discovery_state,
47 rpc_incoming_streams: Default::default(),
48 rpc_outgoing_streams: Default::default(),
49 },
50 }
51 }
52
53 pub fn find_rpc_state(&self, a: &P2pNetworkRpcAction) -> Option<&P2pNetworkRpcState> {
54 match a.stream_id() {
55 RpcStreamId::Exact(stream_id) => self
56 .scheduler
57 .rpc_incoming_streams
58 .get(a.peer_id())
59 .and_then(|cn| cn.get(&stream_id))
60 .or_else(|| {
61 self.scheduler
62 .rpc_outgoing_streams
63 .get(a.peer_id())
64 .and_then(|cn| cn.get(&stream_id))
65 }),
66 RpcStreamId::WithQuery(id) => self
67 .scheduler
68 .rpc_incoming_streams
69 .get(a.peer_id())
70 .and_then(|streams| {
71 streams.iter().find_map(|(_, state)| {
72 if state
73 .pending
74 .as_ref()
75 .is_some_and(|query_header| query_header.id == id)
76 {
77 Some(state)
78 } else {
79 None
80 }
81 })
82 }),
83 RpcStreamId::AnyIncoming => self
84 .scheduler
85 .rpc_incoming_streams
86 .get(a.peer_id())
87 .and_then(|stream| stream.first_key_value())
88 .map(|(_k, v)| v),
89 RpcStreamId::AnyOutgoing => {
90 if let Some(streams) = self.scheduler.rpc_outgoing_streams.get(a.peer_id()) {
91 if let Some((k, _)) = streams.first_key_value() {
92 return Some(streams.get(k).expect("checked above"));
93 }
94 }
95
96 None
97 }
98 }
99 }
100}