libp2p_rpc_behaviour/
stream.rs

1use std::{
2    collections::BTreeSet,
3    io,
4    sync::Arc,
5    task::{self, Context, Poll},
6};
7
8use libp2p::{core::upgrade::ReadyUpgrade, swarm::handler::InboundUpgradeSend, StreamProtocol};
9use mina_p2p_messages::rpc_kernel::RpcTag;
10
11use super::{
12    behaviour::{Event, StreamId},
13    state,
14};
15
16pub struct Stream {
17    opening_state: Option<OpeningState>,
18    inner_state: state::Inner,
19}
20
21enum OpeningState {
22    Requested,
23    Negotiated {
24        io: <ReadyUpgrade<StreamProtocol> as InboundUpgradeSend>::Output,
25    },
26}
27
28pub enum StreamEvent {
29    Request(u32),
30    Event(Event),
31}
32
33impl Stream {
34    pub fn new_outgoing(ask_menu: bool) -> Self {
35        Stream {
36            opening_state: None,
37            // empty menu for outgoing stream
38            inner_state: state::Inner::new(Arc::new(BTreeSet::default()), ask_menu),
39        }
40    }
41
42    pub fn new_incoming(menu: Arc<BTreeSet<(RpcTag, u32)>>) -> Self {
43        Stream {
44            opening_state: None,
45            inner_state: state::Inner::new(menu, false),
46        }
47    }
48
49    pub fn negotiated(&mut self, io: <ReadyUpgrade<StreamProtocol> as InboundUpgradeSend>::Output) {
50        self.opening_state = Some(OpeningState::Negotiated { io });
51    }
52
53    pub fn add(&mut self, bytes: Vec<u8>) {
54        self.inner_state.add(bytes);
55    }
56
57    pub fn poll_stream(
58        &mut self,
59        stream_id: StreamId,
60        cx: &mut Context<'_>,
61    ) -> Poll<io::Result<StreamEvent>> {
62        match &mut self.opening_state {
63            None => {
64                if let StreamId::Outgoing(id) = stream_id {
65                    self.opening_state = Some(OpeningState::Requested);
66                    Poll::Ready(Ok(StreamEvent::Request(id)))
67                } else {
68                    Poll::Pending
69                }
70            }
71            Some(OpeningState::Requested) => Poll::Pending,
72            Some(OpeningState::Negotiated { io }) => {
73                let received = match task::ready!(self.inner_state.poll(cx, io)) {
74                    Err(err) => {
75                        if err.kind() == io::ErrorKind::UnexpectedEof {
76                            if let StreamId::Outgoing(id) = stream_id {
77                                log::warn!("reopen stream");
78                                self.opening_state = Some(OpeningState::Requested);
79                                return Poll::Ready(Ok(StreamEvent::Request(id)));
80                            } else {
81                                return Poll::Ready(Err(err));
82                            }
83                        } else {
84                            return Poll::Ready(Err(err));
85                        }
86                    }
87                    Ok(v) => v,
88                };
89                Poll::Ready(Ok(StreamEvent::Event(Event::Stream {
90                    stream_id,
91                    received,
92                })))
93            }
94        }
95    }
96}