mina_p2p/channels/rpc/
mod.rs

1mod p2p_channels_rpc_state;
2pub use p2p_channels_rpc_state::*;
3
4mod p2p_channels_rpc_actions;
5pub use p2p_channels_rpc_actions::*;
6
7mod p2p_channels_rpc_reducer;
8
9use std::{sync::Arc, time::Duration};
10
11use binprot_derive::{BinProtRead, BinProtWrite};
12use mina_core::{
13    block::ArcBlock,
14    snark::{Snark, SnarkJobId},
15    transaction::{Transaction, TransactionHash},
16};
17use mina_p2p_messages::{
18    list::List,
19    rpc_kernel::QueryID,
20    v2::{
21        LedgerHash, MerkleAddressBinableArgStableV1, MinaBasePendingCoinbaseStableV2,
22        MinaBaseStateBodyHashStableV1, MinaLedgerSyncLedgerAnswerStableV2,
23        MinaLedgerSyncLedgerQueryStableV1, MinaStateProtocolStateValueStableV2, StateHash,
24        TransactionSnarkScanStateStableV2,
25    },
26};
27use serde::{Deserialize, Serialize};
28
29use crate::{connection::outgoing::P2pConnectionOutgoingInitOpts, P2pTimeouts};
30
31pub type P2pRpcId = QueryID;
32
33#[derive(BinProtWrite, BinProtRead, Serialize, Deserialize, Debug, Clone)]
34pub enum RpcChannelMsg {
35    Request(P2pRpcId, P2pRpcRequest),
36    Response(P2pRpcId, Option<P2pRpcResponse>),
37}
38
39impl RpcChannelMsg {
40    pub fn request_id(&self) -> P2pRpcId {
41        match self {
42            Self::Request(id, _) => *id,
43            Self::Response(id, _) => *id,
44        }
45    }
46}
47
48#[derive(Serialize, Deserialize, Debug, Ord, PartialOrd, Eq, PartialEq, Clone)]
49pub enum P2pRpcKind {
50    BestTipWithProof,
51    LedgerQuery,
52    StagedLedgerAuxAndPendingCoinbasesAtBlock,
53    Block,
54    Snark,
55    Transaction,
56    InitialPeers,
57}
58
59impl P2pRpcKind {
60    pub fn timeout(self, config: &P2pTimeouts) -> Option<Duration> {
61        match self {
62            Self::BestTipWithProof => config.best_tip_with_proof,
63            Self::LedgerQuery => config.ledger_query,
64            Self::StagedLedgerAuxAndPendingCoinbasesAtBlock => {
65                config.staged_ledger_aux_and_pending_coinbases_at_block
66            }
67            Self::Block => config.block,
68            Self::Snark => config.snark,
69            Self::Transaction => config.transaction,
70            Self::InitialPeers => config.initial_peers,
71        }
72    }
73
74    pub fn supported_by_libp2p(self) -> bool {
75        match self {
76            Self::BestTipWithProof => true,
77            Self::LedgerQuery => true,
78            Self::StagedLedgerAuxAndPendingCoinbasesAtBlock => true,
79            Self::Block => true,
80            Self::Snark => false,
81            Self::Transaction => false,
82            Self::InitialPeers => true,
83        }
84    }
85}
86
87#[derive(BinProtWrite, BinProtRead, Serialize, Deserialize, Debug, PartialEq, Clone, Default)]
88pub enum P2pRpcRequest {
89    #[default]
90    BestTipWithProof,
91    LedgerQuery(LedgerHash, MinaLedgerSyncLedgerQueryStableV1),
92    StagedLedgerAuxAndPendingCoinbasesAtBlock(StateHash),
93    Block(StateHash),
94    Snark(SnarkJobId),
95    Transaction(TransactionHash),
96    InitialPeers,
97}
98
99impl P2pRpcRequest {
100    pub fn kind(&self) -> P2pRpcKind {
101        match self {
102            Self::BestTipWithProof => P2pRpcKind::BestTipWithProof,
103            Self::LedgerQuery(..) => P2pRpcKind::LedgerQuery,
104            Self::StagedLedgerAuxAndPendingCoinbasesAtBlock(_) => {
105                P2pRpcKind::StagedLedgerAuxAndPendingCoinbasesAtBlock
106            }
107            Self::Block(_) => P2pRpcKind::Block,
108            Self::Snark(_) => P2pRpcKind::Snark,
109            Self::Transaction(_) => P2pRpcKind::Transaction,
110            Self::InitialPeers => P2pRpcKind::InitialPeers,
111        }
112    }
113}
114
115fn addr_to_str(
116    MerkleAddressBinableArgStableV1(mina_p2p_messages::number::Number(length), byte_string): &MerkleAddressBinableArgStableV1,
117) -> String {
118    let addr = byte_string
119        .as_ref()
120        .iter()
121        .copied()
122        .flat_map(|byte| {
123            (0..8)
124                .map(move |b| byte & (1 << (7 - b)) != 0)
125                .map(|b| if b { '1' } else { '0' })
126        })
127        .take(*length as usize)
128        .collect::<String>();
129
130    format!("depth: {length}, addr: {addr}")
131}
132
133impl std::fmt::Display for P2pRpcRequest {
134    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
135        write!(f, "{:?}", self.kind())?;
136        match self {
137            Self::BestTipWithProof => Ok(()),
138            Self::LedgerQuery(ledger_hash, query) => {
139                match query {
140                    MinaLedgerSyncLedgerQueryStableV1::NumAccounts => write!(f, ", NumAccounts, ")?,
141                    MinaLedgerSyncLedgerQueryStableV1::WhatChildHashes(addr) => {
142                        write!(f, ", ChildHashes, {}, ", addr_to_str(addr))?
143                    }
144                    MinaLedgerSyncLedgerQueryStableV1::WhatContents(addr) => {
145                        write!(f, ", ChildContents, {}, ", addr_to_str(addr))?
146                    }
147                }
148                write!(f, "ledger: {ledger_hash}")
149            }
150            Self::StagedLedgerAuxAndPendingCoinbasesAtBlock(block_hash)
151            | Self::Block(block_hash) => {
152                write!(f, ", {block_hash}")
153            }
154            Self::Snark(job_id) => {
155                write!(f, ", {job_id}")
156            }
157            Self::Transaction(hash) => {
158                write!(f, ", {hash}")
159            }
160            Self::InitialPeers => Ok(()),
161        }
162    }
163}
164
165#[derive(BinProtWrite, BinProtRead, Serialize, Deserialize, Debug, Clone)]
166pub struct BestTipWithProof {
167    pub best_tip: ArcBlock,
168    pub proof: (List<MinaBaseStateBodyHashStableV1>, ArcBlock),
169}
170
171/// Pieces required to reconstruct staged ledger from snarked ledger.
172#[derive(BinProtWrite, BinProtRead, Serialize, Deserialize, Debug, Clone)]
173pub struct StagedLedgerAuxAndPendingCoinbases {
174    pub scan_state: TransactionSnarkScanStateStableV2,
175    pub staged_ledger_hash: LedgerHash,
176    pub pending_coinbase: MinaBasePendingCoinbaseStableV2,
177    pub needed_blocks: List<MinaStateProtocolStateValueStableV2>,
178}
179
180#[derive(BinProtWrite, BinProtRead, Serialize, Deserialize, Debug, Clone)]
181pub enum P2pRpcResponse {
182    BestTipWithProof(BestTipWithProof),
183    LedgerQuery(MinaLedgerSyncLedgerAnswerStableV2),
184    StagedLedgerAuxAndPendingCoinbasesAtBlock(Arc<StagedLedgerAuxAndPendingCoinbases>),
185    Block(ArcBlock),
186    Snark(Snark),
187    Transaction(Transaction),
188    InitialPeers(List<P2pConnectionOutgoingInitOpts>),
189}
190
191impl P2pRpcResponse {
192    pub fn kind(&self) -> P2pRpcKind {
193        match self {
194            Self::BestTipWithProof(_) => P2pRpcKind::BestTipWithProof,
195            Self::LedgerQuery(_) => P2pRpcKind::LedgerQuery,
196            Self::StagedLedgerAuxAndPendingCoinbasesAtBlock(_) => {
197                P2pRpcKind::StagedLedgerAuxAndPendingCoinbasesAtBlock
198            }
199            Self::Block(_) => P2pRpcKind::Block,
200            Self::Snark(_) => P2pRpcKind::Snark,
201            Self::Transaction(_) => P2pRpcKind::Transaction,
202            Self::InitialPeers(_) => P2pRpcKind::InitialPeers,
203        }
204    }
205}
206
207#[cfg(feature = "p2p-libp2p")]
208mod libp2p {
209    use super::*;
210    use crate::Data;
211    use mina_p2p_messages::{
212        rpc,
213        rpc_kernel::{
214            NeedsLength, QueryHeader, QueryPayload, ResponseHeader, ResponsePayload, RpcMethod,
215            RpcResult,
216        },
217    };
218
219    pub fn internal_response_into_libp2p(
220        response: P2pRpcResponse,
221        id: P2pRpcId,
222    ) -> Option<(ResponseHeader, Data)> {
223        use binprot::BinProtWrite;
224
225        match response {
226            P2pRpcResponse::BestTipWithProof(r) => {
227                type Method = rpc::GetBestTipV2;
228                type Payload = ResponsePayload<<Method as RpcMethod>::Response>;
229
230                let BestTipWithProof {
231                    best_tip,
232                    proof: (middle, block),
233                } = r;
234
235                let r = RpcResult(Ok(NeedsLength(Some(rpc::ProofCarryingDataStableV1 {
236                    data: best_tip.as_ref().clone(),
237                    proof: (middle, block.as_ref().clone()),
238                }))));
239
240                let mut v = vec![];
241                <Payload as BinProtWrite>::binprot_write(&r, &mut v).unwrap_or_default();
242                Some((ResponseHeader { id: id as _ }, v.into()))
243            }
244            P2pRpcResponse::LedgerQuery(answer) => {
245                type Method = rpc::AnswerSyncLedgerQueryV2;
246                type Payload = ResponsePayload<<Method as RpcMethod>::Response>;
247
248                let r = RpcResult(Ok(NeedsLength(RpcResult(Ok(answer)))));
249
250                let mut v = vec![];
251                <Payload as BinProtWrite>::binprot_write(&r, &mut v).unwrap_or_default();
252                Some((ResponseHeader { id: id as _ }, v.into()))
253            }
254            P2pRpcResponse::StagedLedgerAuxAndPendingCoinbasesAtBlock(staged_ledger_info) => {
255                type Method = rpc::GetStagedLedgerAuxAndPendingCoinbasesAtHashV2;
256                type Payload = ResponsePayload<<Method as RpcMethod>::Response>;
257
258                let StagedLedgerAuxAndPendingCoinbases {
259                    scan_state,
260                    staged_ledger_hash,
261                    pending_coinbase,
262                    needed_blocks,
263                } = staged_ledger_info.as_ref().clone();
264
265                let hash = staged_ledger_hash.inner().0.clone();
266
267                let r = RpcResult(Ok(NeedsLength(Some((
268                    scan_state,
269                    hash,
270                    pending_coinbase,
271                    needed_blocks,
272                )))));
273
274                let mut v = vec![];
275                <Payload as BinProtWrite>::binprot_write(&r, &mut v).unwrap_or_default();
276                Some((ResponseHeader { id: id as _ }, v.into()))
277            }
278            P2pRpcResponse::Block(block) => {
279                type Method = rpc::GetTransitionChainV2;
280                type Payload = ResponsePayload<<Method as RpcMethod>::Response>;
281
282                let r = RpcResult(Ok(NeedsLength(Some(List::one(block.as_ref().clone())))));
283
284                let mut v = vec![];
285                <Payload as BinProtWrite>::binprot_write(&r, &mut v).unwrap_or_default();
286                Some((ResponseHeader { id: id as _ }, v.into()))
287            }
288            P2pRpcResponse::Snark(_) => {
289                // should use gossipsub to broadcast
290                None
291            }
292            P2pRpcResponse::Transaction(_) => {
293                // should use gossipsub to broadcast
294                None
295            }
296            P2pRpcResponse::InitialPeers(peers) => {
297                type Method = rpc::GetSomeInitialPeersV1ForV2;
298                type Payload = ResponsePayload<<Method as RpcMethod>::Response>;
299
300                let r = peers
301                    .into_iter()
302                    .filter_map(|peer| peer.try_into_mina_rpc())
303                    .collect();
304                let r = RpcResult(Ok(NeedsLength(r)));
305
306                let mut v = vec![];
307                <Payload as BinProtWrite>::binprot_write(&r, &mut v).unwrap_or_default();
308                Some((ResponseHeader { id: id as _ }, v.into()))
309            }
310        }
311    }
312
313    pub fn internal_request_into_libp2p(
314        request: P2pRpcRequest,
315        id: P2pRpcId,
316    ) -> Option<(QueryHeader, Data)> {
317        use binprot::BinProtWrite;
318
319        match request {
320            P2pRpcRequest::BestTipWithProof => {
321                type Method = rpc::GetBestTipV2;
322                type Payload = QueryPayload<<Method as RpcMethod>::Query>;
323
324                let mut v = vec![];
325                <Payload as BinProtWrite>::binprot_write(&NeedsLength(()), &mut v)
326                    .unwrap_or_default();
327                Some((
328                    QueryHeader {
329                        tag: Method::NAME.into(),
330                        version: Method::VERSION,
331                        id: id as _,
332                    },
333                    v.into(),
334                ))
335            }
336            P2pRpcRequest::LedgerQuery(hash, q) => {
337                type Method = rpc::AnswerSyncLedgerQueryV2;
338                type Payload = QueryPayload<<Method as RpcMethod>::Query>;
339
340                let mut v = vec![];
341                <Payload as BinProtWrite>::binprot_write(&NeedsLength((hash.0.clone(), q)), &mut v)
342                    .unwrap_or_default();
343                Some((
344                    QueryHeader {
345                        tag: Method::NAME.into(),
346                        version: Method::VERSION,
347                        id: id as _,
348                    },
349                    v.into(),
350                ))
351            }
352            P2pRpcRequest::StagedLedgerAuxAndPendingCoinbasesAtBlock(hash) => {
353                type Method = rpc::GetStagedLedgerAuxAndPendingCoinbasesAtHashV2;
354                type Payload = QueryPayload<<Method as RpcMethod>::Query>;
355
356                let mut v = vec![];
357                <Payload as BinProtWrite>::binprot_write(&NeedsLength(hash.0.clone()), &mut v)
358                    .unwrap_or_default();
359                Some((
360                    QueryHeader {
361                        tag: Method::NAME.into(),
362                        version: Method::VERSION,
363                        id: id as _,
364                    },
365                    v.into(),
366                ))
367            }
368            P2pRpcRequest::Block(hash) => {
369                type Method = rpc::GetTransitionChainV2;
370                type Payload = QueryPayload<<Method as RpcMethod>::Query>;
371
372                let mut v = vec![];
373                <Payload as BinProtWrite>::binprot_write(
374                    &NeedsLength(List::one(hash.0.clone())),
375                    &mut v,
376                )
377                .unwrap_or_default();
378                Some((
379                    QueryHeader {
380                        tag: Method::NAME.into(),
381                        version: Method::VERSION,
382                        id: id as _,
383                    },
384                    v.into(),
385                ))
386            }
387            P2pRpcRequest::Snark(hash) => {
388                let _ = hash;
389                // libp2p cannot fulfill this request
390                None
391            }
392            P2pRpcRequest::Transaction(hash) => {
393                let _ = hash;
394                // libp2p cannot fulfill this request
395                None
396            }
397            P2pRpcRequest::InitialPeers => {
398                type Method = rpc::GetSomeInitialPeersV1ForV2;
399                type Payload = QueryPayload<<Method as RpcMethod>::Query>;
400
401                let mut v = vec![];
402                <Payload as BinProtWrite>::binprot_write(&NeedsLength(()), &mut v)
403                    .unwrap_or_default();
404                Some((
405                    QueryHeader {
406                        tag: Method::NAME.into(),
407                        version: Method::VERSION,
408                        id: id as _,
409                    },
410                    v.into(),
411                ))
412            }
413        }
414    }
415}