From 6ec0d1f12284a27a8c78ae7f460e412383273bdc Mon Sep 17 00:00:00 2001 From: soruh Date: Sat, 4 Mar 2023 19:16:06 +0100 Subject: [PATCH] timeout cleanup --- src/main.rs | 36 ++++++++++++++++++++++-------------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/src/main.rs b/src/main.rs index 05bc274..7950768 100644 --- a/src/main.rs +++ b/src/main.rs @@ -19,7 +19,7 @@ use tokio::{ net::{TcpListener, TcpStream}, select, sync::Mutex, - time::{sleep, Instant}, + time::{sleep, timeout, Instant}, }; use crate::packets::{dyn_ip_update, PacketKind, REJECT_OOP, REJECT_TIMEOUT}; @@ -222,9 +222,9 @@ async fn connection_handler( let mut packet = Packet::default(); - select! { - res = packet.recv_into_cancelation_safe(&mut reader) => res?, - _ = sleep(AUTH_TIMEOUT) => { + match timeout(AUTH_TIMEOUT, packet.recv_into_cancelation_safe(&mut reader)).await { + Ok(res) => res?, + Err(_) => { writer.write_all(REJECT_TIMEOUT).await?; return Ok(()); } @@ -325,10 +325,12 @@ async fn connection_handler( let mut last_ping_received_at = Instant::now(); let result = loop { - let now = Instant::now(); // println!("next ping in {:?}s", SEND_PING_INTERVAL.saturating_sub(now.saturating_duration_since(last_ping_sent_at)).as_secs()); // println!("will timeout in in {:?}s", PING_TIMEOUT.saturating_sub(now.saturating_duration_since(last_ping_received_at)).as_secs()); + let send_next_ping_in = SEND_PING_INTERVAL.saturating_sub(last_ping_sent_at.elapsed()); + let next_ping_expected_in = PING_TIMEOUT.saturating_sub(last_ping_received_at.elapsed()); + select! { caller = listener.accept() => { let (stream, addr) = caller?; @@ -344,12 +346,12 @@ async fn connection_handler( break Result::Packet { packet } } }, - _ = sleep(SEND_PING_INTERVAL.saturating_sub(now.saturating_duration_since(last_ping_sent_at))) => { + _ = sleep(send_next_ping_in) => { // println!("sending ping"); writer.write_all(bytemuck::bytes_of(& Header { kind: PacketKind::Ping.raw(), length: 0 })).await?; last_ping_sent_at = Instant::now(); } - _ = sleep(PING_TIMEOUT.saturating_sub(now.saturating_duration_since(last_ping_received_at))) => { + _ = sleep(next_ping_expected_in) => { writer.write_all(REJECT_TIMEOUT).await?; return Ok(()); } @@ -402,9 +404,14 @@ async fn connection_handler( } }; - select! { - res = packet.recv_into_cancelation_safe(&mut reader) => res?, - _ = sleep(CALL_ACK_TIMEOUT) => { + match timeout( + CALL_ACK_TIMEOUT, + packet.recv_into_cancelation_safe(&mut reader), + ) + .await + { + Ok(res) => res?, + Err(_) => { writer.write_all(REJECT_TIMEOUT).await?; return Ok(()); } @@ -456,10 +463,11 @@ async fn connection_handler( stream.set_nodelay(true)?; client.set_nodelay(true)?; - select! { - _ = tokio::io::copy_bidirectional(stream, &mut client) => {} - _ = sleep(CALL_TIMEOUT) => {} - } + let _ = timeout( + CALL_TIMEOUT, + tokio::io::copy_bidirectional(stream, &mut client), + ) + .await; { let mut port_handler = port_handler.lock().await;