p2p/network/
p2p_network_state.rs

1use multiaddr::Multiaddr;
2use openmina_core::ChainId;
3use serde::{Deserialize, Serialize};
4
5use crate::{identity::PublicKey, PeerId};
6
7use super::*;
8
9#[derive(Serialize, Deserialize, Debug, Clone)]
10pub struct P2pNetworkState {
11    pub scheduler: P2pNetworkSchedulerState,
12}
13
14impl P2pNetworkState {
15    pub fn new(
16        identity: PublicKey,
17        addrs: Vec<Multiaddr>,
18        known_peers: Vec<(PeerId, Multiaddr)>,
19        chain_id: &ChainId,
20        discovery: bool,
21    ) -> Self {
22        let peer_id = identity.peer_id();
23        let pnet_key = chain_id.preshared_key();
24        let discovery_state = discovery.then(|| {
25            let mut routing_table = P2pNetworkKadRoutingTable::new(
26                P2pNetworkKadEntry::new(peer_id, addrs).expect("valid peer_id"),
27            );
28            routing_table.extend(known_peers.into_iter().map(|(peer_id, maddr)| {
29                P2pNetworkKadEntry::new(peer_id, vec![maddr]).expect("valid known peer")
30            }));
31            P2pNetworkKadState {
32                routing_table,
33                ..Default::default()
34            }
35        });
36
37        P2pNetworkState {
38            scheduler: P2pNetworkSchedulerState {
39                interfaces: Default::default(),
40                listeners: Default::default(),
41                local_pk: identity,
42                pnet_key,
43                connections: Default::default(),
44                broadcast_state: Default::default(),
45                identify_state: Default::default(),
46                discovery_state,
47                rpc_incoming_streams: Default::default(),
48                rpc_outgoing_streams: Default::default(),
49            },
50        }
51    }
52
53    pub fn find_rpc_state(&self, a: &P2pNetworkRpcAction) -> Option<&P2pNetworkRpcState> {
54        match a.stream_id() {
55            RpcStreamId::Exact(stream_id) => self
56                .scheduler
57                .rpc_incoming_streams
58                .get(a.peer_id())
59                .and_then(|cn| cn.get(&stream_id))
60                .or_else(|| {
61                    self.scheduler
62                        .rpc_outgoing_streams
63                        .get(a.peer_id())
64                        .and_then(|cn| cn.get(&stream_id))
65                }),
66            RpcStreamId::WithQuery(id) => self
67                .scheduler
68                .rpc_incoming_streams
69                .get(a.peer_id())
70                .and_then(|streams| {
71                    streams.iter().find_map(|(_, state)| {
72                        if state
73                            .pending
74                            .as_ref()
75                            .is_some_and(|query_header| query_header.id == id)
76                        {
77                            Some(state)
78                        } else {
79                            None
80                        }
81                    })
82                }),
83            RpcStreamId::AnyIncoming => self
84                .scheduler
85                .rpc_incoming_streams
86                .get(a.peer_id())
87                .and_then(|stream| stream.first_key_value())
88                .map(|(_k, v)| v),
89            RpcStreamId::AnyOutgoing => {
90                if let Some(streams) = self.scheduler.rpc_outgoing_streams.get(a.peer_id()) {
91                    if let Some((k, _)) = streams.first_key_value() {
92                        return Some(streams.get(k).expect("checked above"));
93                    }
94                }
95
96                None
97            }
98        }
99    }
100}