1mod p2p_channels_rpc_state;
2pub use p2p_channels_rpc_state::*;
3
4mod p2p_channels_rpc_actions;
5pub use p2p_channels_rpc_actions::*;
6
7mod p2p_channels_rpc_reducer;
8
9use std::{sync::Arc, time::Duration};
10
11use binprot_derive::{BinProtRead, BinProtWrite};
12use mina_p2p_messages::{
13 list::List,
14 rpc_kernel::QueryID,
15 v2::{
16 LedgerHash, MerkleAddressBinableArgStableV1, MinaBasePendingCoinbaseStableV2,
17 MinaBaseStateBodyHashStableV1, MinaLedgerSyncLedgerAnswerStableV2,
18 MinaLedgerSyncLedgerQueryStableV1, MinaStateProtocolStateValueStableV2, StateHash,
19 TransactionSnarkScanStateStableV2,
20 },
21};
22use openmina_core::{
23 block::ArcBlock,
24 snark::{Snark, SnarkJobId},
25 transaction::{Transaction, TransactionHash},
26};
27use serde::{Deserialize, Serialize};
28
29use crate::{connection::outgoing::P2pConnectionOutgoingInitOpts, P2pTimeouts};
30
/// Identifier carried by every [`RpcChannelMsg`], for requests and responses alike.
///
/// This is a plain alias of the RPC kernel's [`QueryID`].
pub type P2pRpcId = QueryID;
32
33#[derive(BinProtWrite, BinProtRead, Serialize, Deserialize, Debug, Clone)]
34pub enum RpcChannelMsg {
35 Request(P2pRpcId, P2pRpcRequest),
36 Response(P2pRpcId, Option<P2pRpcResponse>),
37}
38
39impl RpcChannelMsg {
40 pub fn request_id(&self) -> P2pRpcId {
41 match self {
42 Self::Request(id, _) => *id,
43 Self::Response(id, _) => *id,
44 }
45 }
46}
47
48#[derive(Serialize, Deserialize, Debug, Ord, PartialOrd, Eq, PartialEq, Clone)]
49pub enum P2pRpcKind {
50 BestTipWithProof,
51 LedgerQuery,
52 StagedLedgerAuxAndPendingCoinbasesAtBlock,
53 Block,
54 Snark,
55 Transaction,
56 InitialPeers,
57}
58
59impl P2pRpcKind {
60 pub fn timeout(self, config: &P2pTimeouts) -> Option<Duration> {
61 match self {
62 Self::BestTipWithProof => config.best_tip_with_proof,
63 Self::LedgerQuery => config.ledger_query,
64 Self::StagedLedgerAuxAndPendingCoinbasesAtBlock => {
65 config.staged_ledger_aux_and_pending_coinbases_at_block
66 }
67 Self::Block => config.block,
68 Self::Snark => config.snark,
69 Self::Transaction => config.transaction,
70 Self::InitialPeers => config.initial_peers,
71 }
72 }
73
74 pub fn supported_by_libp2p(self) -> bool {
75 match self {
76 Self::BestTipWithProof => true,
77 Self::LedgerQuery => true,
78 Self::StagedLedgerAuxAndPendingCoinbasesAtBlock => true,
79 Self::Block => true,
80 Self::Snark => false,
81 Self::Transaction => false,
82 Self::InitialPeers => true,
83 }
84 }
85}
86
87#[derive(BinProtWrite, BinProtRead, Serialize, Deserialize, Debug, PartialEq, Clone)]
88pub enum P2pRpcRequest {
89 BestTipWithProof,
90 LedgerQuery(LedgerHash, MinaLedgerSyncLedgerQueryStableV1),
91 StagedLedgerAuxAndPendingCoinbasesAtBlock(StateHash),
92 Block(StateHash),
93 Snark(SnarkJobId),
94 Transaction(TransactionHash),
95 InitialPeers,
96}
97
98impl P2pRpcRequest {
99 pub fn kind(&self) -> P2pRpcKind {
100 match self {
101 Self::BestTipWithProof => P2pRpcKind::BestTipWithProof,
102 Self::LedgerQuery(..) => P2pRpcKind::LedgerQuery,
103 Self::StagedLedgerAuxAndPendingCoinbasesAtBlock(_) => {
104 P2pRpcKind::StagedLedgerAuxAndPendingCoinbasesAtBlock
105 }
106 Self::Block(_) => P2pRpcKind::Block,
107 Self::Snark(_) => P2pRpcKind::Snark,
108 Self::Transaction(_) => P2pRpcKind::Transaction,
109 Self::InitialPeers => P2pRpcKind::InitialPeers,
110 }
111 }
112}
113
114impl Default for P2pRpcRequest {
115 fn default() -> Self {
116 Self::BestTipWithProof
117 }
118}
119
120fn addr_to_str(
121 MerkleAddressBinableArgStableV1(mina_p2p_messages::number::Number(length), byte_string): &MerkleAddressBinableArgStableV1,
122) -> String {
123 let addr = byte_string
124 .as_ref()
125 .iter()
126 .copied()
127 .flat_map(|byte| {
128 (0..8)
129 .map(move |b| byte & (1 << (7 - b)) != 0)
130 .map(|b| if b { '1' } else { '0' })
131 })
132 .take(*length as usize)
133 .collect::<String>();
134
135 format!("depth: {length}, addr: {addr}")
136}
137
138impl std::fmt::Display for P2pRpcRequest {
139 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
140 write!(f, "{:?}", self.kind())?;
141 match self {
142 Self::BestTipWithProof => Ok(()),
143 Self::LedgerQuery(ledger_hash, query) => {
144 match query {
145 MinaLedgerSyncLedgerQueryStableV1::NumAccounts => write!(f, ", NumAccounts, ")?,
146 MinaLedgerSyncLedgerQueryStableV1::WhatChildHashes(addr) => {
147 write!(f, ", ChildHashes, {}, ", addr_to_str(addr))?
148 }
149 MinaLedgerSyncLedgerQueryStableV1::WhatContents(addr) => {
150 write!(f, ", ChildContents, {}, ", addr_to_str(addr))?
151 }
152 }
153 write!(f, "ledger: {ledger_hash}")
154 }
155 Self::StagedLedgerAuxAndPendingCoinbasesAtBlock(block_hash)
156 | Self::Block(block_hash) => {
157 write!(f, ", {block_hash}")
158 }
159 Self::Snark(job_id) => {
160 write!(f, ", {job_id}")
161 }
162 Self::Transaction(hash) => {
163 write!(f, ", {hash}")
164 }
165 Self::InitialPeers => Ok(()),
166 }
167 }
168}
169
/// Payload of [`P2pRpcResponse::BestTipWithProof`].
#[derive(BinProtWrite, BinProtRead, Serialize, Deserialize, Debug, Clone)]
pub struct BestTipWithProof {
    /// The best tip block being reported.
    pub best_tip: ArcBlock,
    /// Proof accompanying the best tip: a list of state body hashes plus a block.
    // NOTE(review): presumably the block is the root the hash chain starts from — confirm.
    pub proof: (List<MinaBaseStateBodyHashStableV1>, ArcBlock),
}
175
/// Payload of [`P2pRpcResponse::StagedLedgerAuxAndPendingCoinbasesAtBlock`],
/// where it is held behind an `Arc`.
#[derive(BinProtWrite, BinProtRead, Serialize, Deserialize, Debug, Clone)]
pub struct StagedLedgerAuxAndPendingCoinbases {
    /// Transaction snark scan state.
    pub scan_state: TransactionSnarkScanStateStableV2,
    /// Hash of the staged ledger.
    pub staged_ledger_hash: LedgerHash,
    /// Pending coinbase data.
    pub pending_coinbase: MinaBasePendingCoinbaseStableV2,
    /// Protocol states sent along with the scan state.
    // NOTE(review): presumably the states required to validate the scan state — confirm.
    pub needed_blocks: List<MinaStateProtocolStateValueStableV2>,
}
184
185#[derive(BinProtWrite, BinProtRead, Serialize, Deserialize, Debug, Clone)]
186pub enum P2pRpcResponse {
187 BestTipWithProof(BestTipWithProof),
188 LedgerQuery(MinaLedgerSyncLedgerAnswerStableV2),
189 StagedLedgerAuxAndPendingCoinbasesAtBlock(Arc<StagedLedgerAuxAndPendingCoinbases>),
190 Block(ArcBlock),
191 Snark(Snark),
192 Transaction(Transaction),
193 InitialPeers(List<P2pConnectionOutgoingInitOpts>),
194}
195
196impl P2pRpcResponse {
197 pub fn kind(&self) -> P2pRpcKind {
198 match self {
199 Self::BestTipWithProof(_) => P2pRpcKind::BestTipWithProof,
200 Self::LedgerQuery(_) => P2pRpcKind::LedgerQuery,
201 Self::StagedLedgerAuxAndPendingCoinbasesAtBlock(_) => {
202 P2pRpcKind::StagedLedgerAuxAndPendingCoinbasesAtBlock
203 }
204 Self::Block(_) => P2pRpcKind::Block,
205 Self::Snark(_) => P2pRpcKind::Snark,
206 Self::Transaction(_) => P2pRpcKind::Transaction,
207 Self::InitialPeers(_) => P2pRpcKind::InitialPeers,
208 }
209 }
210}
211
/// Conversions from the internal RPC request/response types into the
/// header + binprot-encoded payload pairs used on the libp2p side.
#[cfg(feature = "p2p-libp2p")]
mod libp2p {
    use super::*;
    use crate::Data;
    use mina_p2p_messages::{
        rpc,
        rpc_kernel::{
            NeedsLength, QueryHeader, QueryPayload, ResponseHeader, ResponsePayload, RpcMethod,
            RpcResult,
        },
    };

    /// Binprot-encodes `payload` into a fresh byte buffer.
    ///
    /// An encoder error is discarded (`unwrap_or_default`), in which case the
    /// bytes written up to that point are returned as they are.
    fn encode<T: binprot::BinProtWrite>(payload: &T) -> Data {
        let mut bytes: Vec<u8> = Vec::new();
        <T as binprot::BinProtWrite>::binprot_write(payload, &mut bytes).unwrap_or_default();
        bytes.into()
    }

    /// Builds the query header for RPC method `M` with the given request id.
    fn query_header<M: RpcMethod>(id: P2pRpcId) -> QueryHeader {
        QueryHeader {
            tag: M::NAME.into(),
            version: M::VERSION,
            id: id as _,
        }
    }

    /// Converts an internal response into a libp2p response header and its
    /// encoded payload.
    ///
    /// Returns `None` for `Snark` and `Transaction` responses, which have no
    /// RPC method mapping here.
    pub fn internal_response_into_libp2p(
        response: P2pRpcResponse,
        id: P2pRpcId,
    ) -> Option<(ResponseHeader, Data)> {
        let data = match response {
            P2pRpcResponse::BestTipWithProof(BestTipWithProof {
                best_tip,
                proof: (middle, block),
            }) => {
                type Method = rpc::GetBestTipV2;
                type Payload = ResponsePayload<<Method as RpcMethod>::Response>;

                let r = RpcResult(Ok(NeedsLength(Some(rpc::ProofCarryingDataStableV1 {
                    data: best_tip.as_ref().clone(),
                    proof: (middle, block.as_ref().clone()),
                }))));
                encode::<Payload>(&r)
            }
            P2pRpcResponse::LedgerQuery(answer) => {
                type Method = rpc::AnswerSyncLedgerQueryV2;
                type Payload = ResponsePayload<<Method as RpcMethod>::Response>;

                // The answer is wrapped in its own inner `RpcResult`.
                let r = RpcResult(Ok(NeedsLength(RpcResult(Ok(answer)))));
                encode::<Payload>(&r)
            }
            P2pRpcResponse::StagedLedgerAuxAndPendingCoinbasesAtBlock(staged_ledger_info) => {
                type Method = rpc::GetStagedLedgerAuxAndPendingCoinbasesAtHashV2;
                type Payload = ResponsePayload<<Method as RpcMethod>::Response>;

                let StagedLedgerAuxAndPendingCoinbases {
                    scan_state,
                    staged_ledger_hash,
                    pending_coinbase,
                    needed_blocks,
                } = staged_ledger_info.as_ref().clone();

                let hash = staged_ledger_hash.inner().0.clone();

                let r = RpcResult(Ok(NeedsLength(Some((
                    scan_state,
                    hash,
                    pending_coinbase,
                    needed_blocks,
                )))));
                encode::<Payload>(&r)
            }
            P2pRpcResponse::Block(block) => {
                type Method = rpc::GetTransitionChainV2;
                type Payload = ResponsePayload<<Method as RpcMethod>::Response>;

                // The method answers with a list; send the single block as a one-element list.
                let r = RpcResult(Ok(NeedsLength(Some(List::one(block.as_ref().clone())))));
                encode::<Payload>(&r)
            }
            P2pRpcResponse::InitialPeers(peers) => {
                type Method = rpc::GetSomeInitialPeersV1ForV2;
                type Payload = ResponsePayload<<Method as RpcMethod>::Response>;

                // Peers for which `try_into_mina_rpc` yields `None` are dropped.
                let r = peers
                    .into_iter()
                    .filter_map(|peer| peer.try_into_mina_rpc())
                    .collect();
                let r = RpcResult(Ok(NeedsLength(r)));
                encode::<Payload>(&r)
            }
            P2pRpcResponse::Snark(_) | P2pRpcResponse::Transaction(_) => return None,
        };

        Some((ResponseHeader { id: id as _ }, data))
    }

    /// Converts an internal request into a libp2p query header and its encoded
    /// payload.
    ///
    /// Returns `None` for `Snark` and `Transaction` requests, which have no
    /// RPC method mapping here.
    pub fn internal_request_into_libp2p(
        request: P2pRpcRequest,
        id: P2pRpcId,
    ) -> Option<(QueryHeader, Data)> {
        let (header, data) = match request {
            P2pRpcRequest::BestTipWithProof => {
                type Method = rpc::GetBestTipV2;
                type Payload = QueryPayload<<Method as RpcMethod>::Query>;

                let data = encode::<Payload>(&NeedsLength(()));
                (query_header::<Method>(id), data)
            }
            P2pRpcRequest::LedgerQuery(hash, q) => {
                type Method = rpc::AnswerSyncLedgerQueryV2;
                type Payload = QueryPayload<<Method as RpcMethod>::Query>;

                let data = encode::<Payload>(&NeedsLength((hash.0.clone(), q)));
                (query_header::<Method>(id), data)
            }
            P2pRpcRequest::StagedLedgerAuxAndPendingCoinbasesAtBlock(hash) => {
                type Method = rpc::GetStagedLedgerAuxAndPendingCoinbasesAtHashV2;
                type Payload = QueryPayload<<Method as RpcMethod>::Query>;

                let data = encode::<Payload>(&NeedsLength(hash.0.clone()));
                (query_header::<Method>(id), data)
            }
            P2pRpcRequest::Block(hash) => {
                type Method = rpc::GetTransitionChainV2;
                type Payload = QueryPayload<<Method as RpcMethod>::Query>;

                // The method takes a list of hashes; ask for just this one.
                let data = encode::<Payload>(&NeedsLength(List::one(hash.0.clone())));
                (query_header::<Method>(id), data)
            }
            P2pRpcRequest::InitialPeers => {
                type Method = rpc::GetSomeInitialPeersV1ForV2;
                type Payload = QueryPayload<<Method as RpcMethod>::Query>;

                let data = encode::<Payload>(&NeedsLength(()));
                (query_header::<Method>(id), data)
            }
            P2pRpcRequest::Snark(_) | P2pRpcRequest::Transaction(_) => return None,
        };

        Some((header, data))
    }
}