1mod p2p_channels_rpc_state;
2pub use p2p_channels_rpc_state::*;
3
4mod p2p_channels_rpc_actions;
5pub use p2p_channels_rpc_actions::*;
6
7mod p2p_channels_rpc_reducer;
8
9use std::{sync::Arc, time::Duration};
10
11use binprot_derive::{BinProtRead, BinProtWrite};
12use mina_core::{
13 block::ArcBlock,
14 snark::{Snark, SnarkJobId},
15 transaction::{Transaction, TransactionHash},
16};
17use mina_p2p_messages::{
18 list::List,
19 rpc_kernel::QueryID,
20 v2::{
21 LedgerHash, MerkleAddressBinableArgStableV1, MinaBasePendingCoinbaseStableV2,
22 MinaBaseStateBodyHashStableV1, MinaLedgerSyncLedgerAnswerStableV2,
23 MinaLedgerSyncLedgerQueryStableV1, MinaStateProtocolStateValueStableV2, StateHash,
24 TransactionSnarkScanStateStableV2,
25 },
26};
27use serde::{Deserialize, Serialize};
28
29use crate::{connection::outgoing::P2pConnectionOutgoingInitOpts, P2pTimeouts};
30
31pub type P2pRpcId = QueryID;
32
33#[derive(BinProtWrite, BinProtRead, Serialize, Deserialize, Debug, Clone)]
34pub enum RpcChannelMsg {
35 Request(P2pRpcId, P2pRpcRequest),
36 Response(P2pRpcId, Option<P2pRpcResponse>),
37}
38
39impl RpcChannelMsg {
40 pub fn request_id(&self) -> P2pRpcId {
41 match self {
42 Self::Request(id, _) => *id,
43 Self::Response(id, _) => *id,
44 }
45 }
46}
47
48#[derive(Serialize, Deserialize, Debug, Ord, PartialOrd, Eq, PartialEq, Clone)]
49pub enum P2pRpcKind {
50 BestTipWithProof,
51 LedgerQuery,
52 StagedLedgerAuxAndPendingCoinbasesAtBlock,
53 Block,
54 Snark,
55 Transaction,
56 InitialPeers,
57}
58
59impl P2pRpcKind {
60 pub fn timeout(self, config: &P2pTimeouts) -> Option<Duration> {
61 match self {
62 Self::BestTipWithProof => config.best_tip_with_proof,
63 Self::LedgerQuery => config.ledger_query,
64 Self::StagedLedgerAuxAndPendingCoinbasesAtBlock => {
65 config.staged_ledger_aux_and_pending_coinbases_at_block
66 }
67 Self::Block => config.block,
68 Self::Snark => config.snark,
69 Self::Transaction => config.transaction,
70 Self::InitialPeers => config.initial_peers,
71 }
72 }
73
74 pub fn supported_by_libp2p(self) -> bool {
75 match self {
76 Self::BestTipWithProof => true,
77 Self::LedgerQuery => true,
78 Self::StagedLedgerAuxAndPendingCoinbasesAtBlock => true,
79 Self::Block => true,
80 Self::Snark => false,
81 Self::Transaction => false,
82 Self::InitialPeers => true,
83 }
84 }
85}
86
87#[derive(BinProtWrite, BinProtRead, Serialize, Deserialize, Debug, PartialEq, Clone, Default)]
88pub enum P2pRpcRequest {
89 #[default]
90 BestTipWithProof,
91 LedgerQuery(LedgerHash, MinaLedgerSyncLedgerQueryStableV1),
92 StagedLedgerAuxAndPendingCoinbasesAtBlock(StateHash),
93 Block(StateHash),
94 Snark(SnarkJobId),
95 Transaction(TransactionHash),
96 InitialPeers,
97}
98
99impl P2pRpcRequest {
100 pub fn kind(&self) -> P2pRpcKind {
101 match self {
102 Self::BestTipWithProof => P2pRpcKind::BestTipWithProof,
103 Self::LedgerQuery(..) => P2pRpcKind::LedgerQuery,
104 Self::StagedLedgerAuxAndPendingCoinbasesAtBlock(_) => {
105 P2pRpcKind::StagedLedgerAuxAndPendingCoinbasesAtBlock
106 }
107 Self::Block(_) => P2pRpcKind::Block,
108 Self::Snark(_) => P2pRpcKind::Snark,
109 Self::Transaction(_) => P2pRpcKind::Transaction,
110 Self::InitialPeers => P2pRpcKind::InitialPeers,
111 }
112 }
113}
114
115fn addr_to_str(
116 MerkleAddressBinableArgStableV1(mina_p2p_messages::number::Number(length), byte_string): &MerkleAddressBinableArgStableV1,
117) -> String {
118 let addr = byte_string
119 .as_ref()
120 .iter()
121 .copied()
122 .flat_map(|byte| {
123 (0..8)
124 .map(move |b| byte & (1 << (7 - b)) != 0)
125 .map(|b| if b { '1' } else { '0' })
126 })
127 .take(*length as usize)
128 .collect::<String>();
129
130 format!("depth: {length}, addr: {addr}")
131}
132
133impl std::fmt::Display for P2pRpcRequest {
134 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
135 write!(f, "{:?}", self.kind())?;
136 match self {
137 Self::BestTipWithProof => Ok(()),
138 Self::LedgerQuery(ledger_hash, query) => {
139 match query {
140 MinaLedgerSyncLedgerQueryStableV1::NumAccounts => write!(f, ", NumAccounts, ")?,
141 MinaLedgerSyncLedgerQueryStableV1::WhatChildHashes(addr) => {
142 write!(f, ", ChildHashes, {}, ", addr_to_str(addr))?
143 }
144 MinaLedgerSyncLedgerQueryStableV1::WhatContents(addr) => {
145 write!(f, ", ChildContents, {}, ", addr_to_str(addr))?
146 }
147 }
148 write!(f, "ledger: {ledger_hash}")
149 }
150 Self::StagedLedgerAuxAndPendingCoinbasesAtBlock(block_hash)
151 | Self::Block(block_hash) => {
152 write!(f, ", {block_hash}")
153 }
154 Self::Snark(job_id) => {
155 write!(f, ", {job_id}")
156 }
157 Self::Transaction(hash) => {
158 write!(f, ", {hash}")
159 }
160 Self::InitialPeers => Ok(()),
161 }
162 }
163}
164
165#[derive(BinProtWrite, BinProtRead, Serialize, Deserialize, Debug, Clone)]
166pub struct BestTipWithProof {
167 pub best_tip: ArcBlock,
168 pub proof: (List<MinaBaseStateBodyHashStableV1>, ArcBlock),
169}
170
171#[derive(BinProtWrite, BinProtRead, Serialize, Deserialize, Debug, Clone)]
173pub struct StagedLedgerAuxAndPendingCoinbases {
174 pub scan_state: TransactionSnarkScanStateStableV2,
175 pub staged_ledger_hash: LedgerHash,
176 pub pending_coinbase: MinaBasePendingCoinbaseStableV2,
177 pub needed_blocks: List<MinaStateProtocolStateValueStableV2>,
178}
179
180#[derive(BinProtWrite, BinProtRead, Serialize, Deserialize, Debug, Clone)]
181pub enum P2pRpcResponse {
182 BestTipWithProof(BestTipWithProof),
183 LedgerQuery(MinaLedgerSyncLedgerAnswerStableV2),
184 StagedLedgerAuxAndPendingCoinbasesAtBlock(Arc<StagedLedgerAuxAndPendingCoinbases>),
185 Block(ArcBlock),
186 Snark(Snark),
187 Transaction(Transaction),
188 InitialPeers(List<P2pConnectionOutgoingInitOpts>),
189}
190
191impl P2pRpcResponse {
192 pub fn kind(&self) -> P2pRpcKind {
193 match self {
194 Self::BestTipWithProof(_) => P2pRpcKind::BestTipWithProof,
195 Self::LedgerQuery(_) => P2pRpcKind::LedgerQuery,
196 Self::StagedLedgerAuxAndPendingCoinbasesAtBlock(_) => {
197 P2pRpcKind::StagedLedgerAuxAndPendingCoinbasesAtBlock
198 }
199 Self::Block(_) => P2pRpcKind::Block,
200 Self::Snark(_) => P2pRpcKind::Snark,
201 Self::Transaction(_) => P2pRpcKind::Transaction,
202 Self::InitialPeers(_) => P2pRpcKind::InitialPeers,
203 }
204 }
205}
206
207#[cfg(feature = "p2p-libp2p")]
208mod libp2p {
209 use super::*;
210 use crate::Data;
211 use mina_p2p_messages::{
212 rpc,
213 rpc_kernel::{
214 NeedsLength, QueryHeader, QueryPayload, ResponseHeader, ResponsePayload, RpcMethod,
215 RpcResult,
216 },
217 };
218
219 pub fn internal_response_into_libp2p(
220 response: P2pRpcResponse,
221 id: P2pRpcId,
222 ) -> Option<(ResponseHeader, Data)> {
223 use binprot::BinProtWrite;
224
225 match response {
226 P2pRpcResponse::BestTipWithProof(r) => {
227 type Method = rpc::GetBestTipV2;
228 type Payload = ResponsePayload<<Method as RpcMethod>::Response>;
229
230 let BestTipWithProof {
231 best_tip,
232 proof: (middle, block),
233 } = r;
234
235 let r = RpcResult(Ok(NeedsLength(Some(rpc::ProofCarryingDataStableV1 {
236 data: best_tip.as_ref().clone(),
237 proof: (middle, block.as_ref().clone()),
238 }))));
239
240 let mut v = vec![];
241 <Payload as BinProtWrite>::binprot_write(&r, &mut v).unwrap_or_default();
242 Some((ResponseHeader { id: id as _ }, v.into()))
243 }
244 P2pRpcResponse::LedgerQuery(answer) => {
245 type Method = rpc::AnswerSyncLedgerQueryV2;
246 type Payload = ResponsePayload<<Method as RpcMethod>::Response>;
247
248 let r = RpcResult(Ok(NeedsLength(RpcResult(Ok(answer)))));
249
250 let mut v = vec![];
251 <Payload as BinProtWrite>::binprot_write(&r, &mut v).unwrap_or_default();
252 Some((ResponseHeader { id: id as _ }, v.into()))
253 }
254 P2pRpcResponse::StagedLedgerAuxAndPendingCoinbasesAtBlock(staged_ledger_info) => {
255 type Method = rpc::GetStagedLedgerAuxAndPendingCoinbasesAtHashV2;
256 type Payload = ResponsePayload<<Method as RpcMethod>::Response>;
257
258 let StagedLedgerAuxAndPendingCoinbases {
259 scan_state,
260 staged_ledger_hash,
261 pending_coinbase,
262 needed_blocks,
263 } = staged_ledger_info.as_ref().clone();
264
265 let hash = staged_ledger_hash.inner().0.clone();
266
267 let r = RpcResult(Ok(NeedsLength(Some((
268 scan_state,
269 hash,
270 pending_coinbase,
271 needed_blocks,
272 )))));
273
274 let mut v = vec![];
275 <Payload as BinProtWrite>::binprot_write(&r, &mut v).unwrap_or_default();
276 Some((ResponseHeader { id: id as _ }, v.into()))
277 }
278 P2pRpcResponse::Block(block) => {
279 type Method = rpc::GetTransitionChainV2;
280 type Payload = ResponsePayload<<Method as RpcMethod>::Response>;
281
282 let r = RpcResult(Ok(NeedsLength(Some(List::one(block.as_ref().clone())))));
283
284 let mut v = vec![];
285 <Payload as BinProtWrite>::binprot_write(&r, &mut v).unwrap_or_default();
286 Some((ResponseHeader { id: id as _ }, v.into()))
287 }
288 P2pRpcResponse::Snark(_) => {
289 None
291 }
292 P2pRpcResponse::Transaction(_) => {
293 None
295 }
296 P2pRpcResponse::InitialPeers(peers) => {
297 type Method = rpc::GetSomeInitialPeersV1ForV2;
298 type Payload = ResponsePayload<<Method as RpcMethod>::Response>;
299
300 let r = peers
301 .into_iter()
302 .filter_map(|peer| peer.try_into_mina_rpc())
303 .collect();
304 let r = RpcResult(Ok(NeedsLength(r)));
305
306 let mut v = vec![];
307 <Payload as BinProtWrite>::binprot_write(&r, &mut v).unwrap_or_default();
308 Some((ResponseHeader { id: id as _ }, v.into()))
309 }
310 }
311 }
312
313 pub fn internal_request_into_libp2p(
314 request: P2pRpcRequest,
315 id: P2pRpcId,
316 ) -> Option<(QueryHeader, Data)> {
317 use binprot::BinProtWrite;
318
319 match request {
320 P2pRpcRequest::BestTipWithProof => {
321 type Method = rpc::GetBestTipV2;
322 type Payload = QueryPayload<<Method as RpcMethod>::Query>;
323
324 let mut v = vec![];
325 <Payload as BinProtWrite>::binprot_write(&NeedsLength(()), &mut v)
326 .unwrap_or_default();
327 Some((
328 QueryHeader {
329 tag: Method::NAME.into(),
330 version: Method::VERSION,
331 id: id as _,
332 },
333 v.into(),
334 ))
335 }
336 P2pRpcRequest::LedgerQuery(hash, q) => {
337 type Method = rpc::AnswerSyncLedgerQueryV2;
338 type Payload = QueryPayload<<Method as RpcMethod>::Query>;
339
340 let mut v = vec![];
341 <Payload as BinProtWrite>::binprot_write(&NeedsLength((hash.0.clone(), q)), &mut v)
342 .unwrap_or_default();
343 Some((
344 QueryHeader {
345 tag: Method::NAME.into(),
346 version: Method::VERSION,
347 id: id as _,
348 },
349 v.into(),
350 ))
351 }
352 P2pRpcRequest::StagedLedgerAuxAndPendingCoinbasesAtBlock(hash) => {
353 type Method = rpc::GetStagedLedgerAuxAndPendingCoinbasesAtHashV2;
354 type Payload = QueryPayload<<Method as RpcMethod>::Query>;
355
356 let mut v = vec![];
357 <Payload as BinProtWrite>::binprot_write(&NeedsLength(hash.0.clone()), &mut v)
358 .unwrap_or_default();
359 Some((
360 QueryHeader {
361 tag: Method::NAME.into(),
362 version: Method::VERSION,
363 id: id as _,
364 },
365 v.into(),
366 ))
367 }
368 P2pRpcRequest::Block(hash) => {
369 type Method = rpc::GetTransitionChainV2;
370 type Payload = QueryPayload<<Method as RpcMethod>::Query>;
371
372 let mut v = vec![];
373 <Payload as BinProtWrite>::binprot_write(
374 &NeedsLength(List::one(hash.0.clone())),
375 &mut v,
376 )
377 .unwrap_or_default();
378 Some((
379 QueryHeader {
380 tag: Method::NAME.into(),
381 version: Method::VERSION,
382 id: id as _,
383 },
384 v.into(),
385 ))
386 }
387 P2pRpcRequest::Snark(hash) => {
388 let _ = hash;
389 None
391 }
392 P2pRpcRequest::Transaction(hash) => {
393 let _ = hash;
394 None
396 }
397 P2pRpcRequest::InitialPeers => {
398 type Method = rpc::GetSomeInitialPeersV1ForV2;
399 type Payload = QueryPayload<<Method as RpcMethod>::Query>;
400
401 let mut v = vec![];
402 <Payload as BinProtWrite>::binprot_write(&NeedsLength(()), &mut v)
403 .unwrap_or_default();
404 Some((
405 QueryHeader {
406 tag: Method::NAME.into(),
407 version: Method::VERSION,
408 id: id as _,
409 },
410 v.into(),
411 ))
412 }
413 }
414 }
415}