From 0c4be4541cd5577a780ae6cb4f28589a3e822395 Mon Sep 17 00:00:00 2001 From: soruh Date: Sat, 18 Mar 2023 22:36:04 +0100 Subject: [PATCH] split code into modules --- src/main.rs | 349 +++---------------------------------------------- src/packets.rs | 65 +-------- src/ports.rs | 11 +- 3 files changed, 26 insertions(+), 399 deletions(-) diff --git a/src/main.rs b/src/main.rs index 110a646..0e82a38 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,38 +8,32 @@ use std::{ time::Duration, }; -use anyhow::{bail, Context}; use debug_server::debug_server; use futures::Future; -use packets::{Header, Packet, RemConnect}; +use packets::{Header, Packet}; use serde::{Deserialize, Deserializer}; use time::format_description::OwnedFormatItem; use tokio::{ io::AsyncWriteExt, - net::{TcpListener, TcpStream}, - select, + net::TcpListener, sync::Mutex, - time::{sleep, timeout, Instant}, + time::{sleep, Instant}, }; -use tracing::{error, info, trace, warn, Level}; +use tracing::{error, info, warn, Level}; -use crate::packets::{dyn_ip_update, PacketKind, REJECT_OOP, REJECT_TIMEOUT}; -use crate::ports::{AllowedPorts, PortHandler, PortStatus}; - -const AUTH_TIMEOUT: Duration = Duration::from_secs(30); -const CALL_ACK_TIMEOUT: Duration = Duration::from_secs(30); -const CALL_TIMEOUT: Duration = Duration::from_secs(24 * 60 * 60); -const PORT_RETRY_TIME: Duration = Duration::from_secs(15 * 60); -const PORT_OWNERSHIP_TIMEOUT: Duration = Duration::from_secs(1 * 60 * 60); -const PING_TIMEOUT: Duration = Duration::from_secs(30); -const SEND_PING_INTERVAL: Duration = Duration::from_secs(20); - -const CACHE_STORE_INTERVAL: Duration = Duration::from_secs(5); +use crate::{ + client::connection_handler, + ports::{AllowedPorts, PortHandler, PortStatus}, +}; +use crate::{constants::CACHE_STORE_INTERVAL, packets::PacketKind}; +pub mod auth; +pub mod client; +pub mod constants; #[cfg(feature = "debug_server")] -mod debug_server; -mod packets; -mod ports; +pub mod debug_server; +pub mod packets; +pub mod ports; type Port = u16; type Number = u32; @@ -152,10 +146,11 @@ fn main() -> anyhow::Result<()> { TIME_FORMAT.set(config.time_format.clone()).unwrap(); + // we need to get this while still single threaded + // as getting the time zone offset in a multithreaded programm + // is UB in some environments TIME_ZONE_OFFSET - .set(time::UtcOffset::local_offset_at( - time::OffsetDateTime::UNIX_EPOCH, - )?) + .set(time::UtcOffset::current_local_offset()?) .unwrap(); tokio::runtime::Builder::new_multi_thread() @@ -328,312 +323,8 @@ fn main() -> anyhow::Result<()> { } #[derive(Debug, Default)] -struct HandlerMetadata { +pub struct HandlerMetadata { number: Option, port: Option, listener: Option, } - -async fn connection_handler( - config: &Config, - handler_metadata: &mut HandlerMetadata, - port_handler: &Mutex, - stream: &mut TcpStream, -) -> anyhow::Result<()> { - let (mut reader, mut writer) = stream.split(); - - let mut packet = Packet::default(); - - match timeout(AUTH_TIMEOUT, packet.recv_into_cancelation_safe(&mut reader)).await { - Ok(res) => res?, - Err(_) => { - writer.write_all(REJECT_TIMEOUT).await?; - return Ok(()); - } - } - - let RemConnect { number, pin } = packet.as_rem_connect()?; - - handler_metadata.number = Some(number); - - let mut authenticated = false; - let port = loop { - let mut updated_server = false; - - let port = port_handler - .lock() - .await - .allocate_port_for_number(config, number); - - info!(port, "allocated port"); - - let Some(port) = port else { - writer.write_all(REJECT_OOP).await?; - return Ok(()); - }; - - // make sure the client is authenticated before opening any ports - if !authenticated { - let _ip = dyn_ip_update(&config.dyn_ip_server, number, pin, port) - .await - .context("dy-ip update")?; - authenticated = true; - updated_server = true; - } - - let mut port_handler = port_handler.lock().await; - - let listener = if let Some((listener, _packet)) = port_handler.stop_rejector(port).await { - Ok(listener) - } else { - TcpListener::bind((config.listen_addr.ip(), port)).await - }; - - match listener { - Ok(listener) => { - // make sure that if we have an error, we still have access - // to the listener in the error handler. - handler_metadata.listener = Some(listener); - - // if we authenticated a client for a port we then failed to open - // we need to update the server here once a port that can be opened - // has been found - if !updated_server { - let _ip = dyn_ip_update(&config.dyn_ip_server, number, pin, port) - .await - .context("dy-ip update")?; - } - - port_handler.register_update(); - port_handler - .port_state - .entry(port) - .or_default() - .new_state(PortStatus::Idle); - - handler_metadata.port = Some(port); - - break port; - } - Err(_err) => { - port_handler.mark_port_error(number, port); - continue; - } - }; - }; - - let listener = handler_metadata.listener.as_mut().unwrap(); // we only break from the loop if this is set - - packet.header = Header { - kind: PacketKind::RemConfirm.raw(), - length: 0, - }; - packet.data.clear(); - packet.send(&mut writer).await?; - - #[derive(Debug)] - enum Result { - Caller { - packet: Packet, - stream: TcpStream, - addr: SocketAddr, - }, - Packet { - packet: Packet, - }, - } - - let mut last_ping_sent_at = Instant::now(); - let mut last_ping_received_at = Instant::now(); - - let result = loop { - trace!( - seconds = SEND_PING_INTERVAL - .saturating_sub(last_ping_sent_at.elapsed()) - .as_secs(), - "next ping in" - ); - trace!( - seconds = PING_TIMEOUT - .saturating_sub(last_ping_received_at.elapsed()) - .as_secs(), - "timeout in", - ); - - let send_next_ping_in = SEND_PING_INTERVAL.saturating_sub(last_ping_sent_at.elapsed()); - let next_ping_expected_in = PING_TIMEOUT.saturating_sub(last_ping_received_at.elapsed()); - - select! { - caller = listener.accept() => { - let (stream, addr) = caller?; - break Result::Caller { packet, stream, addr } - }, - _ = Packet::peek_packet_kind(&mut reader) => { - packet.recv_into(&mut reader).await?; - - if packet.kind() == PacketKind::Ping { - trace!("received ping"); - last_ping_received_at = Instant::now(); - } else { - break Result::Packet { packet } - } - }, - _ = sleep(send_next_ping_in) => { - trace!("sending ping"); - writer.write_all(bytemuck::bytes_of(& Header { kind: PacketKind::Ping.raw(), length: 0 })).await?; - last_ping_sent_at = Instant::now(); - } - _ = sleep(next_ping_expected_in) => { - writer.write_all(REJECT_TIMEOUT).await?; - return Ok(()); - } - } - }; - - let (mut client, mut packet) = match result { - Result::Packet { mut packet } => { - if matches!( - packet.kind(), - packets::PacketKind::End | packets::PacketKind::Reject - ) { - info!(?packet, "got disconnect packet"); - - if packet.kind() == packets::PacketKind::End { - packet.header.kind = packets::PacketKind::Reject.raw(); - packet.data.clear(); - packet.data.extend_from_slice(b"nc\0"); - packet.header.length = packet.data.len() as u8; - } - - port_handler.lock().await.start_rejector( - port, - handler_metadata - .listener - .take() - .expect("tried to start rejector twice"), - packet, - )?; - return Ok(()); - } else { - bail!("unexpected packet: {:?}", packet.kind()) - } - } - Result::Caller { - mut packet, - stream, - addr, - } => { - info!(%addr, "got caller from"); - - packet.data.clear(); - /* The I-Telex Clients can't handle data in this packet due to a bug - match addr.ip() { - IpAddr::V4(addr) => packet.data.extend_from_slice(&addr.octets()), - IpAddr::V6(addr) => packet.data.extend_from_slice(&addr.octets()), - } - */ - packet.header = Header { - kind: PacketKind::RemCall.raw(), - length: packet.data.len() as u8, - }; - - packet.send(&mut writer).await?; - - (stream, packet) - } - }; - - match timeout( - CALL_ACK_TIMEOUT, - packet.recv_into_cancelation_safe(&mut reader), - ) - .await - { - Ok(res) => res?, - Err(_) => { - writer.write_all(REJECT_TIMEOUT).await?; - return Ok(()); - } - } - - match packet.kind() { - PacketKind::End | PacketKind::Reject => { - port_handler.lock().await.start_rejector( - port, - handler_metadata - .listener - .take() - .expect("tried to start rejector twice"), - packet, - )?; - - return Ok(()); - } - - PacketKind::RemAck => { - packet.header = Header { - kind: PacketKind::Reject.raw(), - length: 4, - }; - packet.data.clear(); - packet.data.extend_from_slice(b"occ"); - packet.data.push(0); - - { - let mut port_handler = port_handler.lock().await; - - port_handler.register_update(); - port_handler - .port_state - .entry(port) - .or_default() - .new_state(PortStatus::InCall); - - port_handler.start_rejector( - port, - handler_metadata - .listener - .take() - .expect("tried to start rejector twice"), - packet, - )?; - } - - stream.set_nodelay(true)?; - client.set_nodelay(true)?; - - let _ = timeout( - CALL_TIMEOUT, - tokio::io::copy_bidirectional(stream, &mut client), - ) - .await; - - { - let mut port_handler = port_handler.lock().await; - - port_handler.register_update(); - port_handler - .port_state - .entry(port) - .or_default() - .new_state(PortStatus::Disconnected); - - port_handler - .change_rejector(port, |packet| { - packet.data.clear(); - packet.data.extend_from_slice(b"nc"); - packet.data.push(0); - packet.header = Header { - kind: PacketKind::Reject.raw(), - length: packet.data.len() as u8, - }; - }) - .await?; - } - - return Ok(()); - } - - kind => bail!("unexpected packet: {:?}", kind), - } -} diff --git a/src/packets.rs b/src/packets.rs index b506e2c..2750440 100644 --- a/src/packets.rs +++ b/src/packets.rs @@ -1,4 +1,4 @@ -use std::{fmt::Debug, net::SocketAddr}; +use std::fmt::Debug; use anyhow::bail; use bytemuck::{Pod, Zeroable}; @@ -6,7 +6,6 @@ use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, net::tcp::{ReadHalf, WriteHalf}, }; -use tracing::debug; pub const REJECT_OOP: &[u8; 6] = b"\x04\x04oop\x00"; pub const REJECT_TIMEOUT: &[u8; 10] = b"\x04\x08timeout\x00"; @@ -177,65 +176,3 @@ impl Packet { }) } } - -pub async fn dyn_ip_update( - server: &SocketAddr, - number: u32, - pin: u16, - port: u16, -) -> anyhow::Result { - debug!(%number, %port, "starting dyn ip update"); - - let mut packet = Packet::default(); - packet.header = Header { - kind: PacketKind::DynIpUpdate.raw(), - length: 8, - }; - - packet.data.clear(); - packet.data.reserve(packet.header.length as usize); - packet.data.extend_from_slice(&number.to_le_bytes()); - packet.data.extend_from_slice(&pin.to_le_bytes()); - packet.data.extend_from_slice(&port.to_le_bytes()); - - let mut socket = tokio::net::TcpStream::connect(server).await?; - - let (mut reader, mut writer) = socket.split(); - - packet.send(&mut writer).await?; - - packet.recv_into(&mut reader).await?; - - let result = match packet.kind() { - PacketKind::DynIpUpdateResponse => Ok(<[u8; 4]>::try_from(packet.data) - .map_err(|err| { - anyhow::anyhow!( - "too little data for ip address. Need 4 bytes got {}", - err.len() - ) - })? - .into()), - PacketKind::Error => { - let first_zero = packet - .data - .iter() - .enumerate() - .find_map(|(i, x)| (*x == 0).then_some(i)); - - bail!( - "{}", - std::str::from_utf8( - first_zero - .map(|i| &packet.data[..i]) - .unwrap_or(&packet.data), - )? - ) - } - - _ => bail!("server returned unexpected packet"), - }; - - debug!(?result, "finished dyn ip update"); - - result -} diff --git a/src/ports.rs b/src/ports.rs index b891452..88d5750 100644 --- a/src/ports.rs +++ b/src/ports.rs @@ -16,8 +16,9 @@ use tokio::{net::TcpListener, sync::Mutex, task::JoinHandle, time::Instant}; use tracing::{error, info, warn}; use crate::{ - packets::Packet, spawn, Config, Number, Port, UnixTimestamp, PORT_OWNERSHIP_TIMEOUT, - PORT_RETRY_TIME, TIME_FORMAT, TIME_ZONE_OFFSET, + constants::{PORT_OWNERSHIP_TIMEOUT, PORT_RETRY_TIME}, + packets::Packet, + spawn, Config, Number, Port, UnixTimestamp, TIME_FORMAT, TIME_ZONE_OFFSET, }; #[derive(Default, Serialize, Deserialize)] @@ -421,15 +422,13 @@ impl PortHandler { self.port_state .get(port) .map(|port_state| { - dbg!(port_state).status == PortStatus::Disconnected - && dbg!(now.saturating_sub(Duration::from_secs(port_state.last_change))) + port_state.status == PortStatus::Disconnected + && now.saturating_sub(Duration::from_secs(port_state.last_change)) >= PORT_OWNERSHIP_TIMEOUT }) .unwrap_or(true) }); - dbg!(&removable_entry); - if let Some((&old_number, &port)) = removable_entry { self.register_update(); info!(port, old_number, "reused port");