1use std::{collections::BTreeMap, sync::OnceLock};
2
3use identify::P2pNetworkIdentifyStreamAction;
4use openmina_core::{bug_condition, error, warn, Substate};
5use redux::Dispatcher;
6use request::{P2pNetworkKadRequestState, P2pNetworkKadRequestStatus};
7use token::{
8 AuthKind, DiscoveryAlgorithm, IdentifyAlgorithm, MuxKind, PingAlgorithm, Protocol,
9 RpcAlgorithm, StreamKind,
10};
11
12use crate::{
13 connection::{
14 incoming::P2pConnectionIncomingAction, outgoing::P2pConnectionOutgoingAction,
15 P2pConnectionState,
16 },
17 disconnection::P2pDisconnectionAction,
18 identify::P2pIdentifyAction,
19 P2pConfig, P2pPeerStatus, P2pState, PeerId,
20};
21
22use super::{super::*, p2p_network_scheduler_state::P2pNetworkConnectionState, *};
23
24impl P2pNetworkSchedulerState {
25 pub fn reducer<Action, State>(
26 mut state_context: Substate<Action, State, Self>,
27 action: redux::ActionWithMeta<P2pNetworkSchedulerAction>,
28 ) -> Result<(), String>
29 where
30 State: crate::P2pStateTrait,
31 Action: crate::P2pActionTrait<State>,
32 {
33 let (action, meta) = action.split();
34 let scheduler_state = state_context.get_substate_mut()?;
35
36 match action {
37 P2pNetworkSchedulerAction::InterfaceDetected { ip, .. } => {
38 scheduler_state.interfaces.insert(ip);
39
40 let (dispatcher, state) = state_context.into_dispatcher_and_state();
41 let p2p_config: &P2pConfig = state.substate()?;
42
43 if let Some(port) = p2p_config.libp2p_port {
44 dispatcher
45 .push(P2pNetworkSchedulerEffectfulAction::InterfaceDetected { ip, port });
46 }
47
48 Ok(())
49 }
50 P2pNetworkSchedulerAction::InterfaceExpired { ip, .. } => {
51 scheduler_state.interfaces.remove(&ip);
52 Ok(())
53 }
54 P2pNetworkSchedulerAction::ListenerReady { listener } => {
55 scheduler_state.listeners.insert(listener);
56 Ok(())
57 }
58 P2pNetworkSchedulerAction::ListenerError { listener, .. } => {
59 scheduler_state.listeners.remove(&listener);
60 Ok(())
61 }
62 P2pNetworkSchedulerAction::IncomingDataIsReady { addr } => {
63 let (dispatcher, state) = state_context.into_dispatcher_and_state();
64 let scheduler: &Self = state.substate()?;
65 let Some(connection_state) = scheduler.connection_state(&addr) else {
66 bug_condition!(
67 "Invalid state for `P2pNetworkSchedulerAction::IncomingDataIsReady`"
68 );
69 return Ok(());
70 };
71
72 let limit = connection_state.limit();
73 if limit > 0 {
74 dispatcher.push(P2pNetworkSchedulerEffectfulAction::IncomingDataIsReady {
75 addr,
76 limit,
77 });
78 }
79
80 Ok(())
81 }
82 P2pNetworkSchedulerAction::IncomingDidAccept { addr, result } => {
83 if let Some(addr) = addr {
84 scheduler_state.connections.insert(
85 addr,
86 P2pNetworkConnectionState {
87 incoming: true,
88 pnet: P2pNetworkPnetState::new(scheduler_state.pnet_key, meta.time()),
89 select_auth: P2pNetworkSelectState::default(),
90 auth: None,
91 select_mux: P2pNetworkSelectState::default(),
92 mux: None,
93 streams: BTreeMap::default(),
94 closed: None,
95 limit: P2pNetworkConnectionState::INITIAL_LIMIT,
96 },
97 );
98 };
99
100 let dispatcher = state_context.into_dispatcher();
101 if let Some(addr) = addr {
102 dispatcher.push(P2pNetworkSchedulerEffectfulAction::IncomingDidAccept {
103 addr,
104 result,
105 });
106 }
107
108 Ok(())
109 }
110 P2pNetworkSchedulerAction::OutgoingConnect { addr } => {
111 scheduler_state.connections.insert(
112 ConnectionAddr {
113 sock_addr: addr,
114 incoming: false,
115 },
116 P2pNetworkConnectionState {
117 incoming: false,
118 pnet: P2pNetworkPnetState::new(scheduler_state.pnet_key, meta.time()),
119 select_auth: P2pNetworkSelectState::initiator_auth(
120 token::AuthKind::Noise,
121 meta.time(),
122 ),
123 auth: None,
124 select_mux: P2pNetworkSelectState::initiator_mux(
125 token::MuxKind::Yamux1_0_0,
126 meta.time(),
127 ),
128 mux: None,
129 streams: BTreeMap::default(),
130 closed: None,
131 limit: P2pNetworkConnectionState::INITIAL_LIMIT,
132 },
133 );
134
135 let dispatcher = state_context.into_dispatcher();
136 dispatcher.push(P2pNetworkSchedulerEffectfulAction::OutgoingConnect { addr });
137 Ok(())
138 }
139 P2pNetworkSchedulerAction::OutgoingDidConnect { addr, result } => {
140 let (dispatcher, state) = state_context.into_dispatcher_and_state();
143 let p2p_state: &P2pState = state.substate()?;
144
145 match result {
146 Ok(()) => {
147 dispatcher
148 .push(P2pNetworkSchedulerEffectfulAction::OutgoingDidConnect { addr });
149 }
150 Err(error) => {
151 let Some((peer_id, peer_state)) = p2p_state.peer_with_connection(addr)
152 else {
153 bug_condition!(
154 "outgoing connection to {addr} failed, but there is no peer for it"
155 );
156 return Ok(());
157 };
158 if matches!(
159 peer_state.status,
160 P2pPeerStatus::Connecting(P2pConnectionState::Outgoing(_))
161 ) {
162 dispatcher.push(P2pConnectionOutgoingAction::FinalizeError {
163 peer_id,
164 error: error.to_string(),
165 });
166 } else {
167 bug_condition!("Invalid status for `P2pNetworkSchedulerAction::OutgoingDidConnect`: {:?}", peer_state.status);
168 }
169 }
170 }
171 Ok(())
172 }
173 P2pNetworkSchedulerAction::IncomingDataDidReceive { result, addr } => {
174 let Some(state) = scheduler_state.connection_state_mut(&addr) else {
176 bug_condition!("Unable to find connection for `P2pNetworkSchedulerAction::IncomingDataDidReceive`");
177 return Ok(());
178 };
179
180 if let Ok(data) = &result {
181 state.consume(data.len());
182 };
183
184 let dispatcher = state_context.into_dispatcher();
185 match result {
186 Ok(data) => {
187 dispatcher.push(P2pNetworkPnetAction::IncomingData { addr, data });
188 }
189 Err(error) => dispatcher.push(P2pNetworkSchedulerAction::Error {
190 addr,
191 error: P2pNetworkConnectionError::MioError(error),
192 }),
193 }
194 Ok(())
195 }
196 P2pNetworkSchedulerAction::SelectDone {
197 addr,
198 kind,
199 protocol,
200 incoming,
201 expected_peer_id,
202 } => {
203 scheduler_state.reducer_select_done(
204 addr,
205 kind,
206 protocol,
207 incoming,
208 expected_peer_id,
209 );
210
211 let (dispatcher, state) = state_context.into_dispatcher_and_state();
212 let p2p_state: &P2pState = state.substate()?;
213 Self::forward_select_done(dispatcher, p2p_state, protocol, addr, incoming, kind);
214 Ok(())
215 }
216 P2pNetworkSchedulerAction::SelectError { addr, kind, .. } => {
217 let dispatcher = state_context.into_dispatcher();
218
219 match kind {
220 SelectKind::Stream(peer_id, stream_id)
221 if keep_connection_with_unknown_stream() =>
222 {
223 warn!(meta.time(); summary="select error for stream", addr = display(addr), peer_id = display(peer_id));
224 dispatcher.push(P2pNetworkYamuxAction::OutgoingData {
226 addr,
227 stream_id,
228 data: Data::default(),
229 flags: YamuxFlags::RST,
230 });
231 dispatcher
232 .push(P2pNetworkSchedulerAction::PruneStream { peer_id, stream_id });
233 }
234 _ => {
235 dispatcher.push(P2pNetworkSchedulerAction::Error {
236 addr,
237 error: P2pNetworkConnectionError::SelectError,
238 });
239 }
240 }
241
242 dispatcher.push(P2pNetworkSchedulerEffectfulAction::Disconnect {
243 addr,
244 reason: P2pNetworkConnectionCloseReason::Error(
245 P2pNetworkConnectionError::SelectError,
246 ),
247 });
248
249 Ok(())
250 }
251 P2pNetworkSchedulerAction::YamuxDidInit {
252 addr,
253 peer_id,
254 message_size_limit,
255 pending_outgoing_limit,
256 } => {
257 let Some(cn) = scheduler_state.connections.get_mut(&addr) else {
258 bug_condition!(
259 "Missing connection state for `P2pNetworkSchedulerAction::YamuxDidInit`"
260 );
261 return Ok(());
262 };
263 if let Some(P2pNetworkConnectionMuxState::Yamux(yamux)) = &mut cn.mux {
264 yamux.init = true;
265 yamux.message_size_limit = message_size_limit;
266 yamux.pending_outgoing_limit = pending_outgoing_limit;
267 }
268
269 let incoming = cn.incoming;
270 let dispatcher = state_context.into_dispatcher();
271
272 if incoming {
273 dispatcher.push(P2pConnectionIncomingAction::Libp2pReceived { peer_id });
274 } else {
275 dispatcher.push(P2pConnectionOutgoingAction::FinalizeSuccess {
276 peer_id,
277 remote_auth: None,
278 });
279 }
280
281 dispatcher.push(P2pIdentifyAction::NewRequest { peer_id, addr });
282 Ok(())
283 }
284 P2pNetworkSchedulerAction::Disconnect { addr, reason } => {
285 let Some(conn_state) = scheduler_state.connections.get_mut(&addr) else {
286 bug_condition!(
287 "`P2pNetworkSchedulerAction::Disconnect`: connection {addr} does not exist"
288 );
289 return Ok(());
290 };
291 if conn_state.closed.is_some() {
292 bug_condition!(
293 "`P2pNetworkSchedulerAction::Disconnect`: {addr} already disconnected"
294 );
295 return Ok(());
296 }
297 conn_state.closed = Some(reason.clone().into());
298
299 let dispatcher = state_context.into_dispatcher();
300 dispatcher.push(P2pNetworkSchedulerEffectfulAction::Disconnect {
301 addr,
302 reason: P2pNetworkConnectionCloseReason::Disconnect(reason),
303 });
304
305 Ok(())
306 }
307 P2pNetworkSchedulerAction::Error { addr, error } => {
308 let Some(conn_state) = scheduler_state.connections.get_mut(&addr) else {
309 bug_condition!(
310 "`P2pNetworkSchedulerAction::Error`: connection {addr} does not exist"
311 );
312 return Ok(());
313 };
314 if conn_state.closed.is_some() {
315 bug_condition!(
316 "`P2pNetworkSchedulerAction::Error`: {addr} already disconnected"
317 );
318 return Ok(());
319 }
320 conn_state.closed = Some(error.clone().into());
321
322 let dispatcher = state_context.into_dispatcher();
323 dispatcher.push(P2pNetworkSchedulerEffectfulAction::Disconnect {
324 addr,
325 reason: P2pNetworkConnectionCloseReason::Error(error),
326 });
327 Ok(())
328 }
329 P2pNetworkSchedulerAction::Disconnected { addr, reason } => {
330 let Some(cn) = scheduler_state.connections.get_mut(&addr) else {
331 bug_condition!(
332 "P2pNetworkSchedulerAction::Disconnected: connection {addr} does not exist"
333 );
334 return Ok(());
335 };
336 if cn.closed.is_none() {
337 bug_condition!(
338 "P2pNetworkSchedulerAction::Disconnect: {addr} is not disconnected"
339 );
340 }
341
342 let incoming = cn.incoming;
343 let (dispatcher, state) = state_context.into_dispatcher_and_state();
344 let state: &P2pState = state.substate()?;
345
346 let peer_with_state = state.peer_with_connection(addr);
347
348 if reason.is_disconnected() {
349 return Ok(());
351 }
352
353 match peer_with_state {
354 Some((peer_id, peer_state)) => {
355 match &peer_state.status {
357 P2pPeerStatus::Connecting(
358 crate::connection::P2pConnectionState::Incoming(_),
359 ) => {
360 dispatcher.push(P2pConnectionIncomingAction::FinalizeError {
361 peer_id,
362 error: reason.to_string(),
363 });
364 }
365 P2pPeerStatus::Connecting(
366 crate::connection::P2pConnectionState::Outgoing(_),
367 ) => {
368 dispatcher.push(P2pConnectionOutgoingAction::FinalizeError {
369 peer_id,
370 error: reason.to_string(),
371 });
372 }
373 P2pPeerStatus::Disconnecting { .. } => {}
374 P2pPeerStatus::Disconnected { .. } => {
375 if !incoming {
377 error!(meta.time(); "disconnected peer connection for address {addr}");
378 } else {
379 }
381 }
382 P2pPeerStatus::Ready(_) => {
383 dispatcher.push(P2pDisconnectionAction::Finish { peer_id });
384 }
385 }
386 }
387 None => {
388 if !incoming {
390 } else {
392 }
394 }
395 }
396 Ok(())
397 }
398 P2pNetworkSchedulerAction::Prune { addr } => {
399 if let Some(old) = scheduler_state.connections.remove(&addr) {
400 if let Some(peer_id) = old.peer_id() {
401 scheduler_state.prune_peer_state(peer_id);
402 }
403 }
404 Ok(())
405 }
406 P2pNetworkSchedulerAction::PruneStream { peer_id, stream_id } => {
407 let Some((_, conn_state)) = scheduler_state
408 .connections
409 .iter_mut()
410 .find(|(_, conn_state)| conn_state.peer_id() == Some(&peer_id))
411 else {
412 bug_condition!("PruneStream: peer {peer_id} not found");
413 return Ok(());
414 };
415
416 if conn_state.streams.remove(&stream_id).is_none() {
417 bug_condition!("PruneStream: peer {peer_id} does not have stream {stream_id}");
418 }
419
420 Ok(())
421 }
422 P2pNetworkSchedulerAction::IncomingConnectionIsReady { listener } => {
423 let (dispatcher, state) = state_context.into_dispatcher_and_state();
424 let p2p_state: &P2pState = state.substate()?;
425
426 let should_accept = p2p_state.network.scheduler.connections.len()
427 < p2p_state.config.limits.max_connections();
428
429 dispatcher.push(
430 P2pNetworkSchedulerEffectfulAction::IncomingConnectionIsReady {
431 listener,
432 should_accept,
433 },
434 );
435 Ok(())
436 }
437 }
438 }
439
440 fn reducer_select_done(
441 &mut self,
442 addr: ConnectionAddr,
443 kind: SelectKind,
444 protocol: Option<Protocol>,
445 incoming: bool,
446 expected_peer_id: Option<PeerId>,
447 ) {
448 let Some(connection) = self.connections.get_mut(&addr) else {
449 bug_condition!("Missing connection state for `P2pNetworkSchedulerAction::SelectDone`");
450 return;
451 };
452
453 match protocol {
454 Some(token::Protocol::Auth(token::AuthKind::Noise)) => {
455 connection.auth = Some(P2pNetworkAuthState::Noise(P2pNetworkNoiseState::new(
456 self.local_pk.clone(),
457 expected_peer_id,
458 )));
459 }
460 Some(token::Protocol::Mux(
461 token::MuxKind::Yamux1_0_0 | token::MuxKind::YamuxNoNewLine1_0_0,
462 )) => {
463 connection.mux = Some(P2pNetworkConnectionMuxState::Yamux(P2pNetworkYamuxState {
464 init: true,
465 ..Default::default()
466 }));
467 }
468 Some(token::Protocol::Stream(stream_kind)) => {
469 let SelectKind::Stream(peer_id, stream_id) = kind else {
470 bug_condition!(
471 "incorrect stream kind {kind:?} for protocol stream: {stream_kind:?}"
472 );
473 return;
474 };
475 match stream_kind {
476 token::StreamKind::Rpc(_) => {
477 if incoming {
478 self.rpc_incoming_streams
479 .entry(peer_id)
480 .or_default()
481 .insert(stream_id, P2pNetworkRpcState::new(addr, stream_id));
482 } else {
483 self.rpc_outgoing_streams
484 .entry(peer_id)
485 .or_default()
486 .insert(stream_id, P2pNetworkRpcState::new(addr, stream_id));
487 }
488 }
489 token::StreamKind::Broadcast(_) => {}
490 token::StreamKind::Identify(_) => {}
491 token::StreamKind::Discovery(_) => {}
492 token::StreamKind::Ping(_) => {}
493 token::StreamKind::Bitswap(_) => {}
494 token::StreamKind::Status(_) => {}
495 }
496 }
497 None => {}
498 }
499 }
500
501 fn forward_select_done<Action, State>(
502 dispatcher: &mut Dispatcher<Action, State>,
503 p2p_state: &P2pState,
504 protocol: Option<Protocol>,
505 addr: ConnectionAddr,
506 incoming: bool,
507 select_kind: SelectKind,
508 ) where
509 State: crate::P2pStateTrait,
510 Action: crate::P2pActionTrait<State>,
511 {
512 match protocol {
513 Some(Protocol::Auth(AuthKind::Noise)) => {
514 dispatcher
515 .push(P2pNetworkSchedulerEffectfulAction::NoiseSelectDone { addr, incoming });
516 }
517 Some(Protocol::Mux(MuxKind::Yamux1_0_0 | MuxKind::YamuxNoNewLine1_0_0)) => {
518 let SelectKind::Multiplexing(peer_id) = select_kind else {
519 bug_condition!("wrong kind for multiplexing protocol action: {select_kind:?}");
520 return;
521 };
522 let message_size_limit = p2p_state.config.limits.yamux_message_size();
523 let pending_outgoing_limit =
524 p2p_state.config.limits.yamux_pending_outgoing_per_peer();
525 dispatcher.push(P2pNetworkSchedulerAction::YamuxDidInit {
526 addr,
527 peer_id,
528 message_size_limit,
529 pending_outgoing_limit,
530 });
531 }
532 Some(Protocol::Stream(kind)) => {
533 let SelectKind::Stream(peer_id, stream_id) = select_kind else {
534 bug_condition!("wrong kind for stream protocol action: {kind:?}");
535 return;
536 };
537 match kind {
538 StreamKind::Status(_)
539 | StreamKind::Identify(IdentifyAlgorithm::IdentifyPush1_0_0)
540 | StreamKind::Bitswap(_)
541 | StreamKind::Ping(PingAlgorithm::Ping1_0_0) => {
542 }
544 StreamKind::Identify(IdentifyAlgorithm::Identify1_0_0) => {
545 dispatcher.push(P2pNetworkIdentifyStreamAction::New {
546 addr,
547 peer_id,
548 stream_id,
549 incoming,
550 });
551 }
552
553 StreamKind::Broadcast(protocol) => {
554 dispatcher.push(P2pNetworkPubsubAction::NewStream {
555 incoming,
556 peer_id,
557 addr,
558 stream_id,
559 protocol,
560 });
561 }
562 StreamKind::Discovery(DiscoveryAlgorithm::Kademlia1_0_0) => {
563 if let Some(discovery_state) = p2p_state.network.scheduler.discovery_state()
564 {
565 let request = !incoming && discovery_state.request(&peer_id).is_some();
566 dispatcher.push(P2pNetworkKademliaStreamAction::New {
567 addr,
568 peer_id,
569 stream_id,
570 incoming,
571 });
572 if request {
574 dispatcher.push(P2pNetworkKadRequestAction::StreamReady {
575 peer_id,
576 addr,
577 stream_id,
578 callback: redux::callback!(
579 on_p2p_network_kad_request_stream_ready((
580 addr: ConnectionAddr,
581 peer_id: PeerId,
582 stream_id: StreamId,
583 data: P2pNetworkKademliaRpcRequest
584 )) -> crate::P2pAction{
585 P2pNetworkKademliaStreamAction::SendRequest {
586 addr,
587 peer_id,
588 stream_id,
589 data
590 }
591 }
592 ),
593 });
594 }
595 }
596 }
597 StreamKind::Rpc(RpcAlgorithm::Rpc0_0_1) => {
598 dispatcher.push(P2pNetworkRpcAction::Init {
599 addr,
600 peer_id,
601 stream_id,
602 incoming,
603 });
604 }
605 }
606 }
607 None => {
608 match &select_kind {
609 SelectKind::Authentication => {
610 }
612 SelectKind::MultiplexingNoPeerId => {
613 bug_condition!("`SelectKind::MultiplexingNoPeerId` not handled");
614 }
616 SelectKind::Multiplexing(_) => {
617 }
619 SelectKind::Stream(peer_id, stream_id) => {
620 if let Some(discovery_state) = p2p_state.network.scheduler.discovery_state()
621 {
622 if let Some(P2pNetworkKadRequestState {
623 status: P2pNetworkKadRequestStatus::WaitingForKadStream(id),
624 ..
625 }) = discovery_state.request(peer_id)
626 {
627 if id == stream_id {
628 dispatcher.push(P2pNetworkKadRequestAction::Error {
629 peer_id: *peer_id,
630 error: "stream protocol is not negotiated".into(),
631 });
632 }
633 }
634 }
635 }
636 }
637 }
638 }
639 }
640}
641
/// Reports whether the `KEEP_CONNECTION_WITH_UNKNOWN_STREAM` environment variable
/// is set to a value that parses as `true`.
///
/// The variable is read on the first call only; the result is cached in a
/// `OnceLock` and returned unchanged by every later call. An unset, non-unicode or
/// unparsable value yields `false`.
fn keep_connection_with_unknown_stream() -> bool {
    static KEEP: OnceLock<bool> = OnceLock::new();

    let read_flag = || match std::env::var("KEEP_CONNECTION_WITH_UNKNOWN_STREAM") {
        Ok(raw) => raw.parse::<bool>().unwrap_or(false),
        Err(_) => false,
    };

    *KEEP.get_or_init(read_flag)
}