1use openmina_core::{bug_condition, fuzzed_maybe, warn, Substate, SubstateAccess};
2use quick_protobuf::{serialize_into_vec, BytesReader};
3use redux::ActionWithMeta;
4
5use crate::{
6 stream::P2pNetworkKadOutgoingStreamError, Data, P2pLimits, P2pNetworkConnectionError,
7 P2pNetworkKadRequestAction, P2pNetworkKadState, P2pNetworkKademliaRpcReply,
8 P2pNetworkKademliaRpcRequest, P2pNetworkSchedulerAction, P2pNetworkStreamProtobufError,
9 P2pNetworkYamuxAction, P2pState, YamuxFlags,
10};
11
12use super::{
13 super::Message, P2pNetworkKadIncomingStreamError, P2pNetworkKadIncomingStreamState,
14 P2pNetworkKadOutgoingStreamState, P2pNetworkKadStreamState, P2pNetworkKademliaStreamAction,
15 P2pNetworkKademliaStreamWaitOutgoingCallback,
16};
17
18impl P2pNetworkKadIncomingStreamState {
19 pub fn reducer<State, Action>(
20 mut state_context: Substate<Action, State, P2pNetworkKadState>,
21 action: ActionWithMeta<P2pNetworkKademliaStreamAction>,
22 limits: &P2pLimits,
23 ) -> Result<(), String>
24 where
25 State: SubstateAccess<P2pNetworkKadState> + SubstateAccess<P2pState>,
26 Action: crate::P2pActionTrait<State>,
27 {
28 let (action, meta) = action.split();
29 let Some(P2pNetworkKadStreamState::Incoming(state)) = state_context
30 .get_substate_mut()?
31 .find_kad_stream_state_mut(action.peer_id(), action.stream_id())
32 else {
33 return Err("invalid stream".to_owned());
34 };
35
36 match (&state, action) {
37 (
38 P2pNetworkKadIncomingStreamState::Default,
39 P2pNetworkKademliaStreamAction::New { incoming, .. },
40 ) if incoming => {
41 *state = P2pNetworkKadIncomingStreamState::WaitingForRequest {
42 expect_close: false,
43 };
44 Ok(())
45 }
46 (
47 P2pNetworkKadIncomingStreamState::WaitingForRequest { .. },
48 P2pNetworkKademliaStreamAction::IncomingData {
49 data,
50 addr,
51 peer_id,
52 stream_id,
53 },
54 ) => {
55 let data = &data.0;
56 let mut reader = BytesReader::from_bytes(data);
57
58 match reader.read_varint32(data).map(|v| v as usize) {
59 Ok(encoded_len) if encoded_len > limits.kademlia_request() => {
60 *state = P2pNetworkKadIncomingStreamState::Error(
61 P2pNetworkStreamProtobufError::Limit(
62 encoded_len,
63 limits.kademlia_request(),
64 ),
65 );
66 }
67 Ok(encoded_len) => {
68 let remaining_len = reader.len();
69 if let Some(remaining_data) = data.get(data.len() - remaining_len..) {
70 if encoded_len > remaining_len {
71 *state = P2pNetworkKadIncomingStreamState::PartialRequestReceived {
72 len: encoded_len,
73 data: remaining_data.to_vec(),
74 };
75 return Ok(());
76 }
77
78 state.handle_incoming_request(encoded_len, remaining_data)?;
79 } else {
80 *state = P2pNetworkKadIncomingStreamState::Error(
81 P2pNetworkStreamProtobufError::Message("out of bounds".to_owned()),
82 );
83 }
84 }
85 Err(_) => {
86 *state = P2pNetworkKadIncomingStreamState::Error(
87 P2pNetworkStreamProtobufError::MessageLength,
88 );
89 }
90 };
91
92 let state = state.clone();
93 let dispatcher = state_context.into_dispatcher();
94
95 match state {
96 P2pNetworkKadIncomingStreamState::RequestIsReady {
97 data: P2pNetworkKademliaRpcRequest::FindNode { key },
98 } => {
99 dispatcher.push(P2pNetworkKademliaStreamAction::WaitOutgoing {
100 addr,
101 peer_id,
102 stream_id,
103 callback: P2pNetworkKademliaStreamWaitOutgoingCallback::answer_find_node_request(key)
104 });
105 }
106 P2pNetworkKadIncomingStreamState::Error(error) => {
107 dispatcher.push(P2pNetworkSchedulerAction::Error {
108 addr,
109 error: P2pNetworkConnectionError::from(
110 P2pNetworkKadIncomingStreamError::from(error),
111 ),
112 });
113 }
114 _ => bug_condition!("Invalid state"),
115 }
116
117 Ok(())
118 }
119 (
120 P2pNetworkKadIncomingStreamState::PartialRequestReceived { len, data },
121 P2pNetworkKademliaStreamAction::IncomingData {
122 data: new_data,
123 addr,
124 peer_id,
125 stream_id,
126 },
127 ) => {
128 let mut data = data.clone();
129 data.extend_from_slice(&new_data.0);
130
131 if *len > data.len() {
132 *state = P2pNetworkKadIncomingStreamState::PartialRequestReceived {
133 len: *len,
134 data,
135 };
136 return Ok(());
137 }
138
139 state.handle_incoming_request(*len, &data)?;
140 let state = state.clone();
141 let dispatcher = state_context.into_dispatcher();
142
143 match state {
144 P2pNetworkKadIncomingStreamState::RequestIsReady {
145 data: P2pNetworkKademliaRpcRequest::FindNode { key },
146 } => {
147 dispatcher.push(P2pNetworkKademliaStreamAction::WaitOutgoing {
148 addr,
149 peer_id,
150 stream_id,
151 callback: P2pNetworkKademliaStreamWaitOutgoingCallback::answer_find_node_request(key)
152 });
153 }
154 P2pNetworkKadIncomingStreamState::Error(error) => {
155 warn!(meta.time(); summary = "error handling kademlia action", error = display(&error));
156 dispatcher.push(P2pNetworkSchedulerAction::Error {
157 addr,
158 error: P2pNetworkConnectionError::from(
159 P2pNetworkKadIncomingStreamError::from(error),
160 ),
161 });
162 }
163 _ => bug_condition!("Invalid state"),
164 }
165
166 Ok(())
167 }
168 (
169 P2pNetworkKadIncomingStreamState::RequestIsReady { .. },
170 P2pNetworkKademliaStreamAction::WaitOutgoing {
171 addr,
172 peer_id,
173 stream_id,
174 callback,
175 },
176 ) => {
177 *state = P2pNetworkKadIncomingStreamState::WaitingForReply;
178 let dispatcher = state_context.into_dispatcher();
179 match callback {
180 P2pNetworkKademliaStreamWaitOutgoingCallback::AnswerFindNodeRequest {
181 callback,
182 args,
183 } => dispatcher.push_callback(callback, (addr, peer_id, stream_id, args)),
184 P2pNetworkKademliaStreamWaitOutgoingCallback::UpdateFindNodeRequest {
185 callback,
186 args,
187 } => dispatcher.push_callback(callback, (addr, peer_id, stream_id, args)),
188 };
189 Ok(())
190 }
191 (
192 P2pNetworkKadIncomingStreamState::WaitingForReply,
193 P2pNetworkKademliaStreamAction::SendResponse {
194 data,
195 peer_id,
196 addr,
197 stream_id,
198 },
199 ) => {
200 let message = Message::try_from(&data).map_err(|e| e.to_string())?;
201 let bytes = serialize_into_vec(&message).map_err(|e| format!("{e}"))?;
202 *state = P2pNetworkKadIncomingStreamState::ResponseBytesAreReady {
203 bytes: bytes.clone(),
204 };
205
206 let dispatcher = state_context.into_dispatcher();
207 let data = fuzzed_maybe!(Data::from(bytes), crate::fuzzer::mutate_kad_data);
208 let flags = fuzzed_maybe!(Default::default(), crate::fuzzer::mutate_yamux_flags);
209
210 dispatcher.push(P2pNetworkYamuxAction::OutgoingData {
211 addr,
212 stream_id,
213 data,
214 flags,
215 });
216 dispatcher.push(P2pNetworkKademliaStreamAction::WaitIncoming {
217 addr,
218 peer_id,
219 stream_id,
220 });
221 Ok(())
222 }
223 (
224 P2pNetworkKadIncomingStreamState::ResponseBytesAreReady { .. },
225 P2pNetworkKademliaStreamAction::WaitIncoming { .. },
226 ) => {
227 *state = P2pNetworkKadIncomingStreamState::WaitingForRequest { expect_close: true };
228 Ok(())
229 }
230 (
231 P2pNetworkKadIncomingStreamState::WaitingForRequest { expect_close, .. },
232 P2pNetworkKademliaStreamAction::RemoteClose {
233 addr,
234 peer_id,
235 stream_id,
236 },
237 ) if *expect_close => {
238 *state = P2pNetworkKadIncomingStreamState::Closing;
239
240 let dispatcher = state_context.into_dispatcher();
241 dispatcher.push(P2pNetworkYamuxAction::OutgoingData {
242 addr,
243 stream_id,
244 data: Data::empty(),
245 flags: YamuxFlags::FIN,
246 });
247 dispatcher.push(P2pNetworkKademliaStreamAction::Prune {
248 addr,
249 peer_id,
250 stream_id,
251 });
252 Ok(())
253 }
254 action => Err(format!(
255 "kademlia incoming stream state {state:?} is incorrect for action {action:?}",
256 )),
257 }
258 }
259
260 fn handle_incoming_request(&mut self, len: usize, data: &[u8]) -> Result<(), String> {
261 let mut reader = BytesReader::from_bytes(data);
262
263 let message = match reader.read_message_by_len::<Message>(data, len) {
264 Ok(v) => v,
265 Err(e) => {
266 *self = P2pNetworkKadIncomingStreamState::Error(
267 P2pNetworkStreamProtobufError::Message(e.to_string()),
268 );
269 return Ok(());
270 }
271 };
272
273 let data = match P2pNetworkKademliaRpcRequest::try_from(message) {
274 Ok(v) => v,
275 Err(e) => {
276 *self = P2pNetworkKadIncomingStreamState::Error(e.into());
277 return Ok(());
278 }
279 };
280
281 *self = P2pNetworkKadIncomingStreamState::RequestIsReady { data };
282 Ok(())
283 }
284}
285
286impl P2pNetworkKadOutgoingStreamState {
287 pub fn reducer<State, Action>(
288 mut state_context: Substate<Action, State, P2pNetworkKadState>,
289 action: ActionWithMeta<P2pNetworkKademliaStreamAction>,
290 limits: &P2pLimits,
291 ) -> Result<(), String>
292 where
293 State: SubstateAccess<P2pNetworkKadState> + SubstateAccess<P2pState>,
294 Action: crate::P2pActionTrait<State>,
295 {
296 let (action, meta) = action.split();
297 let Some(P2pNetworkKadStreamState::Outgoing(state)) = state_context
298 .get_substate_mut()?
299 .find_kad_stream_state_mut(action.peer_id(), action.stream_id())
300 else {
301 bug_condition!("Stream not found");
302 return Ok(());
303 };
304
305 match (&state, action) {
306 (
307 P2pNetworkKadOutgoingStreamState::Default,
308 P2pNetworkKademliaStreamAction::New { incoming, .. },
309 ) if !incoming => {
310 *state = P2pNetworkKadOutgoingStreamState::WaitingForRequest {
311 expect_close: false,
312 };
313 Ok(())
314 }
315
316 (
317 P2pNetworkKadOutgoingStreamState::WaitingForRequest { .. },
318 P2pNetworkKademliaStreamAction::SendRequest {
319 data,
320 addr,
321 stream_id,
322 peer_id,
323 },
324 ) => {
325 let message = Message::from(&data);
326 let bytes = serialize_into_vec(&message).map_err(|e| format!("{e}"))?;
327 *state = P2pNetworkKadOutgoingStreamState::RequestBytesAreReady {
328 bytes: bytes.clone(),
329 };
330
331 let dispatcher = state_context.into_dispatcher();
332 let data = fuzzed_maybe!(Data::from(bytes), crate::fuzzer::mutate_kad_data);
333 let flags = fuzzed_maybe!(Default::default(), crate::fuzzer::mutate_yamux_flags);
334
335 dispatcher.push(P2pNetworkYamuxAction::OutgoingData {
336 addr,
337 stream_id,
338 data,
339 flags,
340 });
341 dispatcher.push(P2pNetworkKademliaStreamAction::WaitIncoming {
342 addr,
343 peer_id,
344 stream_id,
345 });
346 dispatcher.push(P2pNetworkKadRequestAction::RequestSent { peer_id });
347 Ok(())
348 }
349 (
350 P2pNetworkKadOutgoingStreamState::RequestBytesAreReady { .. },
351 P2pNetworkKademliaStreamAction::WaitIncoming { .. },
352 ) => {
353 *state = P2pNetworkKadOutgoingStreamState::WaitingForReply;
354 Ok(())
355 }
356
357 (
358 P2pNetworkKadOutgoingStreamState::WaitingForReply { .. },
359 P2pNetworkKademliaStreamAction::IncomingData {
360 data,
361 peer_id,
362 stream_id,
363 addr,
364 },
365 ) => {
366 let data = &data.0;
367
368 let mut reader = BytesReader::from_bytes(data);
369
370 match reader.read_varint32(data).map(|v| v as usize) {
371 Ok(encoded_len) if encoded_len > limits.kademlia_response() => {
372 *state = P2pNetworkKadOutgoingStreamState::Error(
373 P2pNetworkStreamProtobufError::Limit(
374 encoded_len,
375 limits.kademlia_response(),
376 ),
377 );
378 }
379 Ok(encoded_len) => {
380 let remaining_len = reader.len();
381 if let Some(remaining_data) = data.get(data.len() - remaining_len..) {
382 if encoded_len > remaining_len {
383 *state = P2pNetworkKadOutgoingStreamState::PartialReplyReceived {
384 len: encoded_len,
385 data: remaining_data.to_vec(),
386 };
387 return Ok(());
388 }
389
390 state.handle_incoming_response(encoded_len, remaining_data)?;
391 } else {
392 *state = P2pNetworkKadOutgoingStreamState::Error(
393 P2pNetworkStreamProtobufError::Message("out of bounds".to_owned()),
394 );
395 }
396 }
397 Err(_) => {
398 *state = P2pNetworkKadOutgoingStreamState::Error(
399 P2pNetworkStreamProtobufError::MessageLength,
400 );
401 }
402 };
403
404 let state = state.clone();
405 let dispatcher = state_context.into_dispatcher();
406
407 match state {
408 P2pNetworkKadOutgoingStreamState::ResponseIsReady {
409 data:
410 P2pNetworkKademliaRpcReply::FindNode {
411 closer_peers: closest_peers,
412 },
413 } => {
414 dispatcher.push(P2pNetworkKademliaStreamAction::WaitOutgoing {
415 addr,
416 peer_id,
417 stream_id,
418 callback: P2pNetworkKademliaStreamWaitOutgoingCallback::update_find_node_request(closest_peers)
419 });
420 }
421 P2pNetworkKadOutgoingStreamState::Error(error) => {
422 warn!(meta.time(); summary = "error handling kademlia action", error = display(&error));
423 dispatcher.push(P2pNetworkSchedulerAction::Error {
424 addr,
425 error: P2pNetworkConnectionError::from(
426 P2pNetworkKadOutgoingStreamError::from(error),
427 ),
428 });
429 }
430 _ => {
431 bug_condition!("Invalid state");
432 }
433 }
434
435 Ok(())
436 }
437 (
438 P2pNetworkKadOutgoingStreamState::PartialReplyReceived { len, data },
439 P2pNetworkKademliaStreamAction::IncomingData {
440 data: new_data,
441 stream_id,
442 peer_id,
443 addr,
444 },
445 ) => {
446 let mut data = data.clone();
447 data.extend_from_slice(&new_data);
448
449 if *len > data.len() {
450 *state =
451 P2pNetworkKadOutgoingStreamState::PartialReplyReceived { len: *len, data };
452 return Ok(());
453 }
454
455 state.handle_incoming_response(*len, &data)?;
456 let state = state.clone();
457 let dispatcher = state_context.into_dispatcher();
458
459 match state {
460 P2pNetworkKadOutgoingStreamState::ResponseIsReady {
461 data:
462 P2pNetworkKademliaRpcReply::FindNode {
463 closer_peers: closest_peers,
464 },
465 } => {
466 dispatcher.push(P2pNetworkKademliaStreamAction::WaitOutgoing {
467 addr,
468 peer_id,
469 stream_id,
470 callback: P2pNetworkKademliaStreamWaitOutgoingCallback::update_find_node_request(closest_peers)
471 });
472 }
473 P2pNetworkKadOutgoingStreamState::Error(error) => {
474 warn!(meta.time(); summary = "error handling kademlia action", error = display(&error));
475 dispatcher.push(P2pNetworkSchedulerAction::Error {
476 addr,
477 error: P2pNetworkConnectionError::from(
478 P2pNetworkKadOutgoingStreamError::from(error),
479 ),
480 });
481 }
482 _ => bug_condition!("Invalid state"),
483 }
484
485 Ok(())
486 }
487 (
488 P2pNetworkKadOutgoingStreamState::ResponseIsReady { .. },
489 P2pNetworkKademliaStreamAction::WaitOutgoing {
490 addr,
491 peer_id,
492 stream_id,
493 callback,
494 },
495 ) => {
496 *state = P2pNetworkKadOutgoingStreamState::WaitingForRequest { expect_close: true };
497 let dispatcher = state_context.into_dispatcher();
498 match callback {
499 P2pNetworkKademliaStreamWaitOutgoingCallback::AnswerFindNodeRequest {
500 callback,
501 args,
502 } => dispatcher.push_callback(callback, (addr, peer_id, stream_id, args)),
503 P2pNetworkKademliaStreamWaitOutgoingCallback::UpdateFindNodeRequest {
504 callback,
505 args,
506 } => dispatcher.push_callback(callback, (addr, peer_id, stream_id, args)),
507 };
508 Ok(())
509 }
510 (
511 P2pNetworkKadOutgoingStreamState::WaitingForRequest { expect_close },
512 P2pNetworkKademliaStreamAction::Close {
513 addr,
514 peer_id,
515 stream_id,
516 },
517 ) if *expect_close => {
518 *state =
519 P2pNetworkKadOutgoingStreamState::RequestBytesAreReady { bytes: Vec::new() };
520
521 let dispatcher = state_context.into_dispatcher();
522 dispatcher.push(P2pNetworkYamuxAction::OutgoingData {
523 addr,
524 stream_id,
525 data: Data::empty(),
526 flags: YamuxFlags::FIN,
527 });
528 dispatcher.push(P2pNetworkKademliaStreamAction::Prune {
529 addr,
530 peer_id,
531 stream_id,
532 });
533 Ok(())
534 }
535 (
536 P2pNetworkKadOutgoingStreamState::Closing,
537 P2pNetworkKademliaStreamAction::RemoteClose {
538 addr,
539 peer_id,
540 stream_id,
541 },
542 ) => {
543 *state = P2pNetworkKadOutgoingStreamState::Closed;
544 let dispatcher = state_context.into_dispatcher();
545 dispatcher.push(P2pNetworkKademliaStreamAction::Prune {
546 addr,
547 peer_id,
548 stream_id,
549 });
550 Ok(())
551 }
552 (state, action) => Err(format!(
553 "kademlia outgoing stream state {state:?} is incorrect for action {action:?}",
554 )),
555 }
556 }
557
558 fn handle_incoming_response(&mut self, len: usize, data: &[u8]) -> Result<(), String> {
559 let mut reader = BytesReader::from_bytes(data);
560
561 let message = match reader.read_message_by_len::<Message>(data, len) {
562 Ok(v) => v,
563 Err(e) => {
564 *self = P2pNetworkKadOutgoingStreamState::Error(
565 P2pNetworkStreamProtobufError::Message(e.to_string()),
566 );
567 return Ok(());
568 }
569 };
570
571 let data = match P2pNetworkKademliaRpcReply::try_from(message) {
572 Ok(v) => v,
573 Err(e) => {
574 *self = P2pNetworkKadOutgoingStreamState::Error(e.into());
575 return Ok(());
576 }
577 };
578
579 *self = P2pNetworkKadOutgoingStreamState::ResponseIsReady { data };
580 Ok(())
581 }
582}
583
584impl P2pNetworkKadStreamState {
585 pub fn reducer<State, Action>(
586 mut state_context: Substate<Action, State, P2pNetworkKadState>,
587 action: ActionWithMeta<P2pNetworkKademliaStreamAction>,
588 limits: &P2pLimits,
589 ) -> Result<(), String>
590 where
591 State: SubstateAccess<P2pNetworkKadState> + SubstateAccess<P2pState>,
592 Action: crate::P2pActionTrait<State>,
593 {
594 let state = state_context.get_substate_mut()?;
595 let (action, meta) = action.split();
596 let stream_state = match &action {
597 P2pNetworkKademliaStreamAction::New {
598 peer_id,
599 stream_id,
600 incoming,
601 ..
602 } => state
603 .create_kad_stream_state(*incoming, peer_id, stream_id)
604 .map_err(|stream| {
605 format!("kademlia stream already exists for action {action:?}: {stream:?}")
606 })?,
607 P2pNetworkKademliaStreamAction::Prune {
608 peer_id, stream_id, ..
609 } => {
610 return state
611 .remove_kad_stream_state(peer_id, stream_id)
612 .then_some(())
613 .ok_or_else(|| format!("kademlia stream not found for action {action:?}"))
614 }
615 _ => state
616 .find_kad_stream_state(action.peer_id(), action.stream_id())
617 .ok_or_else(|| format!("kademlia stream not found for action {action:?}"))?,
618 };
619
620 match stream_state {
621 P2pNetworkKadStreamState::Incoming(_) => P2pNetworkKadIncomingStreamState::reducer(
622 state_context,
623 meta.with_action(action),
624 limits,
625 ),
626 P2pNetworkKadStreamState::Outgoing(_) => P2pNetworkKadOutgoingStreamState::reducer(
627 state_context,
628 meta.with_action(action),
629 limits,
630 ),
631 }
632 }
633}