p2p/channels/rpc/
mod.rs

1mod p2p_channels_rpc_state;
2pub use p2p_channels_rpc_state::*;
3
4mod p2p_channels_rpc_actions;
5pub use p2p_channels_rpc_actions::*;
6
7mod p2p_channels_rpc_reducer;
8
9use std::{sync::Arc, time::Duration};
10
11use binprot_derive::{BinProtRead, BinProtWrite};
12use mina_p2p_messages::{
13    list::List,
14    rpc_kernel::QueryID,
15    v2::{
16        LedgerHash, MerkleAddressBinableArgStableV1, MinaBasePendingCoinbaseStableV2,
17        MinaBaseStateBodyHashStableV1, MinaLedgerSyncLedgerAnswerStableV2,
18        MinaLedgerSyncLedgerQueryStableV1, MinaStateProtocolStateValueStableV2, StateHash,
19        TransactionSnarkScanStateStableV2,
20    },
21};
22use openmina_core::{
23    block::ArcBlock,
24    snark::{Snark, SnarkJobId},
25    transaction::{Transaction, TransactionHash},
26};
27use serde::{Deserialize, Serialize};
28
29use crate::{connection::outgoing::P2pConnectionOutgoingInitOpts, P2pTimeouts};
30
31pub type P2pRpcId = QueryID;
32
33#[derive(BinProtWrite, BinProtRead, Serialize, Deserialize, Debug, Clone)]
34pub enum RpcChannelMsg {
35    Request(P2pRpcId, P2pRpcRequest),
36    Response(P2pRpcId, Option<P2pRpcResponse>),
37}
38
39impl RpcChannelMsg {
40    pub fn request_id(&self) -> P2pRpcId {
41        match self {
42            Self::Request(id, _) => *id,
43            Self::Response(id, _) => *id,
44        }
45    }
46}
47
48#[derive(Serialize, Deserialize, Debug, Ord, PartialOrd, Eq, PartialEq, Clone)]
49pub enum P2pRpcKind {
50    BestTipWithProof,
51    LedgerQuery,
52    StagedLedgerAuxAndPendingCoinbasesAtBlock,
53    Block,
54    Snark,
55    Transaction,
56    InitialPeers,
57}
58
59impl P2pRpcKind {
60    pub fn timeout(self, config: &P2pTimeouts) -> Option<Duration> {
61        match self {
62            Self::BestTipWithProof => config.best_tip_with_proof,
63            Self::LedgerQuery => config.ledger_query,
64            Self::StagedLedgerAuxAndPendingCoinbasesAtBlock => {
65                config.staged_ledger_aux_and_pending_coinbases_at_block
66            }
67            Self::Block => config.block,
68            Self::Snark => config.snark,
69            Self::Transaction => config.transaction,
70            Self::InitialPeers => config.initial_peers,
71        }
72    }
73
74    pub fn supported_by_libp2p(self) -> bool {
75        match self {
76            Self::BestTipWithProof => true,
77            Self::LedgerQuery => true,
78            Self::StagedLedgerAuxAndPendingCoinbasesAtBlock => true,
79            Self::Block => true,
80            Self::Snark => false,
81            Self::Transaction => false,
82            Self::InitialPeers => true,
83        }
84    }
85}
86
87#[derive(BinProtWrite, BinProtRead, Serialize, Deserialize, Debug, PartialEq, Clone)]
88pub enum P2pRpcRequest {
89    BestTipWithProof,
90    LedgerQuery(LedgerHash, MinaLedgerSyncLedgerQueryStableV1),
91    StagedLedgerAuxAndPendingCoinbasesAtBlock(StateHash),
92    Block(StateHash),
93    Snark(SnarkJobId),
94    Transaction(TransactionHash),
95    InitialPeers,
96}
97
98impl P2pRpcRequest {
99    pub fn kind(&self) -> P2pRpcKind {
100        match self {
101            Self::BestTipWithProof => P2pRpcKind::BestTipWithProof,
102            Self::LedgerQuery(..) => P2pRpcKind::LedgerQuery,
103            Self::StagedLedgerAuxAndPendingCoinbasesAtBlock(_) => {
104                P2pRpcKind::StagedLedgerAuxAndPendingCoinbasesAtBlock
105            }
106            Self::Block(_) => P2pRpcKind::Block,
107            Self::Snark(_) => P2pRpcKind::Snark,
108            Self::Transaction(_) => P2pRpcKind::Transaction,
109            Self::InitialPeers => P2pRpcKind::InitialPeers,
110        }
111    }
112}
113
114impl Default for P2pRpcRequest {
115    fn default() -> Self {
116        Self::BestTipWithProof
117    }
118}
119
120fn addr_to_str(
121    MerkleAddressBinableArgStableV1(mina_p2p_messages::number::Number(length), byte_string): &MerkleAddressBinableArgStableV1,
122) -> String {
123    let addr = byte_string
124        .as_ref()
125        .iter()
126        .copied()
127        .flat_map(|byte| {
128            (0..8)
129                .map(move |b| byte & (1 << (7 - b)) != 0)
130                .map(|b| if b { '1' } else { '0' })
131        })
132        .take(*length as usize)
133        .collect::<String>();
134
135    format!("depth: {length}, addr: {addr}")
136}
137
138impl std::fmt::Display for P2pRpcRequest {
139    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
140        write!(f, "{:?}", self.kind())?;
141        match self {
142            Self::BestTipWithProof => Ok(()),
143            Self::LedgerQuery(ledger_hash, query) => {
144                match query {
145                    MinaLedgerSyncLedgerQueryStableV1::NumAccounts => write!(f, ", NumAccounts, ")?,
146                    MinaLedgerSyncLedgerQueryStableV1::WhatChildHashes(addr) => {
147                        write!(f, ", ChildHashes, {}, ", addr_to_str(addr))?
148                    }
149                    MinaLedgerSyncLedgerQueryStableV1::WhatContents(addr) => {
150                        write!(f, ", ChildContents, {}, ", addr_to_str(addr))?
151                    }
152                }
153                write!(f, "ledger: {ledger_hash}")
154            }
155            Self::StagedLedgerAuxAndPendingCoinbasesAtBlock(block_hash)
156            | Self::Block(block_hash) => {
157                write!(f, ", {block_hash}")
158            }
159            Self::Snark(job_id) => {
160                write!(f, ", {job_id}")
161            }
162            Self::Transaction(hash) => {
163                write!(f, ", {hash}")
164            }
165            Self::InitialPeers => Ok(()),
166        }
167    }
168}
169
170#[derive(BinProtWrite, BinProtRead, Serialize, Deserialize, Debug, Clone)]
171pub struct BestTipWithProof {
172    pub best_tip: ArcBlock,
173    pub proof: (List<MinaBaseStateBodyHashStableV1>, ArcBlock),
174}
175
176/// Pieces required to reconstruct staged ledger from snarked ledger.
177#[derive(BinProtWrite, BinProtRead, Serialize, Deserialize, Debug, Clone)]
178pub struct StagedLedgerAuxAndPendingCoinbases {
179    pub scan_state: TransactionSnarkScanStateStableV2,
180    pub staged_ledger_hash: LedgerHash,
181    pub pending_coinbase: MinaBasePendingCoinbaseStableV2,
182    pub needed_blocks: List<MinaStateProtocolStateValueStableV2>,
183}
184
185#[derive(BinProtWrite, BinProtRead, Serialize, Deserialize, Debug, Clone)]
186pub enum P2pRpcResponse {
187    BestTipWithProof(BestTipWithProof),
188    LedgerQuery(MinaLedgerSyncLedgerAnswerStableV2),
189    StagedLedgerAuxAndPendingCoinbasesAtBlock(Arc<StagedLedgerAuxAndPendingCoinbases>),
190    Block(ArcBlock),
191    Snark(Snark),
192    Transaction(Transaction),
193    InitialPeers(List<P2pConnectionOutgoingInitOpts>),
194}
195
196impl P2pRpcResponse {
197    pub fn kind(&self) -> P2pRpcKind {
198        match self {
199            Self::BestTipWithProof(_) => P2pRpcKind::BestTipWithProof,
200            Self::LedgerQuery(_) => P2pRpcKind::LedgerQuery,
201            Self::StagedLedgerAuxAndPendingCoinbasesAtBlock(_) => {
202                P2pRpcKind::StagedLedgerAuxAndPendingCoinbasesAtBlock
203            }
204            Self::Block(_) => P2pRpcKind::Block,
205            Self::Snark(_) => P2pRpcKind::Snark,
206            Self::Transaction(_) => P2pRpcKind::Transaction,
207            Self::InitialPeers(_) => P2pRpcKind::InitialPeers,
208        }
209    }
210}
211
212#[cfg(feature = "p2p-libp2p")]
213mod libp2p {
214    use super::*;
215    use crate::Data;
216    use mina_p2p_messages::{
217        rpc,
218        rpc_kernel::{
219            NeedsLength, QueryHeader, QueryPayload, ResponseHeader, ResponsePayload, RpcMethod,
220            RpcResult,
221        },
222    };
223
224    pub fn internal_response_into_libp2p(
225        response: P2pRpcResponse,
226        id: P2pRpcId,
227    ) -> Option<(ResponseHeader, Data)> {
228        use binprot::BinProtWrite;
229
230        match response {
231            P2pRpcResponse::BestTipWithProof(r) => {
232                type Method = rpc::GetBestTipV2;
233                type Payload = ResponsePayload<<Method as RpcMethod>::Response>;
234
235                let BestTipWithProof {
236                    best_tip,
237                    proof: (middle, block),
238                } = r;
239
240                let r = RpcResult(Ok(NeedsLength(Some(rpc::ProofCarryingDataStableV1 {
241                    data: best_tip.as_ref().clone(),
242                    proof: (middle, block.as_ref().clone()),
243                }))));
244
245                let mut v = vec![];
246                <Payload as BinProtWrite>::binprot_write(&r, &mut v).unwrap_or_default();
247                Some((ResponseHeader { id: id as _ }, v.into()))
248            }
249            P2pRpcResponse::LedgerQuery(answer) => {
250                type Method = rpc::AnswerSyncLedgerQueryV2;
251                type Payload = ResponsePayload<<Method as RpcMethod>::Response>;
252
253                let r = RpcResult(Ok(NeedsLength(RpcResult(Ok(answer)))));
254
255                let mut v = vec![];
256                <Payload as BinProtWrite>::binprot_write(&r, &mut v).unwrap_or_default();
257                Some((ResponseHeader { id: id as _ }, v.into()))
258            }
259            P2pRpcResponse::StagedLedgerAuxAndPendingCoinbasesAtBlock(staged_ledger_info) => {
260                type Method = rpc::GetStagedLedgerAuxAndPendingCoinbasesAtHashV2;
261                type Payload = ResponsePayload<<Method as RpcMethod>::Response>;
262
263                let StagedLedgerAuxAndPendingCoinbases {
264                    scan_state,
265                    staged_ledger_hash,
266                    pending_coinbase,
267                    needed_blocks,
268                } = staged_ledger_info.as_ref().clone();
269
270                let hash = staged_ledger_hash.inner().0.clone();
271
272                let r = RpcResult(Ok(NeedsLength(Some((
273                    scan_state,
274                    hash,
275                    pending_coinbase,
276                    needed_blocks,
277                )))));
278
279                let mut v = vec![];
280                <Payload as BinProtWrite>::binprot_write(&r, &mut v).unwrap_or_default();
281                Some((ResponseHeader { id: id as _ }, v.into()))
282            }
283            P2pRpcResponse::Block(block) => {
284                type Method = rpc::GetTransitionChainV2;
285                type Payload = ResponsePayload<<Method as RpcMethod>::Response>;
286
287                let r = RpcResult(Ok(NeedsLength(Some(List::one(block.as_ref().clone())))));
288
289                let mut v = vec![];
290                <Payload as BinProtWrite>::binprot_write(&r, &mut v).unwrap_or_default();
291                Some((ResponseHeader { id: id as _ }, v.into()))
292            }
293            P2pRpcResponse::Snark(_) => {
294                // should use gossipsub to broadcast
295                None
296            }
297            P2pRpcResponse::Transaction(_) => {
298                // should use gossipsub to broadcast
299                None
300            }
301            P2pRpcResponse::InitialPeers(peers) => {
302                type Method = rpc::GetSomeInitialPeersV1ForV2;
303                type Payload = ResponsePayload<<Method as RpcMethod>::Response>;
304
305                let r = peers
306                    .into_iter()
307                    .filter_map(|peer| peer.try_into_mina_rpc())
308                    .collect();
309                let r = RpcResult(Ok(NeedsLength(r)));
310
311                let mut v = vec![];
312                <Payload as BinProtWrite>::binprot_write(&r, &mut v).unwrap_or_default();
313                Some((ResponseHeader { id: id as _ }, v.into()))
314            }
315        }
316    }
317
318    pub fn internal_request_into_libp2p(
319        request: P2pRpcRequest,
320        id: P2pRpcId,
321    ) -> Option<(QueryHeader, Data)> {
322        use binprot::BinProtWrite;
323
324        match request {
325            P2pRpcRequest::BestTipWithProof => {
326                type Method = rpc::GetBestTipV2;
327                type Payload = QueryPayload<<Method as RpcMethod>::Query>;
328
329                let mut v = vec![];
330                <Payload as BinProtWrite>::binprot_write(&NeedsLength(()), &mut v)
331                    .unwrap_or_default();
332                Some((
333                    QueryHeader {
334                        tag: Method::NAME.into(),
335                        version: Method::VERSION,
336                        id: id as _,
337                    },
338                    v.into(),
339                ))
340            }
341            P2pRpcRequest::LedgerQuery(hash, q) => {
342                type Method = rpc::AnswerSyncLedgerQueryV2;
343                type Payload = QueryPayload<<Method as RpcMethod>::Query>;
344
345                let mut v = vec![];
346                <Payload as BinProtWrite>::binprot_write(&NeedsLength((hash.0.clone(), q)), &mut v)
347                    .unwrap_or_default();
348                Some((
349                    QueryHeader {
350                        tag: Method::NAME.into(),
351                        version: Method::VERSION,
352                        id: id as _,
353                    },
354                    v.into(),
355                ))
356            }
357            P2pRpcRequest::StagedLedgerAuxAndPendingCoinbasesAtBlock(hash) => {
358                type Method = rpc::GetStagedLedgerAuxAndPendingCoinbasesAtHashV2;
359                type Payload = QueryPayload<<Method as RpcMethod>::Query>;
360
361                let mut v = vec![];
362                <Payload as BinProtWrite>::binprot_write(&NeedsLength(hash.0.clone()), &mut v)
363                    .unwrap_or_default();
364                Some((
365                    QueryHeader {
366                        tag: Method::NAME.into(),
367                        version: Method::VERSION,
368                        id: id as _,
369                    },
370                    v.into(),
371                ))
372            }
373            P2pRpcRequest::Block(hash) => {
374                type Method = rpc::GetTransitionChainV2;
375                type Payload = QueryPayload<<Method as RpcMethod>::Query>;
376
377                let mut v = vec![];
378                <Payload as BinProtWrite>::binprot_write(
379                    &NeedsLength(List::one(hash.0.clone())),
380                    &mut v,
381                )
382                .unwrap_or_default();
383                Some((
384                    QueryHeader {
385                        tag: Method::NAME.into(),
386                        version: Method::VERSION,
387                        id: id as _,
388                    },
389                    v.into(),
390                ))
391            }
392            P2pRpcRequest::Snark(hash) => {
393                let _ = hash;
394                // libp2p cannot fulfill this request
395                None
396            }
397            P2pRpcRequest::Transaction(hash) => {
398                let _ = hash;
399                // libp2p cannot fulfill this request
400                None
401            }
402            P2pRpcRequest::InitialPeers => {
403                type Method = rpc::GetSomeInitialPeersV1ForV2;
404                type Payload = QueryPayload<<Method as RpcMethod>::Query>;
405
406                let mut v = vec![];
407                <Payload as BinProtWrite>::binprot_write(&NeedsLength(()), &mut v)
408                    .unwrap_or_default();
409                Some((
410                    QueryHeader {
411                        tag: Method::NAME.into(),
412                        version: Method::VERSION,
413                        id: id as _,
414                    },
415                    v.into(),
416                ))
417            }
418        }
419    }
420}