libp2p_rpc_behaviour/
stream.rs1use std::{
2 collections::BTreeSet,
3 io,
4 sync::Arc,
5 task::{self, Context, Poll},
6};
7
8use libp2p::{core::upgrade::ReadyUpgrade, swarm::handler::InboundUpgradeSend, StreamProtocol};
9use mina_p2p_messages::rpc_kernel::RpcTag;
10
11use super::{
12 behaviour::{Event, StreamId},
13 state,
14};
15
16pub struct Stream {
17 opening_state: Option<OpeningState>,
18 inner_state: state::Inner,
19}
20
21enum OpeningState {
22 Requested,
23 Negotiated {
24 io: <ReadyUpgrade<StreamProtocol> as InboundUpgradeSend>::Output,
25 },
26}
27
28pub enum StreamEvent {
29 Request(u32),
30 Event(Event),
31}
32
33impl Stream {
34 pub fn new_outgoing(ask_menu: bool) -> Self {
35 Stream {
36 opening_state: None,
37 inner_state: state::Inner::new(Arc::new(BTreeSet::default()), ask_menu),
39 }
40 }
41
42 pub fn new_incoming(menu: Arc<BTreeSet<(RpcTag, u32)>>) -> Self {
43 Stream {
44 opening_state: None,
45 inner_state: state::Inner::new(menu, false),
46 }
47 }
48
49 pub fn negotiated(&mut self, io: <ReadyUpgrade<StreamProtocol> as InboundUpgradeSend>::Output) {
50 self.opening_state = Some(OpeningState::Negotiated { io });
51 }
52
53 pub fn add(&mut self, bytes: Vec<u8>) {
54 self.inner_state.add(bytes);
55 }
56
57 pub fn poll_stream(
58 &mut self,
59 stream_id: StreamId,
60 cx: &mut Context<'_>,
61 ) -> Poll<io::Result<StreamEvent>> {
62 match &mut self.opening_state {
63 None => {
64 if let StreamId::Outgoing(id) = stream_id {
65 self.opening_state = Some(OpeningState::Requested);
66 Poll::Ready(Ok(StreamEvent::Request(id)))
67 } else {
68 Poll::Pending
69 }
70 }
71 Some(OpeningState::Requested) => Poll::Pending,
72 Some(OpeningState::Negotiated { io }) => {
73 let received = match task::ready!(self.inner_state.poll(cx, io)) {
74 Err(err) => {
75 if err.kind() == io::ErrorKind::UnexpectedEof {
76 if let StreamId::Outgoing(id) = stream_id {
77 log::warn!("reopen stream");
78 self.opening_state = Some(OpeningState::Requested);
79 return Poll::Ready(Ok(StreamEvent::Request(id)));
80 } else {
81 return Poll::Ready(Err(err));
82 }
83 } else {
84 return Poll::Ready(Err(err));
85 }
86 }
87 Ok(v) => v,
88 };
89 Poll::Ready(Ok(StreamEvent::Event(Event::Stream {
90 stream_id,
91 received,
92 })))
93 }
94 }
95 }
96}