split code into modules
This commit is contained in:
parent
887a1b1db9
commit
0c4be4541c
349
src/main.rs
349
src/main.rs
@ -8,38 +8,32 @@ use std::{
|
|||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
use anyhow::{bail, Context};
|
|
||||||
use debug_server::debug_server;
|
use debug_server::debug_server;
|
||||||
use futures::Future;
|
use futures::Future;
|
||||||
use packets::{Header, Packet, RemConnect};
|
use packets::{Header, Packet};
|
||||||
use serde::{Deserialize, Deserializer};
|
use serde::{Deserialize, Deserializer};
|
||||||
use time::format_description::OwnedFormatItem;
|
use time::format_description::OwnedFormatItem;
|
||||||
use tokio::{
|
use tokio::{
|
||||||
io::AsyncWriteExt,
|
io::AsyncWriteExt,
|
||||||
net::{TcpListener, TcpStream},
|
net::TcpListener,
|
||||||
select,
|
|
||||||
sync::Mutex,
|
sync::Mutex,
|
||||||
time::{sleep, timeout, Instant},
|
time::{sleep, Instant},
|
||||||
};
|
};
|
||||||
use tracing::{error, info, trace, warn, Level};
|
use tracing::{error, info, warn, Level};
|
||||||
|
|
||||||
use crate::packets::{dyn_ip_update, PacketKind, REJECT_OOP, REJECT_TIMEOUT};
|
use crate::{
|
||||||
use crate::ports::{AllowedPorts, PortHandler, PortStatus};
|
client::connection_handler,
|
||||||
|
ports::{AllowedPorts, PortHandler, PortStatus},
|
||||||
const AUTH_TIMEOUT: Duration = Duration::from_secs(30);
|
};
|
||||||
const CALL_ACK_TIMEOUT: Duration = Duration::from_secs(30);
|
use crate::{constants::CACHE_STORE_INTERVAL, packets::PacketKind};
|
||||||
const CALL_TIMEOUT: Duration = Duration::from_secs(24 * 60 * 60);
|
|
||||||
const PORT_RETRY_TIME: Duration = Duration::from_secs(15 * 60);
|
|
||||||
const PORT_OWNERSHIP_TIMEOUT: Duration = Duration::from_secs(1 * 60 * 60);
|
|
||||||
const PING_TIMEOUT: Duration = Duration::from_secs(30);
|
|
||||||
const SEND_PING_INTERVAL: Duration = Duration::from_secs(20);
|
|
||||||
|
|
||||||
const CACHE_STORE_INTERVAL: Duration = Duration::from_secs(5);
|
|
||||||
|
|
||||||
|
pub mod auth;
|
||||||
|
pub mod client;
|
||||||
|
pub mod constants;
|
||||||
#[cfg(feature = "debug_server")]
|
#[cfg(feature = "debug_server")]
|
||||||
mod debug_server;
|
pub mod debug_server;
|
||||||
mod packets;
|
pub mod packets;
|
||||||
mod ports;
|
pub mod ports;
|
||||||
|
|
||||||
type Port = u16;
|
type Port = u16;
|
||||||
type Number = u32;
|
type Number = u32;
|
||||||
@ -152,10 +146,11 @@ fn main() -> anyhow::Result<()> {
|
|||||||
|
|
||||||
TIME_FORMAT.set(config.time_format.clone()).unwrap();
|
TIME_FORMAT.set(config.time_format.clone()).unwrap();
|
||||||
|
|
||||||
|
// we need to get this while still single threaded
|
||||||
|
// as getting the time zone offset in a multithreaded programm
|
||||||
|
// is UB in some environments
|
||||||
TIME_ZONE_OFFSET
|
TIME_ZONE_OFFSET
|
||||||
.set(time::UtcOffset::local_offset_at(
|
.set(time::UtcOffset::current_local_offset()?)
|
||||||
time::OffsetDateTime::UNIX_EPOCH,
|
|
||||||
)?)
|
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
tokio::runtime::Builder::new_multi_thread()
|
tokio::runtime::Builder::new_multi_thread()
|
||||||
@ -328,312 +323,8 @@ fn main() -> anyhow::Result<()> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Default)]
|
#[derive(Debug, Default)]
|
||||||
struct HandlerMetadata {
|
pub struct HandlerMetadata {
|
||||||
number: Option<Number>,
|
number: Option<Number>,
|
||||||
port: Option<Port>,
|
port: Option<Port>,
|
||||||
listener: Option<TcpListener>,
|
listener: Option<TcpListener>,
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn connection_handler(
|
|
||||||
config: &Config,
|
|
||||||
handler_metadata: &mut HandlerMetadata,
|
|
||||||
port_handler: &Mutex<PortHandler>,
|
|
||||||
stream: &mut TcpStream,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
let (mut reader, mut writer) = stream.split();
|
|
||||||
|
|
||||||
let mut packet = Packet::default();
|
|
||||||
|
|
||||||
match timeout(AUTH_TIMEOUT, packet.recv_into_cancelation_safe(&mut reader)).await {
|
|
||||||
Ok(res) => res?,
|
|
||||||
Err(_) => {
|
|
||||||
writer.write_all(REJECT_TIMEOUT).await?;
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let RemConnect { number, pin } = packet.as_rem_connect()?;
|
|
||||||
|
|
||||||
handler_metadata.number = Some(number);
|
|
||||||
|
|
||||||
let mut authenticated = false;
|
|
||||||
let port = loop {
|
|
||||||
let mut updated_server = false;
|
|
||||||
|
|
||||||
let port = port_handler
|
|
||||||
.lock()
|
|
||||||
.await
|
|
||||||
.allocate_port_for_number(config, number);
|
|
||||||
|
|
||||||
info!(port, "allocated port");
|
|
||||||
|
|
||||||
let Some(port) = port else {
|
|
||||||
writer.write_all(REJECT_OOP).await?;
|
|
||||||
return Ok(());
|
|
||||||
};
|
|
||||||
|
|
||||||
// make sure the client is authenticated before opening any ports
|
|
||||||
if !authenticated {
|
|
||||||
let _ip = dyn_ip_update(&config.dyn_ip_server, number, pin, port)
|
|
||||||
.await
|
|
||||||
.context("dy-ip update")?;
|
|
||||||
authenticated = true;
|
|
||||||
updated_server = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut port_handler = port_handler.lock().await;
|
|
||||||
|
|
||||||
let listener = if let Some((listener, _packet)) = port_handler.stop_rejector(port).await {
|
|
||||||
Ok(listener)
|
|
||||||
} else {
|
|
||||||
TcpListener::bind((config.listen_addr.ip(), port)).await
|
|
||||||
};
|
|
||||||
|
|
||||||
match listener {
|
|
||||||
Ok(listener) => {
|
|
||||||
// make sure that if we have an error, we still have access
|
|
||||||
// to the listener in the error handler.
|
|
||||||
handler_metadata.listener = Some(listener);
|
|
||||||
|
|
||||||
// if we authenticated a client for a port we then failed to open
|
|
||||||
// we need to update the server here once a port that can be opened
|
|
||||||
// has been found
|
|
||||||
if !updated_server {
|
|
||||||
let _ip = dyn_ip_update(&config.dyn_ip_server, number, pin, port)
|
|
||||||
.await
|
|
||||||
.context("dy-ip update")?;
|
|
||||||
}
|
|
||||||
|
|
||||||
port_handler.register_update();
|
|
||||||
port_handler
|
|
||||||
.port_state
|
|
||||||
.entry(port)
|
|
||||||
.or_default()
|
|
||||||
.new_state(PortStatus::Idle);
|
|
||||||
|
|
||||||
handler_metadata.port = Some(port);
|
|
||||||
|
|
||||||
break port;
|
|
||||||
}
|
|
||||||
Err(_err) => {
|
|
||||||
port_handler.mark_port_error(number, port);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
};
|
|
||||||
|
|
||||||
let listener = handler_metadata.listener.as_mut().unwrap(); // we only break from the loop if this is set
|
|
||||||
|
|
||||||
packet.header = Header {
|
|
||||||
kind: PacketKind::RemConfirm.raw(),
|
|
||||||
length: 0,
|
|
||||||
};
|
|
||||||
packet.data.clear();
|
|
||||||
packet.send(&mut writer).await?;
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
enum Result {
|
|
||||||
Caller {
|
|
||||||
packet: Packet,
|
|
||||||
stream: TcpStream,
|
|
||||||
addr: SocketAddr,
|
|
||||||
},
|
|
||||||
Packet {
|
|
||||||
packet: Packet,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut last_ping_sent_at = Instant::now();
|
|
||||||
let mut last_ping_received_at = Instant::now();
|
|
||||||
|
|
||||||
let result = loop {
|
|
||||||
trace!(
|
|
||||||
seconds = SEND_PING_INTERVAL
|
|
||||||
.saturating_sub(last_ping_sent_at.elapsed())
|
|
||||||
.as_secs(),
|
|
||||||
"next ping in"
|
|
||||||
);
|
|
||||||
trace!(
|
|
||||||
seconds = PING_TIMEOUT
|
|
||||||
.saturating_sub(last_ping_received_at.elapsed())
|
|
||||||
.as_secs(),
|
|
||||||
"timeout in",
|
|
||||||
);
|
|
||||||
|
|
||||||
let send_next_ping_in = SEND_PING_INTERVAL.saturating_sub(last_ping_sent_at.elapsed());
|
|
||||||
let next_ping_expected_in = PING_TIMEOUT.saturating_sub(last_ping_received_at.elapsed());
|
|
||||||
|
|
||||||
select! {
|
|
||||||
caller = listener.accept() => {
|
|
||||||
let (stream, addr) = caller?;
|
|
||||||
break Result::Caller { packet, stream, addr }
|
|
||||||
},
|
|
||||||
_ = Packet::peek_packet_kind(&mut reader) => {
|
|
||||||
packet.recv_into(&mut reader).await?;
|
|
||||||
|
|
||||||
if packet.kind() == PacketKind::Ping {
|
|
||||||
trace!("received ping");
|
|
||||||
last_ping_received_at = Instant::now();
|
|
||||||
} else {
|
|
||||||
break Result::Packet { packet }
|
|
||||||
}
|
|
||||||
},
|
|
||||||
_ = sleep(send_next_ping_in) => {
|
|
||||||
trace!("sending ping");
|
|
||||||
writer.write_all(bytemuck::bytes_of(& Header { kind: PacketKind::Ping.raw(), length: 0 })).await?;
|
|
||||||
last_ping_sent_at = Instant::now();
|
|
||||||
}
|
|
||||||
_ = sleep(next_ping_expected_in) => {
|
|
||||||
writer.write_all(REJECT_TIMEOUT).await?;
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let (mut client, mut packet) = match result {
|
|
||||||
Result::Packet { mut packet } => {
|
|
||||||
if matches!(
|
|
||||||
packet.kind(),
|
|
||||||
packets::PacketKind::End | packets::PacketKind::Reject
|
|
||||||
) {
|
|
||||||
info!(?packet, "got disconnect packet");
|
|
||||||
|
|
||||||
if packet.kind() == packets::PacketKind::End {
|
|
||||||
packet.header.kind = packets::PacketKind::Reject.raw();
|
|
||||||
packet.data.clear();
|
|
||||||
packet.data.extend_from_slice(b"nc\0");
|
|
||||||
packet.header.length = packet.data.len() as u8;
|
|
||||||
}
|
|
||||||
|
|
||||||
port_handler.lock().await.start_rejector(
|
|
||||||
port,
|
|
||||||
handler_metadata
|
|
||||||
.listener
|
|
||||||
.take()
|
|
||||||
.expect("tried to start rejector twice"),
|
|
||||||
packet,
|
|
||||||
)?;
|
|
||||||
return Ok(());
|
|
||||||
} else {
|
|
||||||
bail!("unexpected packet: {:?}", packet.kind())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Result::Caller {
|
|
||||||
mut packet,
|
|
||||||
stream,
|
|
||||||
addr,
|
|
||||||
} => {
|
|
||||||
info!(%addr, "got caller from");
|
|
||||||
|
|
||||||
packet.data.clear();
|
|
||||||
/* The I-Telex Clients can't handle data in this packet due to a bug
|
|
||||||
match addr.ip() {
|
|
||||||
IpAddr::V4(addr) => packet.data.extend_from_slice(&addr.octets()),
|
|
||||||
IpAddr::V6(addr) => packet.data.extend_from_slice(&addr.octets()),
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
packet.header = Header {
|
|
||||||
kind: PacketKind::RemCall.raw(),
|
|
||||||
length: packet.data.len() as u8,
|
|
||||||
};
|
|
||||||
|
|
||||||
packet.send(&mut writer).await?;
|
|
||||||
|
|
||||||
(stream, packet)
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
match timeout(
|
|
||||||
CALL_ACK_TIMEOUT,
|
|
||||||
packet.recv_into_cancelation_safe(&mut reader),
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(res) => res?,
|
|
||||||
Err(_) => {
|
|
||||||
writer.write_all(REJECT_TIMEOUT).await?;
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
match packet.kind() {
|
|
||||||
PacketKind::End | PacketKind::Reject => {
|
|
||||||
port_handler.lock().await.start_rejector(
|
|
||||||
port,
|
|
||||||
handler_metadata
|
|
||||||
.listener
|
|
||||||
.take()
|
|
||||||
.expect("tried to start rejector twice"),
|
|
||||||
packet,
|
|
||||||
)?;
|
|
||||||
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
PacketKind::RemAck => {
|
|
||||||
packet.header = Header {
|
|
||||||
kind: PacketKind::Reject.raw(),
|
|
||||||
length: 4,
|
|
||||||
};
|
|
||||||
packet.data.clear();
|
|
||||||
packet.data.extend_from_slice(b"occ");
|
|
||||||
packet.data.push(0);
|
|
||||||
|
|
||||||
{
|
|
||||||
let mut port_handler = port_handler.lock().await;
|
|
||||||
|
|
||||||
port_handler.register_update();
|
|
||||||
port_handler
|
|
||||||
.port_state
|
|
||||||
.entry(port)
|
|
||||||
.or_default()
|
|
||||||
.new_state(PortStatus::InCall);
|
|
||||||
|
|
||||||
port_handler.start_rejector(
|
|
||||||
port,
|
|
||||||
handler_metadata
|
|
||||||
.listener
|
|
||||||
.take()
|
|
||||||
.expect("tried to start rejector twice"),
|
|
||||||
packet,
|
|
||||||
)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
stream.set_nodelay(true)?;
|
|
||||||
client.set_nodelay(true)?;
|
|
||||||
|
|
||||||
let _ = timeout(
|
|
||||||
CALL_TIMEOUT,
|
|
||||||
tokio::io::copy_bidirectional(stream, &mut client),
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
|
|
||||||
{
|
|
||||||
let mut port_handler = port_handler.lock().await;
|
|
||||||
|
|
||||||
port_handler.register_update();
|
|
||||||
port_handler
|
|
||||||
.port_state
|
|
||||||
.entry(port)
|
|
||||||
.or_default()
|
|
||||||
.new_state(PortStatus::Disconnected);
|
|
||||||
|
|
||||||
port_handler
|
|
||||||
.change_rejector(port, |packet| {
|
|
||||||
packet.data.clear();
|
|
||||||
packet.data.extend_from_slice(b"nc");
|
|
||||||
packet.data.push(0);
|
|
||||||
packet.header = Header {
|
|
||||||
kind: PacketKind::Reject.raw(),
|
|
||||||
length: packet.data.len() as u8,
|
|
||||||
};
|
|
||||||
})
|
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
kind => bail!("unexpected packet: {:?}", kind),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
use std::{fmt::Debug, net::SocketAddr};
|
use std::fmt::Debug;
|
||||||
|
|
||||||
use anyhow::bail;
|
use anyhow::bail;
|
||||||
use bytemuck::{Pod, Zeroable};
|
use bytemuck::{Pod, Zeroable};
|
||||||
@ -6,7 +6,6 @@ use tokio::{
|
|||||||
io::{AsyncReadExt, AsyncWriteExt},
|
io::{AsyncReadExt, AsyncWriteExt},
|
||||||
net::tcp::{ReadHalf, WriteHalf},
|
net::tcp::{ReadHalf, WriteHalf},
|
||||||
};
|
};
|
||||||
use tracing::debug;
|
|
||||||
|
|
||||||
pub const REJECT_OOP: &[u8; 6] = b"\x04\x04oop\x00";
|
pub const REJECT_OOP: &[u8; 6] = b"\x04\x04oop\x00";
|
||||||
pub const REJECT_TIMEOUT: &[u8; 10] = b"\x04\x08timeout\x00";
|
pub const REJECT_TIMEOUT: &[u8; 10] = b"\x04\x08timeout\x00";
|
||||||
@ -177,65 +176,3 @@ impl Packet {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn dyn_ip_update(
|
|
||||||
server: &SocketAddr,
|
|
||||||
number: u32,
|
|
||||||
pin: u16,
|
|
||||||
port: u16,
|
|
||||||
) -> anyhow::Result<std::net::Ipv4Addr> {
|
|
||||||
debug!(%number, %port, "starting dyn ip update");
|
|
||||||
|
|
||||||
let mut packet = Packet::default();
|
|
||||||
packet.header = Header {
|
|
||||||
kind: PacketKind::DynIpUpdate.raw(),
|
|
||||||
length: 8,
|
|
||||||
};
|
|
||||||
|
|
||||||
packet.data.clear();
|
|
||||||
packet.data.reserve(packet.header.length as usize);
|
|
||||||
packet.data.extend_from_slice(&number.to_le_bytes());
|
|
||||||
packet.data.extend_from_slice(&pin.to_le_bytes());
|
|
||||||
packet.data.extend_from_slice(&port.to_le_bytes());
|
|
||||||
|
|
||||||
let mut socket = tokio::net::TcpStream::connect(server).await?;
|
|
||||||
|
|
||||||
let (mut reader, mut writer) = socket.split();
|
|
||||||
|
|
||||||
packet.send(&mut writer).await?;
|
|
||||||
|
|
||||||
packet.recv_into(&mut reader).await?;
|
|
||||||
|
|
||||||
let result = match packet.kind() {
|
|
||||||
PacketKind::DynIpUpdateResponse => Ok(<[u8; 4]>::try_from(packet.data)
|
|
||||||
.map_err(|err| {
|
|
||||||
anyhow::anyhow!(
|
|
||||||
"too little data for ip address. Need 4 bytes got {}",
|
|
||||||
err.len()
|
|
||||||
)
|
|
||||||
})?
|
|
||||||
.into()),
|
|
||||||
PacketKind::Error => {
|
|
||||||
let first_zero = packet
|
|
||||||
.data
|
|
||||||
.iter()
|
|
||||||
.enumerate()
|
|
||||||
.find_map(|(i, x)| (*x == 0).then_some(i));
|
|
||||||
|
|
||||||
bail!(
|
|
||||||
"{}",
|
|
||||||
std::str::from_utf8(
|
|
||||||
first_zero
|
|
||||||
.map(|i| &packet.data[..i])
|
|
||||||
.unwrap_or(&packet.data),
|
|
||||||
)?
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
_ => bail!("server returned unexpected packet"),
|
|
||||||
};
|
|
||||||
|
|
||||||
debug!(?result, "finished dyn ip update");
|
|
||||||
|
|
||||||
result
|
|
||||||
}
|
|
||||||
|
11
src/ports.rs
11
src/ports.rs
@ -16,8 +16,9 @@ use tokio::{net::TcpListener, sync::Mutex, task::JoinHandle, time::Instant};
|
|||||||
use tracing::{error, info, warn};
|
use tracing::{error, info, warn};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
packets::Packet, spawn, Config, Number, Port, UnixTimestamp, PORT_OWNERSHIP_TIMEOUT,
|
constants::{PORT_OWNERSHIP_TIMEOUT, PORT_RETRY_TIME},
|
||||||
PORT_RETRY_TIME, TIME_FORMAT, TIME_ZONE_OFFSET,
|
packets::Packet,
|
||||||
|
spawn, Config, Number, Port, UnixTimestamp, TIME_FORMAT, TIME_ZONE_OFFSET,
|
||||||
};
|
};
|
||||||
|
|
||||||
#[derive(Default, Serialize, Deserialize)]
|
#[derive(Default, Serialize, Deserialize)]
|
||||||
@ -421,15 +422,13 @@ impl PortHandler {
|
|||||||
self.port_state
|
self.port_state
|
||||||
.get(port)
|
.get(port)
|
||||||
.map(|port_state| {
|
.map(|port_state| {
|
||||||
dbg!(port_state).status == PortStatus::Disconnected
|
port_state.status == PortStatus::Disconnected
|
||||||
&& dbg!(now.saturating_sub(Duration::from_secs(port_state.last_change)))
|
&& now.saturating_sub(Duration::from_secs(port_state.last_change))
|
||||||
>= PORT_OWNERSHIP_TIMEOUT
|
>= PORT_OWNERSHIP_TIMEOUT
|
||||||
})
|
})
|
||||||
.unwrap_or(true)
|
.unwrap_or(true)
|
||||||
});
|
});
|
||||||
|
|
||||||
dbg!(&removable_entry);
|
|
||||||
|
|
||||||
if let Some((&old_number, &port)) = removable_entry {
|
if let Some((&old_number, &port)) = removable_entry {
|
||||||
self.register_update();
|
self.register_update();
|
||||||
info!(port, old_number, "reused port");
|
info!(port, old_number, "reused port");
|
||||||
|
Loading…
Reference in New Issue
Block a user