1mod token;
2use self::token::{Token, TokenRegistry};
3
4use std::{
5 collections::{BTreeMap, VecDeque},
6 io::{self, Read, Write},
7 net::{IpAddr, Ipv4Addr, Shutdown, SocketAddr},
8 process,
9};
10
11use libp2p_identity::Keypair;
12use mio::net::{TcpListener, TcpStream};
13
14use openmina_core::{bug_condition, channels::mpsc};
15use thiserror::Error;
16
17use crate::{ConnectionAddr, MioCmd, MioEvent};
18
19#[derive(Debug, Error)]
20enum MioError {
21 #[error("mio failed to create poll instance, fatal error: {0}")]
22 New(io::Error),
23 #[error("mio failed to create waker instance, fatal error: {0}")]
24 Waker(io::Error),
25 #[error("mio failed to poll events, error: {0}")]
26 Poll(io::Error),
27 #[error("mio failed to start listening on {0}, error: {1}")]
28 Listen(SocketAddr, io::Error),
29 #[error("mio failed to register the socket on {0}, error: {1}")]
30 Register(SocketAddr, io::Error),
31}
32
33impl MioError {
34 fn report(self) {
35 openmina_core::log::error!(
36 openmina_core::log::system_time();
37 kind = "MioError",
38 summary = self.to_string(),
39 );
40 }
41}
42
/// Limit (64 MiB) on the number of not-yet-written bytes queued for a single
/// connection; the `Send` command handler closes a connection that exceeds it
/// while more than one buffer is queued.
const MAX_QUEUED_BYTES: usize = 64 * 1024 * 1024;
45
46#[derive(Debug)]
47#[allow(clippy::large_enum_variant)]
48pub enum MioService {
49 Pending(Keypair),
50 Ready(MioRunningService),
51}
52
53#[derive(Debug)]
54pub struct MioRunningService {
55 keypair: Keypair,
56 cmd_sender: mpsc::UnboundedSender<MioCmd>,
57 waker: Option<mio::Waker>,
58}
59
60impl redux::TimeService for MioService {}
61
62impl redux::Service for MioService {}
63
64impl MioService {
65 pub fn pending(keypair: Keypair) -> Self {
66 Self::Pending(keypair)
67 }
68
69 pub fn run<F>(&mut self, event_sender: F)
70 where
71 F: 'static + Send + Sync + Fn(MioEvent),
72 {
73 *self = match self {
74 Self::Pending(keypair) => {
75 MioService::Ready(MioRunningService::run(event_sender, keypair.clone()))
76 }
77 _ => {
78 openmina_core::warn!(openmina_core::log::system_time(); "tried to run already running mio service");
79 return;
80 }
81 }
82 }
83
84 pub fn keypair(&self) -> &Keypair {
85 match self {
86 Self::Pending(keypair) => keypair,
87 Self::Ready(s) => &s.keypair,
88 }
89 }
90
91 pub fn pending_cmds(&self) -> usize {
92 match self {
93 Self::Pending(_) => 0,
94 Self::Ready(v) => v.cmd_sender.len(),
95 }
96 }
97
98 pub fn send_cmd(&mut self, cmd: MioCmd) {
99 let MioService::Ready(service) = self else {
100 bug_condition!("mio service is not initialized");
101 return;
102 };
103 service.cmd_sender.send(cmd).unwrap_or_default();
104 if let Some(w) = service.waker.as_ref() {
105 w.wake().unwrap_or_default()
106 }
107 }
108
109 pub fn mocked(keypair: Keypair) -> Self {
110 MioService::Ready(MioRunningService::mocked(keypair))
111 }
112}
113
114impl MioRunningService {
115 fn mocked(keypair: Keypair) -> Self {
116 MioRunningService {
117 keypair,
118 cmd_sender: mpsc::unbounded_channel().0,
119 waker: None,
120 }
121 }
122
123 fn run<F>(event_sender: F, keypair: Keypair) -> Self
124 where
125 F: 'static + Send + Sync + Fn(MioEvent),
126 {
127 let poll = match mio::Poll::new() {
128 Ok(v) => v,
129 Err(err) => {
130 MioError::New(err).report();
131 process::exit(1);
132 }
133 };
134
135 let mut tokens = TokenRegistry::default();
136 let waker = match mio::Waker::new(poll.registry(), tokens.register(Token::Waker)) {
137 Ok(v) => v,
138 Err(err) => {
139 MioError::Waker(err).report();
140 process::exit(1);
141 }
142 };
143
144 let (tx, rx) = mpsc::unbounded_channel();
145
146 let mut inner = MioServiceInner {
147 poll,
148 event_sender,
149 cmd_receiver: rx,
150 tokens,
151 listeners: BTreeMap::default(),
152 connections: BTreeMap::default(),
153 recv_buf: vec![0; 0x8000],
154 };
155
156 std::thread::Builder::new()
157 .name("mio-service".into())
158 .spawn(move || {
159 inner.send(MioEvent::InterfaceDetected(IpAddr::V4(
161 Ipv4Addr::UNSPECIFIED,
162 )));
163
164 let mut events = mio::Events::with_capacity(1024);
165
166 loop {
167 inner.run(&mut events);
168 }
169 })
170 .expect("Failed: mio-service");
171
172 MioRunningService {
173 keypair,
174 cmd_sender: tx,
175 waker: Some(waker),
176 }
177 }
178}
179
180struct MioServiceInner<F> {
181 poll: mio::Poll,
182 event_sender: F,
183 cmd_receiver: mpsc::UnboundedReceiver<MioCmd>,
184 tokens: TokenRegistry,
185 listeners: BTreeMap<SocketAddr, Listener>,
186 connections: BTreeMap<ConnectionAddr, Connection>,
187 recv_buf: Vec<u8>,
188}
189
190struct Listener {
191 inner: TcpListener,
192 incomind_ready: bool,
193}
194
195struct Connection {
196 stream: TcpStream,
197 transmits: VecDeque<(Box<[u8]>, usize)>,
198 queued_bytes: usize,
199 connected: bool,
200 incoming_ready: bool,
201}
202
203impl<F> MioServiceInner<F>
204where
205 F: 'static + Send + Sync + Fn(MioEvent),
206{
207 fn run(&mut self, events: &mut mio::Events) {
208 if let Err(err) = self.poll.poll(events, None) {
209 MioError::Poll(err).report();
210 }
211
212 'events: for event in events.iter() {
213 match self.tokens.get(&event.token()) {
214 None => {}
215 Some(Token::Waker) => {
216 while let Ok(cmd) = self.cmd_receiver.try_recv() {
217 self.handle(cmd);
218 }
219 }
220 Some(Token::Listener(addr)) => {
221 let Some(mut listener) = self.listeners.remove(&addr) else {
222 continue 'events;
223 };
224
225 if event.is_readable() && !listener.incomind_ready {
226 self.send(MioEvent::IncomingConnectionIsReady { listener: addr });
227 listener.incomind_ready = true;
228 }
229 self.listeners.insert(addr, listener);
230 }
231 Some(Token::Connection(mut addr)) => {
232 let Some(mut connection) = self.connections.remove(&addr) else {
233 continue 'events;
234 };
235 if event.is_error() {
236 match connection.stream.take_error() {
237 Ok(Some(e)) => {
238 self.send(MioEvent::ConnectionDidClose(addr, Err(e.to_string())));
239 }
240 Ok(None) => {
241 openmina_core::error!(
242 openmina_core::log::system_time();
243 summary = "mio error event without actual error",
244 addr = openmina_core::log::inner::field::display(addr),
245 );
246 }
247 Err(e) => {
248 openmina_core::error!(
249 openmina_core::log::system_time();
250 summary = "error getting mio error",
251 error = openmina_core::log::inner::field::display(e),
252 addr = openmina_core::log::inner::field::display(addr),
253 );
254 }
255 }
256 continue 'events;
257 }
258 if event.is_readable() && !connection.incoming_ready {
259 connection.incoming_ready = true;
260 self.send(MioEvent::IncomingDataIsReady(addr));
261 }
262 let mut rereg = false;
263 if event.is_writable() {
264 if !connection.connected {
265 let _ = connection.stream.take_error();
267 match connection.stream.peer_addr() {
268 Ok(new_addr) => {
269 connection.connected = true;
270 addr.sock_addr = new_addr;
271 self.send(MioEvent::OutgoingConnectionDidConnect(addr, Ok(())));
272 }
273 Err(err) if err.kind() == io::ErrorKind::NotConnected => {
274 rereg = true;
275 }
276 #[cfg(unix)]
277 Err(err) if err.raw_os_error() == Some(libc::EINPROGRESS) => {
278 rereg = true;
279 }
280 Err(err) => {
281 self.send(MioEvent::OutgoingConnectionDidConnect(
282 addr,
283 Err(err.to_string()),
284 ));
285 continue;
286 }
287 }
288 } else {
289 while let Some((buf, mut offset)) = connection.transmits.pop_front() {
290 connection.queued_bytes -= buf.len() - offset;
291 match connection.stream.write(&buf[offset..]) {
292 Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
293 connection.queued_bytes += buf.len() - offset;
294 connection.transmits.push_front((buf, offset));
295 rereg = true;
296 break;
297 }
298 Err(err) => {
299 self.send(MioEvent::OutgoingDataDidSend(
300 addr,
301 Err(err.to_string()),
302 ));
303 continue 'events;
305 }
306 Ok(len) => {
307 rereg = true;
308 offset += len;
309 if offset == buf.len() {
310 self.send(MioEvent::OutgoingDataDidSend(addr, Ok(())));
311 } else {
312 connection.queued_bytes += buf.len() - offset;
313 connection.transmits.push_front((buf, offset));
314 }
315 }
316 }
317 }
318 }
319 }
320 let interests = if connection.incoming_ready {
321 mio::Interest::WRITABLE
322 } else {
323 mio::Interest::READABLE | mio::Interest::WRITABLE
324 };
325 if rereg {
326 if let Err(err) = self.poll.registry().reregister(
327 &mut connection.stream,
328 event.token(),
329 interests,
330 ) {
331 self.send(MioEvent::ConnectionDidClose(addr, Err(err.to_string())));
332 continue;
333 }
334 }
335 self.connections.insert(addr, connection);
336 }
337 }
338 }
339 events.clear();
340 }
341
342 fn handle(&mut self, cmd: MioCmd) {
343 use self::MioCmd::*;
344
345 match cmd {
346 ListenOn(addr) => match TcpListener::bind(addr) {
347 Ok(mut listener) => {
348 if let Err(err) = self.poll.registry().register(
349 &mut listener,
350 self.tokens.register(Token::Listener(addr)),
351 mio::Interest::READABLE,
352 ) {
353 self.send(MioEvent::ListenerError {
354 listener: addr,
355 error: err.to_string(),
356 });
357 MioError::Listen(addr, err).report();
358 } else {
359 self.listeners.insert(
360 addr,
361 Listener {
362 inner: listener,
363 incomind_ready: false,
364 },
365 );
366 self.send(MioEvent::ListenerReady { listener: addr });
367 }
368 }
369 Err(err) => {
370 self.send(MioEvent::ListenerError {
371 listener: addr,
372 error: err.to_string(),
373 });
374 MioError::Listen(addr, err).report();
375 }
376 },
377 Accept(listener_addr) => {
378 if let Some(mut listener) = self.listeners.remove(&listener_addr) {
379 match listener.inner.accept() {
380 Ok((mut stream, addr)) => {
381 let addr = ConnectionAddr {
382 sock_addr: addr,
383 incoming: true,
384 };
385
386 listener.incomind_ready = false;
387 if let Err(err) = self.poll.registry().register(
388 &mut stream,
389 self.tokens.register(Token::Connection(addr)),
390 mio::Interest::READABLE,
391 ) {
392 self.send(MioEvent::IncomingConnectionDidAccept(
393 Some(addr),
394 Err(err.to_string()),
395 ));
396 } else {
397 self.send(MioEvent::IncomingConnectionDidAccept(
398 Some(addr),
399 Ok(()),
400 ));
401 let connection = Connection {
402 stream,
403 transmits: VecDeque::default(),
404 queued_bytes: 0,
405 connected: true,
406 incoming_ready: false,
407 };
408 self.connections.insert(addr, connection);
409 }
410 let token = self.tokens.register(Token::Listener(listener_addr));
411 if let Err(err) = self.poll.registry().reregister(
412 &mut listener.inner,
413 token,
414 mio::Interest::READABLE,
415 ) {
416 MioError::Listen(listener_addr, err).report();
417 } else {
418 self.listeners.insert(listener_addr, listener);
419 }
420 }
421 Err(err) => {
422 self.send(MioEvent::IncomingConnectionDidAccept(
423 None,
424 Err(err.to_string()),
425 ));
426 }
427 }
428 } else {
429 self.send(MioEvent::IncomingConnectionDidAccept(
430 None,
431 Err("no such listener".to_owned()),
432 ));
433 }
434 }
435 Refuse(addr) => {
436 if let Some(listener) = self.listeners.get_mut(&addr) {
437 if let Ok((stream, _)) = listener.inner.accept() {
438 listener.incomind_ready = false;
439 stream.shutdown(Shutdown::Both).unwrap_or_default();
440 }
441 } else {
442 self.send(MioEvent::IncomingConnectionDidAccept(
443 None,
444 Err("no such listener".to_owned()),
445 ));
446 }
447 }
448 Connect(addr) => {
449 match TcpStream::connect(addr) {
450 Ok(mut stream) => {
451 let addr = ConnectionAddr {
452 sock_addr: addr,
453 incoming: false,
454 };
455
456 if let Err(err) = self.poll.registry().register(
457 &mut stream,
458 self.tokens.register(Token::Connection(addr)),
459 mio::Interest::WRITABLE,
460 ) {
461 self.send(MioEvent::OutgoingConnectionDidConnect(
462 addr,
463 Err(err.to_string()),
464 ));
465 } else {
466 self.connections.insert(
467 addr,
468 Connection {
469 stream,
470 transmits: VecDeque::default(),
471 queued_bytes: 0,
472 connected: false,
473 incoming_ready: false,
474 },
475 );
476 }
477 }
478 Err(err) => self.send(MioEvent::OutgoingConnectionDidConnect(
479 ConnectionAddr {
480 sock_addr: addr,
481 incoming: false,
482 },
483 Err(err.to_string()),
484 )),
485 };
486 }
487 Recv(addr, limit) => {
488 if let Some(mut connection) = self.connections.remove(&addr) {
489 if limit > self.recv_buf.len() {
491 self.recv_buf.resize(limit, 0);
493
494 openmina_core::warn!(
495 openmina_core::log::system_time();
496 summary = format!("Increasing buffer size to {}kb", limit / 1024)
497 );
498 }
499
500 let mut keep = false;
501 match connection.stream.read(&mut self.recv_buf[..limit]) {
502 Ok(0) => self.send(MioEvent::ConnectionDidClose(addr, Ok(()))),
503 Ok(read) => {
504 self.send(MioEvent::IncomingDataDidReceive(
505 addr,
506 Ok(self.recv_buf[..read].to_vec().into()),
507 ));
508 self.send(MioEvent::IncomingDataIsReady(addr));
509 keep = true;
510 }
511 Err(err)
512 if err.kind() == io::ErrorKind::WouldBlock
513 || err.kind() == io::ErrorKind::Interrupted =>
514 {
515 connection.incoming_ready = false;
516 keep = true;
517 }
518 Err(err) => {
519 self.send(MioEvent::IncomingDataDidReceive(addr, Err(err.to_string())));
520 self.send(MioEvent::ConnectionDidClose(addr, Ok(())));
521 }
522 };
523
524 if keep {
525 let interests =
526 match (connection.incoming_ready, connection.transmits.is_empty()) {
527 (false, false) => {
528 Some(mio::Interest::READABLE | mio::Interest::WRITABLE)
529 }
530 (false, true) => Some(mio::Interest::READABLE),
531 (true, false) => Some(mio::Interest::WRITABLE),
532 (true, true) => None,
533 };
534
535 if let Some(interests) = interests {
536 let token = self.tokens.register(Token::Connection(addr));
537 if let Err(err) = self.poll.registry().reregister(
538 &mut connection.stream,
539 token,
540 interests,
541 ) {
542 MioError::Register(addr.sock_addr, err).report();
543 return;
544 }
545 }
546 self.connections.insert(addr, connection);
547 }
548 }
549 }
550 Send(addr, buf) => {
551 if let Some(connection) = self.connections.get_mut(&addr) {
552 connection.queued_bytes += buf.len();
553 connection.transmits.push_back((buf, 0));
554 if connection.transmits.len() > 1 && connection.queued_bytes > MAX_QUEUED_BYTES
555 {
556 self.connections.remove(&addr);
557 let msg = "probably malicious".to_string();
560 self.send(MioEvent::ConnectionDidClose(addr, Err(msg)));
561 return;
562 }
563 let interests =
564 match (connection.incoming_ready, connection.transmits.is_empty()) {
565 (false, false) => {
566 Some(mio::Interest::READABLE | mio::Interest::WRITABLE)
567 }
568 (false, true) => Some(mio::Interest::READABLE),
569 (true, false) => Some(mio::Interest::WRITABLE),
570 (true, true) => None,
571 };
572 if let Some(interests) = interests {
573 let token = self.tokens.register(Token::Connection(addr));
574 if let Err(err) = self.poll.registry().reregister(
575 &mut connection.stream,
576 token,
577 interests,
578 ) {
579 self.connections.remove(&addr);
580 self.send(MioEvent::ConnectionDidClose(addr, Err(err.to_string())));
581 }
582 }
583 }
584 }
585 Disconnect(addr) => {
586 if let Some(mut cn) = self.connections.remove(&addr) {
588 self.poll
589 .registry()
590 .deregister(&mut cn.stream)
591 .unwrap_or_default();
592 }
593 self.send(MioEvent::ConnectionDidCloseOnDemand(addr));
594 }
595 }
596 }
597
598 pub fn send(&self, event: MioEvent) {
599 (self.event_sender)(event);
600 }
601}