timeout cleanup

This commit is contained in:
soruh 2023-03-04 19:16:06 +01:00
parent 07d62937c0
commit 6ec0d1f122

View File

@ -19,7 +19,7 @@ use tokio::{
net::{TcpListener, TcpStream}, net::{TcpListener, TcpStream},
select, select,
sync::Mutex, sync::Mutex,
time::{sleep, Instant}, time::{sleep, timeout, Instant},
}; };
use crate::packets::{dyn_ip_update, PacketKind, REJECT_OOP, REJECT_TIMEOUT}; use crate::packets::{dyn_ip_update, PacketKind, REJECT_OOP, REJECT_TIMEOUT};
@ -222,9 +222,9 @@ async fn connection_handler(
let mut packet = Packet::default(); let mut packet = Packet::default();
select! { match timeout(AUTH_TIMEOUT, packet.recv_into_cancelation_safe(&mut reader)).await {
res = packet.recv_into_cancelation_safe(&mut reader) => res?, Ok(res) => res?,
_ = sleep(AUTH_TIMEOUT) => { Err(_) => {
writer.write_all(REJECT_TIMEOUT).await?; writer.write_all(REJECT_TIMEOUT).await?;
return Ok(()); return Ok(());
} }
@ -325,10 +325,12 @@ async fn connection_handler(
let mut last_ping_received_at = Instant::now(); let mut last_ping_received_at = Instant::now();
let result = loop { let result = loop {
let now = Instant::now();
// println!("next ping in {:?}s", SEND_PING_INTERVAL.saturating_sub(now.saturating_duration_since(last_ping_sent_at)).as_secs()); // println!("next ping in {:?}s", SEND_PING_INTERVAL.saturating_sub(now.saturating_duration_since(last_ping_sent_at)).as_secs());
// println!("will timeout in in {:?}s", PING_TIMEOUT.saturating_sub(now.saturating_duration_since(last_ping_received_at)).as_secs()); // println!("will timeout in in {:?}s", PING_TIMEOUT.saturating_sub(now.saturating_duration_since(last_ping_received_at)).as_secs());
let send_next_ping_in = SEND_PING_INTERVAL.saturating_sub(last_ping_sent_at.elapsed());
let next_ping_expected_in = PING_TIMEOUT.saturating_sub(last_ping_received_at.elapsed());
select! { select! {
caller = listener.accept() => { caller = listener.accept() => {
let (stream, addr) = caller?; let (stream, addr) = caller?;
@ -344,12 +346,12 @@ async fn connection_handler(
break Result::Packet { packet } break Result::Packet { packet }
} }
}, },
_ = sleep(SEND_PING_INTERVAL.saturating_sub(now.saturating_duration_since(last_ping_sent_at))) => { _ = sleep(send_next_ping_in) => {
// println!("sending ping"); // println!("sending ping");
writer.write_all(bytemuck::bytes_of(& Header { kind: PacketKind::Ping.raw(), length: 0 })).await?; writer.write_all(bytemuck::bytes_of(& Header { kind: PacketKind::Ping.raw(), length: 0 })).await?;
last_ping_sent_at = Instant::now(); last_ping_sent_at = Instant::now();
} }
_ = sleep(PING_TIMEOUT.saturating_sub(now.saturating_duration_since(last_ping_received_at))) => { _ = sleep(next_ping_expected_in) => {
writer.write_all(REJECT_TIMEOUT).await?; writer.write_all(REJECT_TIMEOUT).await?;
return Ok(()); return Ok(());
} }
@ -402,9 +404,14 @@ async fn connection_handler(
} }
}; };
select! { match timeout(
res = packet.recv_into_cancelation_safe(&mut reader) => res?, CALL_ACK_TIMEOUT,
_ = sleep(CALL_ACK_TIMEOUT) => { packet.recv_into_cancelation_safe(&mut reader),
)
.await
{
Ok(res) => res?,
Err(_) => {
writer.write_all(REJECT_TIMEOUT).await?; writer.write_all(REJECT_TIMEOUT).await?;
return Ok(()); return Ok(());
} }
@ -456,10 +463,11 @@ async fn connection_handler(
stream.set_nodelay(true)?; stream.set_nodelay(true)?;
client.set_nodelay(true)?; client.set_nodelay(true)?;
select! { let _ = timeout(
_ = tokio::io::copy_bidirectional(stream, &mut client) => {} CALL_TIMEOUT,
_ = sleep(CALL_TIMEOUT) => {} tokio::io::copy_bidirectional(stream, &mut client),
} )
.await;
{ {
let mut port_handler = port_handler.lock().await; let mut port_handler = port_handler.lock().await;