1use std::sync::Arc;
2
3use binprot::BinProtRead;
4use mina_p2p_messages::{
5 rpc,
6 rpc_kernel::{
7 MessageHeader, PayloadBinprotReader as _, QueryHeader, ResponseHeader, RpcMethod,
8 RpcQueryReadError, RpcResponseReadError,
9 },
10 v2,
11 versioned::Ver,
12};
13use openmina_core::{bug_condition, error, fuzz_maybe, fuzzed_maybe, Substate};
14use redux::Dispatcher;
15
16use crate::{
17 channels::rpc::{
18 BestTipWithProof, P2pChannelsRpcAction, P2pRpcRequest, P2pRpcResponse,
19 StagedLedgerAuxAndPendingCoinbases,
20 },
21 connection::outgoing::P2pConnectionOutgoingInitOpts,
22 disconnection::{P2pDisconnectionAction, P2pDisconnectionReason},
23 Data, Limit, P2pLimits, P2pNetworkState, P2pNetworkYamuxAction, PeerId,
24};
25
26use self::p2p_network_rpc_state::P2pNetworkRpcError;
27
28use super::*;
29
30impl P2pNetworkRpcState {
31 pub fn reducer<State, Action>(
33 mut state_context: Substate<Action, State, P2pNetworkState>,
34 action: redux::ActionWithMeta<P2pNetworkRpcAction>,
35 limits: &P2pLimits,
36 ) -> Result<(), String>
37 where
38 State: crate::P2pStateTrait,
39 Action: crate::P2pActionTrait<State>,
40 {
41 let (action, meta) = action.split();
42 let rpc_state = state_context
43 .get_substate_mut()?
44 .find_rpc_state_mut(&action)
45 .ok_or_else(|| format!("RPC state not found for action: {:?}", action))
46 .inspect_err(|e| bug_condition!("{}", e))?;
47
48 match action {
49 P2pNetworkRpcAction::Init {
50 incoming,
51 addr,
52 peer_id,
53 stream_id,
54 } => {
55 rpc_state.is_incoming = incoming;
56
57 let dispatcher = state_context.into_dispatcher();
58 dispatcher.push(P2pNetworkRpcAction::OutgoingData {
59 addr,
60 peer_id,
61 stream_id,
62 data: Data::from(RpcMessage::Handshake.into_bytes()),
63 fin: false,
64 });
65 Ok(())
66 }
67 P2pNetworkRpcAction::IncomingData {
68 data,
69 addr,
70 peer_id,
71 stream_id,
72 } => {
73 rpc_state.buffer.extend_from_slice(&data);
74 let mut offset = 0;
75 loop {
77 let Some(buf) = &rpc_state.buffer.get(offset..) else {
78 bug_condition!("Invalid range `buffer[{offset}..]`");
79 return Ok(());
80 };
81 if let Some(len_bytes) = buf.get(..8).and_then(|s| s.try_into().ok()) {
82 let len = u64::from_le_bytes(len_bytes) as usize;
83 if let Err(err) = rpc_state.check_rpc_limit(len, limits) {
84 rpc_state.error = Some(err);
85 break;
86 }
87 if let Some(mut slice) = buf.get(8..(8 + len)) {
88 offset += 8 + len;
89 let msg = match MessageHeader::binprot_read(&mut slice) {
90 Ok(MessageHeader::Heartbeat) => RpcMessage::Heartbeat,
91 Ok(MessageHeader::Response(h))
92 if h.id == u64::from_le_bytes(*b"RPC\x00\x00\x00\x00\x00") =>
93 {
94 RpcMessage::Handshake
95 }
96 Ok(MessageHeader::Query(header)) => RpcMessage::Query {
97 header,
98 bytes: slice.to_vec().into(),
99 },
100 Ok(MessageHeader::Response(header)) => RpcMessage::Response {
101 header,
102 bytes: slice.to_vec().into(),
103 },
104 Err(err) => {
105 rpc_state.error =
106 Some(P2pNetworkRpcError::Binprot(err.to_string()));
107 continue;
108 }
109 };
110 rpc_state.incoming.push_back(msg);
111 continue;
112 }
113 }
114
115 if offset != 0 {
116 let Some(buf) = rpc_state.buffer.get(offset..) else {
117 bug_condition!("Invalid range `buffer[{offset}..]`");
118 return Ok(());
119 };
120 rpc_state.buffer = buf.to_vec();
121 }
122 break;
123 }
124
125 let incoming = rpc_state.incoming.front().cloned();
126 let dispatcher = state_context.into_dispatcher();
127
128 if let Some(message) = incoming {
129 dispatcher.push(P2pNetworkRpcAction::IncomingMessage {
130 addr,
131 peer_id,
132 stream_id,
133 message,
134 })
135 }
136
137 Ok(())
138 }
139 ref action @ P2pNetworkRpcAction::IncomingMessage {
140 ref message,
141 addr,
142 peer_id,
143 stream_id,
144 } => {
145 if let RpcMessage::Response { header, .. } = &message {
146 if let Some(QueryHeader { id, tag, version }) = &rpc_state.pending {
147 *rpc_state
148 .total_stats
149 .entry((tag.clone(), *version))
150 .or_default() += 1;
151 if id != &header.id {
152 error!(meta.time(); "receiving response with wrong id: {}", header.id);
153 }
154 } else {
155 error!(meta.time(); "receiving response without query");
156 }
157 } else if let RpcMessage::Query { header, .. } = &message {
158 if rpc_state.pending.is_none() {
159 rpc_state.pending = Some(header.clone());
160 } else {
161 error!(meta.time(); "receiving query while another query is pending");
162 }
163 }
164
165 rpc_state.incoming.pop_front();
166
167 let (dispatcher, state) = state_context.into_dispatcher_and_state();
168 let network_state: &P2pNetworkState = state.substate()?;
169 let state = network_state
170 .find_rpc_state(action)
171 .ok_or_else(|| format!("RPC state not found for action: {:?}", action))?;
172
173 match &message {
174 RpcMessage::Handshake => {
175 if !state.is_incoming {
176 dispatcher.push(P2pChannelsRpcAction::Ready { peer_id });
177 }
178 }
179 RpcMessage::Heartbeat => {}
180 RpcMessage::Query { header, bytes } => {
181 if let Err(e) = dispatch_rpc_query(peer_id, header, bytes, dispatcher) {
182 dispatcher.push(P2pDisconnectionAction::Init {
183 peer_id,
184 reason: P2pDisconnectionReason::P2pChannelReceiveFailed(
185 e.to_string(),
186 ),
187 });
188 }
189 }
190 RpcMessage::Response {
191 header: ResponseHeader { id },
192 bytes,
193 } => {
194 let query_header = match state.pending.as_ref() {
195 Some(header) if &header.id == id => header.clone(),
196 Some(header) => {
197 error!(meta.time(); "received response with it {} while expecting {id}", header.id);
198 return Ok(());
199 }
200 None => {
201 error!(meta.time(); "received response while no query is sent");
202 return Ok(());
203 }
204 };
205 dispatcher.push(P2pNetworkRpcAction::PrunePending { peer_id, stream_id });
207
208 if let Err(e) =
209 dispatch_rpc_response(peer_id, &query_header, bytes, dispatcher)
210 {
211 dispatcher.push(P2pDisconnectionAction::Init {
212 peer_id,
213 reason: P2pDisconnectionReason::P2pChannelReceiveFailed(
214 e.to_string(),
215 ),
216 });
217 }
218 }
219 }
220
221 if let Some(message) = state.incoming.front().cloned() {
222 dispatcher.push(P2pNetworkRpcAction::IncomingMessage {
223 addr,
224 peer_id,
225 stream_id,
226 message,
227 });
228 }
229 Ok(())
230 }
231 P2pNetworkRpcAction::PrunePending { .. } => {
232 rpc_state.pending = None;
233 Ok(())
234 }
235 P2pNetworkRpcAction::HeartbeatSend {
236 addr,
237 peer_id,
238 stream_id,
239 } => {
240 rpc_state.last_heartbeat_sent = Some(meta.time());
241
242 let dispatcher = state_context.into_dispatcher();
243
244 dispatcher.push(P2pNetworkRpcAction::OutgoingData {
245 addr,
246 peer_id,
247 stream_id,
248 data: Data::from(RpcMessage::Heartbeat.into_bytes()),
249 fin: false,
250 });
251
252 Ok(())
253 }
254 P2pNetworkRpcAction::OutgoingQuery {
255 query,
256 data,
257 peer_id,
258 } => {
259 rpc_state.last_id = query.id;
260 rpc_state.pending = Some(query.clone());
261
262 let addr = rpc_state.addr;
263 let stream_id = rpc_state.stream_id;
264 let dispatcher = state_context.into_dispatcher();
265 dispatcher.push(P2pNetworkRpcAction::OutgoingData {
266 addr,
267 peer_id,
268 stream_id,
269 data: Data::from(
270 RpcMessage::Query {
271 header: query,
272 bytes: data,
273 }
274 .into_bytes(),
275 ),
276 fin: false,
277 });
278
279 Ok(())
280 }
281 P2pNetworkRpcAction::OutgoingData {
282 addr,
283 stream_id,
284 mut data,
285 ..
286 } => {
287 let dispatcher = state_context.into_dispatcher();
288 fuzz_maybe!(&mut data, crate::fuzzer::mutate_rpc_data);
289 let flags = fuzzed_maybe!(Default::default(), crate::fuzzer::mutate_yamux_flags);
290
291 dispatcher.push(P2pNetworkYamuxAction::OutgoingData {
292 addr,
293 stream_id,
294 data,
295 flags,
296 });
297
298 Ok(())
299 }
300 P2pNetworkRpcAction::OutgoingResponse {
301 peer_id,
302 response,
303 data,
304 } => {
305 if !matches!(rpc_state.pending, Some(QueryHeader { id, .. }) if id == response.id) {
306 bug_condition!("pending query does not match the response");
307 return Ok(());
308 }
309 let stream_id = rpc_state.stream_id;
310 let addr = rpc_state.addr;
311 let dispatcher = state_context.into_dispatcher();
312
313 dispatcher.push(P2pNetworkRpcAction::PrunePending { peer_id, stream_id });
314 dispatcher.push(P2pNetworkRpcAction::OutgoingData {
315 addr,
316 peer_id,
317 stream_id,
318 data: Data::from(
319 RpcMessage::Response {
320 header: response,
321 bytes: data,
322 }
323 .into_bytes(),
324 ),
325 fin: false,
326 });
327 Ok(())
328 }
329 }
330 }
331
332 fn check_rpc_limit(&self, len: usize, limits: &P2pLimits) -> Result<(), P2pNetworkRpcError> {
333 let (limit, kind): (_, &[u8]) = if self.is_incoming {
334 (limits.rpc_query(), b"<query>")
336 } else if let Some(QueryHeader { tag, .. }) = self.pending.as_ref() {
337 use mina_p2p_messages::rpc::*;
338 match tag.as_ref() {
339 GetBestTipV2::NAME => (limits.rpc_get_best_tip(), GetBestTipV2::NAME),
340 AnswerSyncLedgerQueryV2::NAME => (
341 limits.rpc_answer_sync_ledger_query(),
342 AnswerSyncLedgerQueryV2::NAME,
343 ),
344 GetStagedLedgerAuxAndPendingCoinbasesAtHashV2::NAME => (
345 limits.rpc_get_staged_ledger(),
346 GetStagedLedgerAuxAndPendingCoinbasesAtHashV2::NAME,
347 ),
348 GetTransitionChainV2::NAME => (
349 limits.rpc_get_transition_chain(),
350 GetTransitionChainV2::NAME,
351 ),
352 GetSomeInitialPeersV1ForV2::NAME => (
353 limits.rpc_get_some_initial_peers(),
354 GetSomeInitialPeersV1ForV2::NAME,
355 ),
356 _ => (Limit::Some(0), b"<unimplemented>"),
357 }
358 } else {
359 (limits.rpc_service_message(), b"<service_messages>")
360 };
361 let kind = String::from_utf8_lossy(kind);
362 if len > limit {
363 Err(P2pNetworkRpcError::Limit(kind.into_owned(), len, limit))
364 } else {
365 Ok(())
366 }
367 }
368}
369
370fn dispatch_rpc_query<'a, State, Action>(
371 peer_id: PeerId,
372 QueryHeader { tag, version, id }: &'a QueryHeader,
373 mut bytes: &[u8],
374 dispatcher: &mut Dispatcher<Action, State>,
375) -> Result<(), RpcQueryError<'a>>
376where
377 State: crate::P2pStateTrait,
378 Action: crate::P2pActionTrait<State>,
379{
380 let id = *id;
381 match (tag.as_ref(), *version) {
382 (rpc::GetBestTipV2::NAME, rpc::GetBestTipV2::VERSION) => {
383 rpc::GetBestTipV2::query_payload(&mut bytes)?;
384 dispatcher.push(P2pChannelsRpcAction::RequestReceived {
385 peer_id,
386 id,
387 request: Box::new(P2pRpcRequest::BestTipWithProof),
388 });
389 }
390 (rpc::AnswerSyncLedgerQueryV2::NAME, rpc::AnswerSyncLedgerQueryV2::VERSION) => {
391 let (hash, query) = rpc::AnswerSyncLedgerQueryV2::query_payload(&mut bytes)?;
392 let hash = v2::LedgerHash::from(v2::MinaBaseLedgerHash0StableV1(hash));
393
394 dispatcher.push(P2pChannelsRpcAction::RequestReceived {
395 peer_id,
396 id,
397 request: Box::new(P2pRpcRequest::LedgerQuery(hash, query)),
398 });
399 }
400 (
401 rpc::GetStagedLedgerAuxAndPendingCoinbasesAtHashV2::NAME,
402 rpc::GetStagedLedgerAuxAndPendingCoinbasesAtHashV2::VERSION,
403 ) => {
404 let hash =
405 rpc::GetStagedLedgerAuxAndPendingCoinbasesAtHashV2::query_payload(&mut bytes)?;
406 let hash = v2::StateHash::from(v2::DataHashLibStateHashStableV1(hash));
407 let request = Box::new(P2pRpcRequest::StagedLedgerAuxAndPendingCoinbasesAtBlock(
408 hash,
409 ));
410
411 dispatcher.push(P2pChannelsRpcAction::RequestReceived {
412 peer_id,
413 id,
414 request,
415 });
416 }
417 (rpc::GetTransitionChainV2::NAME, rpc::GetTransitionChainV2::VERSION) => {
418 let hashes = rpc::GetTransitionChainV2::query_payload(&mut bytes)?;
419 for hash in hashes {
420 let hash = v2::StateHash::from(v2::DataHashLibStateHashStableV1(hash));
421
422 dispatcher.push(P2pChannelsRpcAction::RequestReceived {
423 peer_id,
424 id,
425 request: Box::new(P2pRpcRequest::Block(hash)),
426 });
427 }
428 }
429 (rpc::GetSomeInitialPeersV1ForV2::NAME, rpc::GetSomeInitialPeersV1ForV2::VERSION) => {
430 let () = rpc::GetSomeInitialPeersV1ForV2::query_payload(&mut bytes)?;
431 dispatcher.push(P2pChannelsRpcAction::RequestReceived {
432 peer_id,
433 id,
434 request: Box::new(P2pRpcRequest::InitialPeers),
435 });
436 }
437 (name, version) => return Err(RpcQueryError::Unimplemented(name, version)),
438 }
439 Ok(())
440}
441
442fn dispatch_rpc_response<State, Action>(
443 peer_id: PeerId,
444 QueryHeader { tag, version, id }: &QueryHeader,
445 mut bytes: &[u8],
446 dispatcher: &mut Dispatcher<Action, State>,
447) -> Result<(), RpcResponseError>
448where
449 State: crate::P2pStateTrait,
450 Action: crate::P2pActionTrait<State>,
451{
452 let id = *id;
453 match (tag.as_ref(), *version) {
454 (rpc::GetBestTipV2::NAME, rpc::GetBestTipV2::VERSION) => {
455 let response = rpc::GetBestTipV2::response_payload(&mut bytes)?
456 .map(|resp| BestTipWithProof {
457 best_tip: resp.data.into(),
458 proof: (resp.proof.0, resp.proof.1.into()),
459 })
460 .map(P2pRpcResponse::BestTipWithProof)
461 .map(Box::new);
462
463 dispatcher.push(P2pChannelsRpcAction::ResponseReceived {
464 peer_id,
465 id,
466 response,
467 });
468 }
469 (rpc::AnswerSyncLedgerQueryV2::NAME, rpc::AnswerSyncLedgerQueryV2::VERSION) => {
470 let response = Result::from(rpc::AnswerSyncLedgerQueryV2::response_payload(
471 &mut bytes,
472 )?)
473 .map_err(|e| RpcResponseError::Other {
474 rpc_id: rpc::AnswerSyncLedgerQueryV2::rpc_id(),
475 error: e.to_string(),
476 })?;
477 let response = Some(Box::new(P2pRpcResponse::LedgerQuery(response)));
478
479 dispatcher.push(P2pChannelsRpcAction::ResponseReceived {
480 peer_id,
481 id,
482 response,
483 });
484 }
485 (
486 rpc::GetStagedLedgerAuxAndPendingCoinbasesAtHashV2::NAME,
487 rpc::GetStagedLedgerAuxAndPendingCoinbasesAtHashV2::VERSION,
488 ) => {
489 let response =
490 rpc::GetStagedLedgerAuxAndPendingCoinbasesAtHashV2::response_payload(&mut bytes)?;
491 let response = response
492 .map(|(scan_state, hash, pending_coinbase, needed_blocks)| {
493 let staged_ledger_hash = v2::MinaBaseLedgerHash0StableV1(hash).into();
494 Arc::new(StagedLedgerAuxAndPendingCoinbases {
495 scan_state,
496 staged_ledger_hash,
497 pending_coinbase,
498 needed_blocks,
499 })
500 })
501 .map(P2pRpcResponse::StagedLedgerAuxAndPendingCoinbasesAtBlock)
502 .map(Box::new);
503
504 dispatcher.push(P2pChannelsRpcAction::ResponseReceived {
505 peer_id,
506 id,
507 response,
508 });
509 }
510 (rpc::GetTransitionChainV2::NAME, rpc::GetTransitionChainV2::VERSION) => {
511 let response = rpc::GetTransitionChainV2::response_payload(&mut bytes)?;
512 match response {
513 Some(response) if !response.is_empty() => {
514 for block in response {
515 let response = Some(Box::new(P2pRpcResponse::Block(Arc::new(block))));
516 dispatcher.push(P2pChannelsRpcAction::ResponseReceived {
517 peer_id,
518 id,
519 response,
520 });
521 }
522 }
523 _ => {
524 dispatcher.push(P2pChannelsRpcAction::ResponseReceived {
525 peer_id,
526 id,
527 response: None,
528 });
529 }
530 }
531 }
532 (rpc::GetSomeInitialPeersV1ForV2::NAME, rpc::GetSomeInitialPeersV1ForV2::VERSION) => {
533 let response = rpc::GetSomeInitialPeersV1ForV2::response_payload(&mut bytes)?;
534 if response.is_empty() {
535 dispatcher.push(P2pChannelsRpcAction::ResponseReceived {
536 peer_id,
537 id,
538 response: None,
539 });
540 } else {
541 let peers = response
542 .into_iter()
543 .filter_map(P2pConnectionOutgoingInitOpts::try_from_mina_rpc)
544 .collect();
545 dispatcher.push(P2pChannelsRpcAction::ResponseReceived {
546 peer_id,
547 id,
548 response: Some(Box::new(P2pRpcResponse::InitialPeers(peers))),
549 });
550 }
551 }
552 _ => {}
553 }
554 Ok(())
555}
556
557#[derive(Debug, thiserror::Error)]
558enum RpcQueryError<'a> {
559 #[error(transparent)]
560 Read(#[from] RpcQueryReadError),
561 #[error("unimplemented rpc {}:{1}", String::from_utf8_lossy(.0))]
562 Unimplemented(&'a [u8], Ver),
563}
564
565#[derive(Debug, thiserror::Error)]
566enum RpcResponseError {
567 #[error(transparent)]
568 Read(#[from] RpcResponseReadError),
569 #[error("rpc response {rpc_id} error: {error}")]
570 Other { rpc_id: String, error: String },
571}