diff --git a/src/auth.rs b/src/auth.rs new file mode 100644 index 0000000..6afe1ba --- /dev/null +++ b/src/auth.rs @@ -0,0 +1,68 @@ +use std::net::SocketAddr; + +use anyhow::bail; +use tracing::debug; + +use crate::packets::{Header, Packet, PacketKind}; + +pub async fn dyn_ip_update( + server: &SocketAddr, + number: u32, + pin: u16, + port: u16, +) -> anyhow::Result { + debug!(%number, %port, "starting dyn ip update"); + + let mut packet = Packet::default(); + packet.header = Header { + kind: PacketKind::DynIpUpdate.raw(), + length: 8, + }; + + packet.data.clear(); + packet.data.reserve(packet.header.length as usize); + packet.data.extend_from_slice(&number.to_le_bytes()); + packet.data.extend_from_slice(&pin.to_le_bytes()); + packet.data.extend_from_slice(&port.to_le_bytes()); + + let mut socket = tokio::net::TcpStream::connect(server).await?; + + let (mut reader, mut writer) = socket.split(); + + packet.send(&mut writer).await?; + + packet.recv_into(&mut reader).await?; + + let result = match packet.kind() { + PacketKind::DynIpUpdateResponse => Ok(<[u8; 4]>::try_from(packet.data) + .map_err(|err| { + anyhow::anyhow!( + "too little data for ip address. Need 4 bytes got {}", + err.len() + ) + })? + .into()), + PacketKind::Error => { + let first_zero = packet + .data + .iter() + .enumerate() + .find_map(|(i, x)| (*x == 0).then_some(i)); + + bail!( + "{}", + std::str::from_utf8( + first_zero + .map(|i| &packet.data[..i]) + .unwrap_or(&packet.data), + )? + ) + } + + _ => bail!("server returned unexpected packet"), + }; + + debug!(?result, "finished dyn ip update"); + + result +} diff --git a/src/client.rs b/src/client.rs new file mode 100644 index 0000000..4f44100 --- /dev/null +++ b/src/client.rs @@ -0,0 +1,319 @@ +use anyhow::{bail, Context}; +use std::{net::SocketAddr, time::Instant}; +use tokio::{ + io::AsyncWriteExt, + net::{TcpListener, TcpStream}, + select, + sync::Mutex, + time::{sleep, timeout}, +}; +use tracing::{info, trace}; + +use crate::{ + auth::dyn_ip_update, + constants::{AUTH_TIMEOUT, CALL_ACK_TIMEOUT, CALL_TIMEOUT, PING_TIMEOUT, SEND_PING_INTERVAL}, + packets::{Header, Packet, PacketKind, RemConnect, REJECT_OOP, REJECT_TIMEOUT}, + ports::{PortHandler, PortStatus}, + Config, HandlerMetadata, +}; + +pub async fn connection_handler( + config: &Config, + handler_metadata: &mut HandlerMetadata, + port_handler: &Mutex, + stream: &mut TcpStream, +) -> anyhow::Result<()> { + let (mut reader, mut writer) = stream.split(); + + let mut packet = Packet::default(); + + match timeout(AUTH_TIMEOUT, packet.recv_into_cancelation_safe(&mut reader)).await { + Ok(res) => res?, + Err(_) => { + writer.write_all(REJECT_TIMEOUT).await?; + return Ok(()); + } + } + + let RemConnect { number, pin } = packet.as_rem_connect()?; + + handler_metadata.number = Some(number); + + let mut authenticated = false; + let port = loop { + let mut updated_server = false; + + let port = port_handler + .lock() + .await + .allocate_port_for_number(config, number); + + info!(port, "allocated port"); + + let Some(port) = port else { + writer.write_all(REJECT_OOP).await?; + return Ok(()); + }; + + // make sure the client is authenticated before opening any ports + if !authenticated { + let _ip = dyn_ip_update(&config.dyn_ip_server, number, pin, port) + .await + .context("dy-ip update")?; + authenticated = true; + updated_server = true; + } + + let mut port_handler = port_handler.lock().await; + + let listener = if let Some((listener, _packet)) = port_handler.stop_rejector(port).await { + Ok(listener) + } else { + TcpListener::bind((config.listen_addr.ip(), port)).await + }; + + match listener { + Ok(listener) => { + // make sure that if we have an error, we still have access + // to the listener in the error handler. + handler_metadata.listener = Some(listener); + + // if we authenticated a client for a port we then failed to open + // we need to update the server here once a port that can be opened + // has been found + if !updated_server { + let _ip = dyn_ip_update(&config.dyn_ip_server, number, pin, port) + .await + .context("dy-ip update")?; + } + + port_handler.register_update(); + port_handler + .port_state + .entry(port) + .or_default() + .new_state(PortStatus::Idle); + + handler_metadata.port = Some(port); + + break port; + } + Err(_err) => { + port_handler.mark_port_error(number, port); + continue; + } + }; + }; + + let listener = handler_metadata.listener.as_mut().unwrap(); // we only break from the loop if this is set + + packet.header = Header { + kind: PacketKind::RemConfirm.raw(), + length: 0, + }; + packet.data.clear(); + packet.send(&mut writer).await?; + + #[derive(Debug)] + enum Result { + Caller { + packet: Packet, + stream: TcpStream, + addr: SocketAddr, + }, + Packet { + packet: Packet, + }, + } + + let mut last_ping_sent_at = Instant::now(); + let mut last_ping_received_at = Instant::now(); + + let result = loop { + trace!( + seconds = SEND_PING_INTERVAL + .saturating_sub(last_ping_sent_at.elapsed()) + .as_secs(), + "next ping in" + ); + trace!( + seconds = PING_TIMEOUT + .saturating_sub(last_ping_received_at.elapsed()) + .as_secs(), + "timeout in", + ); + + let send_next_ping_in = SEND_PING_INTERVAL.saturating_sub(last_ping_sent_at.elapsed()); + let next_ping_expected_in = PING_TIMEOUT.saturating_sub(last_ping_received_at.elapsed()); + + select! { + caller = listener.accept() => { + let (stream, addr) = caller?; + break Result::Caller { packet, stream, addr } + }, + _ = Packet::peek_packet_kind(&mut reader) => { + packet.recv_into(&mut reader).await?; + + if packet.kind() == PacketKind::Ping { + trace!("received ping"); + last_ping_received_at = Instant::now(); + } else { + break Result::Packet { packet } + } + }, + _ = sleep(send_next_ping_in) => { + trace!("sending ping"); + writer.write_all(bytemuck::bytes_of(& Header { kind: PacketKind::Ping.raw(), length: 0 })).await?; + last_ping_sent_at = Instant::now(); + } + _ = sleep(next_ping_expected_in) => { + writer.write_all(REJECT_TIMEOUT).await?; + return Ok(()); + } + } + }; + + let (mut client, mut packet) = match result { + Result::Packet { mut packet } => { + if matches!(packet.kind(), PacketKind::End | PacketKind::Reject) { + info!(?packet, "got disconnect packet"); + + if packet.kind() == PacketKind::End { + packet.header.kind = PacketKind::Reject.raw(); + packet.data.clear(); + packet.data.extend_from_slice(b"nc\0"); + packet.header.length = packet.data.len() as u8; + } + + port_handler.lock().await.start_rejector( + port, + handler_metadata + .listener + .take() + .expect("tried to start rejector twice"), + packet, + )?; + return Ok(()); + } else { + bail!("unexpected packet: {:?}", packet.kind()) + } + } + Result::Caller { + mut packet, + stream, + addr, + } => { + info!(%addr, "got caller from"); + + packet.data.clear(); + /* The I-Telex Clients can't handle data in this packet due to a bug + match addr.ip() { + IpAddr::V4(addr) => packet.data.extend_from_slice(&addr.octets()), + IpAddr::V6(addr) => packet.data.extend_from_slice(&addr.octets()), + } + */ + packet.header = Header { + kind: PacketKind::RemCall.raw(), + length: packet.data.len() as u8, + }; + + packet.send(&mut writer).await?; + + (stream, packet) + } + }; + + match timeout( + CALL_ACK_TIMEOUT, + packet.recv_into_cancelation_safe(&mut reader), + ) + .await + { + Ok(res) => res?, + Err(_) => { + writer.write_all(REJECT_TIMEOUT).await?; + return Ok(()); + } + } + + match packet.kind() { + PacketKind::End | PacketKind::Reject => { + port_handler.lock().await.start_rejector( + port, + handler_metadata + .listener + .take() + .expect("tried to start rejector twice"), + packet, + )?; + + return Ok(()); + } + + PacketKind::RemAck => { + packet.header = Header { + kind: PacketKind::Reject.raw(), + length: 4, + }; + packet.data.clear(); + packet.data.extend_from_slice(b"occ"); + packet.data.push(0); + + { + let mut port_handler = port_handler.lock().await; + + port_handler.register_update(); + port_handler + .port_state + .entry(port) + .or_default() + .new_state(PortStatus::InCall); + + port_handler.start_rejector( + port, + handler_metadata + .listener + .take() + .expect("tried to start rejector twice"), + packet, + )?; + } + + stream.set_nodelay(true)?; + client.set_nodelay(true)?; + + let _ = timeout( + CALL_TIMEOUT, + tokio::io::copy_bidirectional(stream, &mut client), + ) + .await; + + { + let mut port_handler = port_handler.lock().await; + + port_handler.register_update(); + port_handler + .port_state + .entry(port) + .or_default() + .new_state(PortStatus::Disconnected); + + port_handler + .change_rejector(port, |packet| { + packet.data.clear(); + packet.data.extend_from_slice(b"nc"); + packet.data.push(0); + packet.header = Header { + kind: PacketKind::Reject.raw(), + length: packet.data.len() as u8, + }; + }) + .await?; + } + + return Ok(()); + } + + kind => bail!("unexpected packet: {:?}", kind), + } +} diff --git a/src/constants.rs b/src/constants.rs new file mode 100644 index 0000000..afcdb33 --- /dev/null +++ b/src/constants.rs @@ -0,0 +1,11 @@ +use std::time::Duration; + +pub const AUTH_TIMEOUT: Duration = Duration::from_secs(30); +pub const CALL_ACK_TIMEOUT: Duration = Duration::from_secs(30); +pub const CALL_TIMEOUT: Duration = Duration::from_secs(24 * 60 * 60); +pub const PORT_RETRY_TIME: Duration = Duration::from_secs(15 * 60); +pub const PORT_OWNERSHIP_TIMEOUT: Duration = Duration::from_secs(1 * 60 * 60); +pub const PING_TIMEOUT: Duration = Duration::from_secs(30); +pub const SEND_PING_INTERVAL: Duration = Duration::from_secs(20); + +pub const CACHE_STORE_INTERVAL: Duration = Duration::from_secs(5);