p2p/service_impl/mio/
mod.rs

1mod token;
2use self::token::{Token, TokenRegistry};
3
4use std::{
5    collections::{BTreeMap, VecDeque},
6    io::{self, Read, Write},
7    net::{IpAddr, Ipv4Addr, Shutdown, SocketAddr},
8    process,
9};
10
11use libp2p_identity::Keypair;
12use mio::net::{TcpListener, TcpStream};
13
14use openmina_core::{bug_condition, channels::mpsc};
15use thiserror::Error;
16
17use crate::{ConnectionAddr, MioCmd, MioEvent};
18
19#[derive(Debug, Error)]
20enum MioError {
21    #[error("mio failed to create poll instance, fatal error: {0}")]
22    New(io::Error),
23    #[error("mio failed to create waker instance, fatal error: {0}")]
24    Waker(io::Error),
25    #[error("mio failed to poll events, error: {0}")]
26    Poll(io::Error),
27    #[error("mio failed to start listening on {0}, error: {1}")]
28    Listen(SocketAddr, io::Error),
29    #[error("mio failed to register the socket on {0}, error: {1}")]
30    Register(SocketAddr, io::Error),
31}
32
33impl MioError {
34    fn report(self) {
35        openmina_core::log::error!(
36            openmina_core::log::system_time();
37            kind = "MioError",
38            summary = self.to_string(),
39        );
40    }
41}
42
/// Maximum amount of data that may be queued for sending to a single peer: 64 MiB.
const MAX_QUEUED_BYTES: usize = 64 * 1024 * 1024;
45
46#[derive(Debug)]
47#[allow(clippy::large_enum_variant)]
48pub enum MioService {
49    Pending(Keypair),
50    Ready(MioRunningService),
51}
52
53#[derive(Debug)]
54pub struct MioRunningService {
55    keypair: Keypair,
56    cmd_sender: mpsc::UnboundedSender<MioCmd>,
57    waker: Option<mio::Waker>,
58}
59
60impl redux::TimeService for MioService {}
61
62impl redux::Service for MioService {}
63
64impl MioService {
65    pub fn pending(keypair: Keypair) -> Self {
66        Self::Pending(keypair)
67    }
68
69    pub fn run<F>(&mut self, event_sender: F)
70    where
71        F: 'static + Send + Sync + Fn(MioEvent),
72    {
73        *self = match self {
74            Self::Pending(keypair) => {
75                MioService::Ready(MioRunningService::run(event_sender, keypair.clone()))
76            }
77            _ => {
78                openmina_core::warn!(openmina_core::log::system_time(); "tried to run already running mio service");
79                return;
80            }
81        }
82    }
83
84    pub fn keypair(&self) -> &Keypair {
85        match self {
86            Self::Pending(keypair) => keypair,
87            Self::Ready(s) => &s.keypair,
88        }
89    }
90
91    pub fn pending_cmds(&self) -> usize {
92        match self {
93            Self::Pending(_) => 0,
94            Self::Ready(v) => v.cmd_sender.len(),
95        }
96    }
97
98    pub fn send_cmd(&mut self, cmd: MioCmd) {
99        let MioService::Ready(service) = self else {
100            bug_condition!("mio service is not initialized");
101            return;
102        };
103        service.cmd_sender.send(cmd).unwrap_or_default();
104        if let Some(w) = service.waker.as_ref() {
105            w.wake().unwrap_or_default()
106        }
107    }
108
109    pub fn mocked(keypair: Keypair) -> Self {
110        MioService::Ready(MioRunningService::mocked(keypair))
111    }
112}
113
114impl MioRunningService {
115    fn mocked(keypair: Keypair) -> Self {
116        MioRunningService {
117            keypair,
118            cmd_sender: mpsc::unbounded_channel().0,
119            waker: None,
120        }
121    }
122
123    fn run<F>(event_sender: F, keypair: Keypair) -> Self
124    where
125        F: 'static + Send + Sync + Fn(MioEvent),
126    {
127        let poll = match mio::Poll::new() {
128            Ok(v) => v,
129            Err(err) => {
130                MioError::New(err).report();
131                process::exit(1);
132            }
133        };
134
135        let mut tokens = TokenRegistry::default();
136        let waker = match mio::Waker::new(poll.registry(), tokens.register(Token::Waker)) {
137            Ok(v) => v,
138            Err(err) => {
139                MioError::Waker(err).report();
140                process::exit(1);
141            }
142        };
143
144        let (tx, rx) = mpsc::unbounded_channel();
145
146        let mut inner = MioServiceInner {
147            poll,
148            event_sender,
149            cmd_receiver: rx,
150            tokens,
151            listeners: BTreeMap::default(),
152            connections: BTreeMap::default(),
153            recv_buf: vec![0; 0x8000],
154        };
155
156        std::thread::Builder::new()
157            .name("mio-service".into())
158            .spawn(move || {
159                // fake interfaces, TODO: detect interfaces properly
160                inner.send(MioEvent::InterfaceDetected(IpAddr::V4(
161                    Ipv4Addr::UNSPECIFIED,
162                )));
163
164                let mut events = mio::Events::with_capacity(1024);
165
166                loop {
167                    inner.run(&mut events);
168                }
169            })
170            .expect("Failed: mio-service");
171
172        MioRunningService {
173            keypair,
174            cmd_sender: tx,
175            waker: Some(waker),
176        }
177    }
178}
179
180struct MioServiceInner<F> {
181    poll: mio::Poll,
182    event_sender: F,
183    cmd_receiver: mpsc::UnboundedReceiver<MioCmd>,
184    tokens: TokenRegistry,
185    listeners: BTreeMap<SocketAddr, Listener>,
186    connections: BTreeMap<ConnectionAddr, Connection>,
187    recv_buf: Vec<u8>,
188}
189
190struct Listener {
191    inner: TcpListener,
192    incomind_ready: bool,
193}
194
195struct Connection {
196    stream: TcpStream,
197    transmits: VecDeque<(Box<[u8]>, usize)>,
198    queued_bytes: usize,
199    connected: bool,
200    incoming_ready: bool,
201}
202
203impl<F> MioServiceInner<F>
204where
205    F: 'static + Send + Sync + Fn(MioEvent),
206{
207    fn run(&mut self, events: &mut mio::Events) {
208        if let Err(err) = self.poll.poll(events, None) {
209            MioError::Poll(err).report();
210        }
211
212        'events: for event in events.iter() {
213            match self.tokens.get(&event.token()) {
214                None => {}
215                Some(Token::Waker) => {
216                    while let Ok(cmd) = self.cmd_receiver.try_recv() {
217                        self.handle(cmd);
218                    }
219                }
220                Some(Token::Listener(addr)) => {
221                    let Some(mut listener) = self.listeners.remove(&addr) else {
222                        continue 'events;
223                    };
224
225                    if event.is_readable() && !listener.incomind_ready {
226                        self.send(MioEvent::IncomingConnectionIsReady { listener: addr });
227                        listener.incomind_ready = true;
228                    }
229                    self.listeners.insert(addr, listener);
230                }
231                Some(Token::Connection(mut addr)) => {
232                    let Some(mut connection) = self.connections.remove(&addr) else {
233                        continue 'events;
234                    };
235                    if event.is_error() {
236                        match connection.stream.take_error() {
237                            Ok(Some(e)) => {
238                                self.send(MioEvent::ConnectionDidClose(addr, Err(e.to_string())));
239                            }
240                            Ok(None) => {
241                                openmina_core::error!(
242                                    openmina_core::log::system_time();
243                                    summary = "mio error event without actual error",
244                                    addr = openmina_core::log::inner::field::display(addr),
245                                );
246                            }
247                            Err(e) => {
248                                openmina_core::error!(
249                                    openmina_core::log::system_time();
250                                    summary = "error getting mio error",
251                                    error = openmina_core::log::inner::field::display(e),
252                                    addr = openmina_core::log::inner::field::display(addr),
253                                );
254                            }
255                        }
256                        continue 'events;
257                    }
258                    if event.is_readable() && !connection.incoming_ready {
259                        connection.incoming_ready = true;
260                        self.send(MioEvent::IncomingDataIsReady(addr));
261                    }
262                    let mut rereg = false;
263                    if event.is_writable() {
264                        if !connection.connected {
265                            // make network debugger happy
266                            let _ = connection.stream.take_error();
267                            match connection.stream.peer_addr() {
268                                Ok(new_addr) => {
269                                    connection.connected = true;
270                                    addr.sock_addr = new_addr;
271                                    self.send(MioEvent::OutgoingConnectionDidConnect(addr, Ok(())));
272                                }
273                                Err(err) if err.kind() == io::ErrorKind::NotConnected => {
274                                    rereg = true;
275                                }
276                                #[cfg(unix)]
277                                Err(err) if err.raw_os_error() == Some(libc::EINPROGRESS) => {
278                                    rereg = true;
279                                }
280                                Err(err) => {
281                                    self.send(MioEvent::OutgoingConnectionDidConnect(
282                                        addr,
283                                        Err(err.to_string()),
284                                    ));
285                                    continue;
286                                }
287                            }
288                        } else {
289                            while let Some((buf, mut offset)) = connection.transmits.pop_front() {
290                                connection.queued_bytes -= buf.len() - offset;
291                                match connection.stream.write(&buf[offset..]) {
292                                    Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
293                                        connection.queued_bytes += buf.len() - offset;
294                                        connection.transmits.push_front((buf, offset));
295                                        rereg = true;
296                                        break;
297                                    }
298                                    Err(err) => {
299                                        self.send(MioEvent::OutgoingDataDidSend(
300                                            addr,
301                                            Err(err.to_string()),
302                                        ));
303                                        // drop the connection
304                                        continue 'events;
305                                    }
306                                    Ok(len) => {
307                                        rereg = true;
308                                        offset += len;
309                                        if offset == buf.len() {
310                                            self.send(MioEvent::OutgoingDataDidSend(addr, Ok(())));
311                                        } else {
312                                            connection.queued_bytes += buf.len() - offset;
313                                            connection.transmits.push_front((buf, offset));
314                                        }
315                                    }
316                                }
317                            }
318                        }
319                    }
320                    let interests = if connection.incoming_ready {
321                        mio::Interest::WRITABLE
322                    } else {
323                        mio::Interest::READABLE | mio::Interest::WRITABLE
324                    };
325                    if rereg {
326                        if let Err(err) = self.poll.registry().reregister(
327                            &mut connection.stream,
328                            event.token(),
329                            interests,
330                        ) {
331                            self.send(MioEvent::ConnectionDidClose(addr, Err(err.to_string())));
332                            continue;
333                        }
334                    }
335                    self.connections.insert(addr, connection);
336                }
337            }
338        }
339        events.clear();
340    }
341
342    fn handle(&mut self, cmd: MioCmd) {
343        use self::MioCmd::*;
344
345        match cmd {
346            ListenOn(addr) => match TcpListener::bind(addr) {
347                Ok(mut listener) => {
348                    if let Err(err) = self.poll.registry().register(
349                        &mut listener,
350                        self.tokens.register(Token::Listener(addr)),
351                        mio::Interest::READABLE,
352                    ) {
353                        self.send(MioEvent::ListenerError {
354                            listener: addr,
355                            error: err.to_string(),
356                        });
357                        MioError::Listen(addr, err).report();
358                    } else {
359                        self.listeners.insert(
360                            addr,
361                            Listener {
362                                inner: listener,
363                                incomind_ready: false,
364                            },
365                        );
366                        self.send(MioEvent::ListenerReady { listener: addr });
367                    }
368                }
369                Err(err) => {
370                    self.send(MioEvent::ListenerError {
371                        listener: addr,
372                        error: err.to_string(),
373                    });
374                    MioError::Listen(addr, err).report();
375                }
376            },
377            Accept(listener_addr) => {
378                if let Some(mut listener) = self.listeners.remove(&listener_addr) {
379                    match listener.inner.accept() {
380                        Ok((mut stream, addr)) => {
381                            let addr = ConnectionAddr {
382                                sock_addr: addr,
383                                incoming: true,
384                            };
385
386                            listener.incomind_ready = false;
387                            if let Err(err) = self.poll.registry().register(
388                                &mut stream,
389                                self.tokens.register(Token::Connection(addr)),
390                                mio::Interest::READABLE,
391                            ) {
392                                self.send(MioEvent::IncomingConnectionDidAccept(
393                                    Some(addr),
394                                    Err(err.to_string()),
395                                ));
396                            } else {
397                                self.send(MioEvent::IncomingConnectionDidAccept(
398                                    Some(addr),
399                                    Ok(()),
400                                ));
401                                let connection = Connection {
402                                    stream,
403                                    transmits: VecDeque::default(),
404                                    queued_bytes: 0,
405                                    connected: true,
406                                    incoming_ready: false,
407                                };
408                                self.connections.insert(addr, connection);
409                            }
410                            let token = self.tokens.register(Token::Listener(listener_addr));
411                            if let Err(err) = self.poll.registry().reregister(
412                                &mut listener.inner,
413                                token,
414                                mio::Interest::READABLE,
415                            ) {
416                                MioError::Listen(listener_addr, err).report();
417                            } else {
418                                self.listeners.insert(listener_addr, listener);
419                            }
420                        }
421                        Err(err) => {
422                            self.send(MioEvent::IncomingConnectionDidAccept(
423                                None,
424                                Err(err.to_string()),
425                            ));
426                        }
427                    }
428                } else {
429                    self.send(MioEvent::IncomingConnectionDidAccept(
430                        None,
431                        Err("no such listener".to_owned()),
432                    ));
433                }
434            }
435            Refuse(addr) => {
436                if let Some(listener) = self.listeners.get_mut(&addr) {
437                    if let Ok((stream, _)) = listener.inner.accept() {
438                        listener.incomind_ready = false;
439                        stream.shutdown(Shutdown::Both).unwrap_or_default();
440                    }
441                } else {
442                    self.send(MioEvent::IncomingConnectionDidAccept(
443                        None,
444                        Err("no such listener".to_owned()),
445                    ));
446                }
447            }
448            Connect(addr) => {
449                match TcpStream::connect(addr) {
450                    Ok(mut stream) => {
451                        let addr = ConnectionAddr {
452                            sock_addr: addr,
453                            incoming: false,
454                        };
455
456                        if let Err(err) = self.poll.registry().register(
457                            &mut stream,
458                            self.tokens.register(Token::Connection(addr)),
459                            mio::Interest::WRITABLE,
460                        ) {
461                            self.send(MioEvent::OutgoingConnectionDidConnect(
462                                addr,
463                                Err(err.to_string()),
464                            ));
465                        } else {
466                            self.connections.insert(
467                                addr,
468                                Connection {
469                                    stream,
470                                    transmits: VecDeque::default(),
471                                    queued_bytes: 0,
472                                    connected: false,
473                                    incoming_ready: false,
474                                },
475                            );
476                        }
477                    }
478                    Err(err) => self.send(MioEvent::OutgoingConnectionDidConnect(
479                        ConnectionAddr {
480                            sock_addr: addr,
481                            incoming: false,
482                        },
483                        Err(err.to_string()),
484                    )),
485                };
486            }
487            Recv(addr, limit) => {
488                if let Some(mut connection) = self.connections.remove(&addr) {
489                    // Ensure the buffer has enough space for the requested limit
490                    if limit > self.recv_buf.len() {
491                        // TODO: upper bound? resize to `limit` or try to allocate some extra space too?
492                        self.recv_buf.resize(limit, 0);
493
494                        openmina_core::warn!(
495                            openmina_core::log::system_time();
496                            summary = format!("Increasing buffer size to {}kb", limit / 1024)
497                        );
498                    }
499
500                    let mut keep = false;
501                    match connection.stream.read(&mut self.recv_buf[..limit]) {
502                        Ok(0) => self.send(MioEvent::ConnectionDidClose(addr, Ok(()))),
503                        Ok(read) => {
504                            self.send(MioEvent::IncomingDataDidReceive(
505                                addr,
506                                Ok(self.recv_buf[..read].to_vec().into()),
507                            ));
508                            self.send(MioEvent::IncomingDataIsReady(addr));
509                            keep = true;
510                        }
511                        Err(err)
512                            if err.kind() == io::ErrorKind::WouldBlock
513                                || err.kind() == io::ErrorKind::Interrupted =>
514                        {
515                            connection.incoming_ready = false;
516                            keep = true;
517                        }
518                        Err(err) => {
519                            self.send(MioEvent::IncomingDataDidReceive(addr, Err(err.to_string())));
520                            self.send(MioEvent::ConnectionDidClose(addr, Ok(())));
521                        }
522                    };
523
524                    if keep {
525                        let interests =
526                            match (connection.incoming_ready, connection.transmits.is_empty()) {
527                                (false, false) => {
528                                    Some(mio::Interest::READABLE | mio::Interest::WRITABLE)
529                                }
530                                (false, true) => Some(mio::Interest::READABLE),
531                                (true, false) => Some(mio::Interest::WRITABLE),
532                                (true, true) => None,
533                            };
534
535                        if let Some(interests) = interests {
536                            let token = self.tokens.register(Token::Connection(addr));
537                            if let Err(err) = self.poll.registry().reregister(
538                                &mut connection.stream,
539                                token,
540                                interests,
541                            ) {
542                                MioError::Register(addr.sock_addr, err).report();
543                                return;
544                            }
545                        }
546                        self.connections.insert(addr, connection);
547                    }
548                }
549            }
550            Send(addr, buf) => {
551                if let Some(connection) = self.connections.get_mut(&addr) {
552                    connection.queued_bytes += buf.len();
553                    connection.transmits.push_back((buf, 0));
554                    if connection.transmits.len() > 1 && connection.queued_bytes > MAX_QUEUED_BYTES
555                    {
556                        self.connections.remove(&addr);
557                        // the peer is too slow, it requires us to send more and more,
558                        // but cannot accept the data
559                        let msg = "probably malicious".to_string();
560                        self.send(MioEvent::ConnectionDidClose(addr, Err(msg)));
561                        return;
562                    }
563                    let interests =
564                        match (connection.incoming_ready, connection.transmits.is_empty()) {
565                            (false, false) => {
566                                Some(mio::Interest::READABLE | mio::Interest::WRITABLE)
567                            }
568                            (false, true) => Some(mio::Interest::READABLE),
569                            (true, false) => Some(mio::Interest::WRITABLE),
570                            (true, true) => None,
571                        };
572                    if let Some(interests) = interests {
573                        let token = self.tokens.register(Token::Connection(addr));
574                        if let Err(err) = self.poll.registry().reregister(
575                            &mut connection.stream,
576                            token,
577                            interests,
578                        ) {
579                            self.connections.remove(&addr);
580                            self.send(MioEvent::ConnectionDidClose(addr, Err(err.to_string())));
581                        }
582                    }
583                }
584            }
585            Disconnect(addr) => {
586                // drop the connection and destructor will close it
587                if let Some(mut cn) = self.connections.remove(&addr) {
588                    self.poll
589                        .registry()
590                        .deregister(&mut cn.stream)
591                        .unwrap_or_default();
592                }
593                self.send(MioEvent::ConnectionDidCloseOnDemand(addr));
594            }
595        }
596    }
597
598    pub fn send(&self, event: MioEvent) {
599        (self.event_sender)(event);
600    }
601}