turn on most of clippy::pedantic

This commit is contained in:
soruh 2023-03-19 17:17:31 +01:00
parent 4023b5bad4
commit 96033a0796
5 changed files with 391 additions and 318 deletions

View File

@ -53,11 +53,7 @@ pub async fn dyn_ip_update(
bail!( bail!(
"{}", "{}",
std::str::from_utf8( std::str::from_utf8(first_zero.map_or(&packet.data, |i| &packet.data[..i]),)?
first_zero
.map(|i| &packet.data[..i])
.unwrap_or(&packet.data),
)?
) )
} }

View File

@ -1,8 +1,11 @@
use anyhow::{bail, Context}; use anyhow::{anyhow, bail, Context};
use std::{net::SocketAddr, time::Instant}; use std::{net::SocketAddr, time::Instant};
use tokio::{ use tokio::{
io::AsyncWriteExt, io::AsyncWriteExt,
net::{TcpListener, TcpStream}, net::{
tcp::{ReadHalf, WriteHalf},
TcpListener, TcpStream,
},
select, select,
sync::Mutex, sync::Mutex,
time::{sleep, timeout}, time::{sleep, timeout},
@ -17,32 +20,15 @@ use crate::{
Config, HandlerMetadata, Config, HandlerMetadata,
}; };
pub async fn connection_handler( async fn authenticate(
config: &Config, config: &Config,
handler_metadata: &mut HandlerMetadata,
port_handler: &Mutex<PortHandler>, port_handler: &Mutex<PortHandler>,
stream: &mut TcpStream, handler_metadata: &mut HandlerMetadata,
) -> anyhow::Result<()> { number: u32,
let addr = stream.peer_addr()?; pin: u16,
) -> anyhow::Result<Option<u16>> {
let (mut reader, mut writer) = stream.split();
let mut packet = Packet::default();
match timeout(AUTH_TIMEOUT, packet.recv_into_cancelation_safe(&mut reader)).await {
Ok(res) => res?,
Err(_) => {
writer.write_all(REJECT_TIMEOUT).await?;
return Ok(());
}
}
let RemConnect { number, pin } = packet.as_rem_connect()?;
handler_metadata.number = Some(number);
let mut authenticated = false; let mut authenticated = false;
let port = loop { loop {
let mut updated_server = false; let mut updated_server = false;
let port = port_handler let port = port_handler
@ -51,8 +37,7 @@ pub async fn connection_handler(
.allocate_port_for_number(config, number); .allocate_port_for_number(config, number);
let Some(port) = port else { let Some(port) = port else {
writer.write_all(REJECT_OOP).await?; return Ok(None);
return Ok(());
}; };
// make sure the client is authenticated before opening any ports // make sure the client is authenticated before opening any ports
@ -96,42 +81,38 @@ pub async fn connection_handler(
handler_metadata.port = Some(port); handler_metadata.port = Some(port);
break port; break Ok(Some(port));
} }
Err(_err) => { Err(_err) => {
port_handler.mark_port_error(number, port); port_handler.mark_port_error(number, port);
continue; continue;
} }
}; };
}; }
}
info!(%addr, number, port, "authenticated"); #[derive(Debug)]
enum IdleResult {
let listener = handler_metadata.listener.as_mut().unwrap(); // we only break from the loop if this is set
packet.header = Header {
kind: PacketKind::RemConfirm.raw(),
length: 0,
};
packet.data.clear();
packet.send(&mut writer).await?;
#[derive(Debug)]
enum Result {
Caller { Caller {
packet: Packet, packet: Packet,
stream: TcpStream, stream: TcpStream,
addr: SocketAddr, addr: SocketAddr,
}, },
Packet { Disconnect {
packet: Packet, packet: Packet,
}, },
} }
async fn idle(
listener: &mut TcpListener,
mut packet: Packet,
reader: &mut ReadHalf<'_>,
writer: &mut WriteHalf<'_>,
) -> anyhow::Result<Option<IdleResult>> {
let mut last_ping_sent_at = Instant::now(); let mut last_ping_sent_at = Instant::now();
let mut last_ping_received_at = Instant::now(); let mut last_ping_received_at = Instant::now();
let result = loop { loop {
trace!( trace!(
seconds = SEND_PING_INTERVAL seconds = SEND_PING_INTERVAL
.saturating_sub(last_ping_sent_at.elapsed()) .saturating_sub(last_ping_sent_at.elapsed())
@ -151,16 +132,16 @@ pub async fn connection_handler(
select! { select! {
caller = listener.accept() => { caller = listener.accept() => {
let (stream, addr) = caller?; let (stream, addr) = caller?;
break Result::Caller { packet, stream, addr } break Ok(Some(IdleResult::Caller { packet, stream, addr }))
}, },
_ = Packet::peek_packet_kind(&mut reader) => { _ = Packet::peek_packet_kind( reader) => {
packet.recv_into(&mut reader).await?; packet.recv_into( reader).await?;
if packet.kind() == PacketKind::Ping { if packet.kind() == PacketKind::Ping {
trace!("received ping"); trace!("received ping");
last_ping_received_at = Instant::now(); last_ping_received_at = Instant::now();
} else { } else {
break Result::Packet { packet } break Ok(Some(IdleResult::Disconnect { packet }))
} }
}, },
_ = sleep(send_next_ping_in) => { _ = sleep(send_next_ping_in) => {
@ -170,13 +151,21 @@ pub async fn connection_handler(
} }
_ = sleep(next_ping_expected_in) => { _ = sleep(next_ping_expected_in) => {
writer.write_all(REJECT_TIMEOUT).await?; writer.write_all(REJECT_TIMEOUT).await?;
return Ok(()); break Ok(None);
} }
} }
}; }
}
let (mut client, mut packet) = match result { async fn notify_or_disconnect(
Result::Packet { mut packet } => { result: IdleResult,
handler_metadata: &mut HandlerMetadata,
port_handler: &Mutex<PortHandler>,
port: u16,
writer: &mut WriteHalf<'_>,
) -> anyhow::Result<Option<(TcpStream, Packet)>> {
match result {
IdleResult::Disconnect { mut packet } => {
if matches!(packet.kind(), PacketKind::End | PacketKind::Reject) { if matches!(packet.kind(), PacketKind::End | PacketKind::Reject) {
info!(?packet, "got disconnect packet"); info!(?packet, "got disconnect packet");
@ -184,7 +173,7 @@ pub async fn connection_handler(
if packet.data.is_empty() { if packet.data.is_empty() {
packet.data.extend_from_slice(b"nc\0"); packet.data.extend_from_slice(b"nc\0");
packet.header.length = packet.data.len() as u8; packet.header.length = packet.data.len().try_into().unwrap();
} }
port_handler.lock().await.start_rejector( port_handler.lock().await.start_rejector(
@ -195,12 +184,12 @@ pub async fn connection_handler(
.expect("tried to start rejector twice"), .expect("tried to start rejector twice"),
packet, packet,
)?; )?;
return Ok(()); Ok(None)
} else { } else {
bail!("unexpected packet: {:?}", packet.kind()) Err(anyhow!("unexpected packet: {:?}", packet.kind()))
} }
} }
Result::Caller { IdleResult::Caller {
mut packet, mut packet,
stream, stream,
addr, addr,
@ -216,43 +205,24 @@ pub async fn connection_handler(
*/ */
packet.header = Header { packet.header = Header {
kind: PacketKind::RemCall.raw(), kind: PacketKind::RemCall.raw(),
length: packet.data.len() as u8, length: packet.data.len().try_into().unwrap(), // ip addresses are less then 255 bytes long
}; };
packet.send(&mut writer).await?; packet.send(writer).await?;
(stream, packet) Ok(Some((stream, packet)))
}
};
match timeout(
CALL_ACK_TIMEOUT,
packet.recv_into_cancelation_safe(&mut reader),
)
.await
{
Ok(res) => res?,
Err(_) => {
writer.write_all(REJECT_TIMEOUT).await?;
return Ok(());
} }
} }
}
match packet.kind() { async fn connect(
PacketKind::End | PacketKind::Reject => { mut packet: Packet,
port_handler.lock().await.start_rejector( port_handler: &Mutex<PortHandler>,
port, port: u16,
handler_metadata handler_metadata: &mut HandlerMetadata,
.listener stream: &mut TcpStream,
.take() client: &mut TcpStream,
.expect("tried to start rejector twice"), ) -> anyhow::Result<()> {
packet,
)?;
Ok(())
}
PacketKind::RemAck => {
packet.header = Header { packet.header = Header {
kind: PacketKind::Reject.raw(), kind: PacketKind::Reject.raw(),
length: 4, length: 4,
@ -284,11 +254,7 @@ pub async fn connection_handler(
stream.set_nodelay(true)?; stream.set_nodelay(true)?;
client.set_nodelay(true)?; client.set_nodelay(true)?;
let _ = timeout( let _ = timeout(CALL_TIMEOUT, tokio::io::copy_bidirectional(stream, client)).await;
CALL_TIMEOUT,
tokio::io::copy_bidirectional(stream, &mut client),
)
.await;
{ {
let mut port_handler = port_handler.lock().await; let mut port_handler = port_handler.lock().await;
@ -307,13 +273,100 @@ pub async fn connection_handler(
packet.data.push(0); packet.data.push(0);
packet.header = Header { packet.header = Header {
kind: PacketKind::Reject.raw(), kind: PacketKind::Reject.raw(),
length: packet.data.len() as u8, length: packet.data.len().try_into().unwrap(),
}; };
}) })
.await?; .await?;
} }
Ok(()) Ok(())
}
pub async fn handler(
stream: &mut TcpStream,
addr: SocketAddr,
config: &Config,
handler_metadata: &mut HandlerMetadata,
port_handler: &Mutex<PortHandler>,
) -> anyhow::Result<()> {
let (mut reader, mut writer) = stream.split();
let mut packet = Packet::default();
let Ok(res) = timeout(AUTH_TIMEOUT, packet.recv_into_cancelation_safe(&mut reader)).await else {
writer.write_all(REJECT_TIMEOUT).await?;
return Ok(());
};
res?;
let RemConnect { number, pin } = packet.as_rem_connect()?;
handler_metadata.number = Some(number);
let Some(port) = authenticate(config, port_handler, handler_metadata, number, pin).await? else {
writer.write_all(REJECT_OOP).await?;
return Ok(());
};
info!(%addr, number, port, "authenticated");
let listener = handler_metadata.listener.as_mut().unwrap(); // we are only authenticated if this is set
packet.header = Header {
kind: PacketKind::RemConfirm.raw(),
length: 0,
};
packet.data.clear();
packet.send(&mut writer).await?;
let Some(idle_result) = idle(
listener,
packet,
&mut reader,
&mut writer,
).await? else {
return Ok(());
};
let Some((mut client, mut packet)) = notify_or_disconnect(idle_result, handler_metadata, port_handler, port, &mut writer).await? else {
return Ok(());
};
let recv = timeout(
CALL_ACK_TIMEOUT,
packet.recv_into_cancelation_safe(&mut reader),
);
let Ok(res) = recv.await else {
writer.write_all(REJECT_TIMEOUT).await?;
return Ok(());
};
res?;
match packet.kind() {
PacketKind::End | PacketKind::Reject => {
port_handler.lock().await.start_rejector(
port,
handler_metadata
.listener
.take()
.expect("tried to start rejector twice"),
packet,
)?;
Ok(())
}
PacketKind::RemAck => {
connect(
packet,
port_handler,
port,
handler_metadata,
stream,
&mut client,
)
.await
} }
kind => bail!("unexpected packet: {:?}", kind), kind => bail!("unexpected packet: {:?}", kind),

View File

@ -1,3 +1,6 @@
#![warn(clippy::pedantic)]
#![allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
use std::{ use std::{
fmt::Debug, fmt::Debug,
fs::File, fs::File,
@ -15,17 +18,14 @@ use serde::{Deserialize, Deserializer};
use time::format_description::OwnedFormatItem; use time::format_description::OwnedFormatItem;
use tokio::{ use tokio::{
io::AsyncWriteExt, io::AsyncWriteExt,
net::TcpListener, net::{TcpListener, TcpStream},
sync::Mutex, sync::Mutex,
time::{sleep, Instant}, time::{sleep, Instant},
}; };
use tracing::{error, info, warn, Level}; use tracing::{error, info, warn, Level};
use crate::{ use crate::packets::PacketKind;
client::connection_handler, use crate::ports::{cache_daemon, AllowedList, PortHandler, PortStatus};
ports::{AllowedPorts, PortHandler, PortStatus},
};
use crate::{constants::CACHE_STORE_INTERVAL, packets::PacketKind};
pub mod auth; pub mod auth;
pub mod client; pub mod client;
@ -41,7 +41,7 @@ type UnixTimestamp = u64;
#[derive(Debug, Deserialize)] #[derive(Debug, Deserialize)]
pub struct Config { pub struct Config {
allowed_ports: AllowedPorts, allowed_ports: AllowedList,
#[serde(deserialize_with = "parse_socket_addr")] #[serde(deserialize_with = "parse_socket_addr")]
listen_addr: SocketAddr, listen_addr: SocketAddr,
#[serde(deserialize_with = "parse_socket_addr")] #[serde(deserialize_with = "parse_socket_addr")]
@ -135,29 +135,9 @@ static TIME_ZONE_OFFSET: once_cell::sync::OnceCell<time::UtcOffset> =
static TIME_FORMAT: once_cell::sync::OnceCell<OwnedFormatItem> = once_cell::sync::OnceCell::new(); static TIME_FORMAT: once_cell::sync::OnceCell<OwnedFormatItem> = once_cell::sync::OnceCell::new();
fn main() -> anyhow::Result<()> { fn setup_tracing(config: &Config) {
let config = Arc::new(Config::load("config.json")?);
if config.allowed_ports.is_empty() {
panic!("no allowed ports");
}
TIME_FORMAT.set(config.time_format.clone()).unwrap();
// we need to get this while still single threaded
// as getting the time zone offset in a multithreaded programm
// is UB in some environments
TIME_ZONE_OFFSET
.set(time::UtcOffset::current_local_offset()?)
.unwrap();
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?
.block_on(async move {
{
use tracing_subscriber::prelude::*; use tracing_subscriber::prelude::*;
use tracing_subscriber::*; use tracing_subscriber::{filter, fmt};
// build a `Subscriber` by combining layers with a // build a `Subscriber` by combining layers with a
// `tracing_subscriber::Registry`: // `tracing_subscriber::Registry`:
@ -180,79 +160,24 @@ fn main() -> anyhow::Result<()> {
})), })),
) )
.init(); .init();
} }
let cache_path = PathBuf::from("cache.json"); async fn connection_handler(
mut stream: TcpStream,
let (change_sender, mut change_receiver) = tokio::sync::watch::channel(Instant::now()); addr: SocketAddr,
config: Arc<Config>,
let mut port_handler = PortHandler::load_or_default(&cache_path, change_sender); port_handler: Arc<Mutex<PortHandler>>,
port_handler.update_allowed_ports(&config.allowed_ports); ) {
use futures::future::FutureExt;
let port_handler = Arc::new(Mutex::new(port_handler));
{
let port_handler = port_handler.clone();
spawn("cache daemon", async move {
let mut last_store = Instant::now() - 2 * CACHE_STORE_INTERVAL;
let mut change_timeout = None;
loop {
if let Some(change_timeout) = change_timeout.take() {
tokio::time::timeout(change_timeout, change_receiver.changed())
.await
.unwrap_or(Ok(()))
} else {
change_receiver.changed().await
}
.expect("failed to wait for cache changes");
let time_since_last_store = last_store.elapsed();
if time_since_last_store >= CACHE_STORE_INTERVAL {
let port_handler = port_handler.lock().await;
last_store = Instant::now();
if let Err(err) = port_handler.store(&cache_path) {
error!("failed to store cache: {err:?}");
}
} else {
change_timeout = Some(CACHE_STORE_INTERVAL - time_since_last_store);
}
}
});
}
#[cfg(feature = "debug_server")]
if let Some(listen_addr) = config.debug_server_addr {
warn!(%listen_addr, "debug server listening");
spawn(
"debug server",
debug_server(listen_addr, port_handler.clone()),
);
}
let listener = TcpListener::bind(config.listen_addr).await?;
warn!(
listen_addr = %config.listen_addr,
"centralex server listening"
);
while let Ok((mut stream, addr)) = listener.accept().await {
info!(%addr, "new connection");
let port_handler = port_handler.clone();
let config = config.clone();
let mut handler_metadata = HandlerMetadata::default(); let mut handler_metadata = HandlerMetadata::default();
spawn(&format!("connection to {addr}"), async move { let res = std::panic::AssertUnwindSafe(client::handler(
use futures::future::FutureExt; &mut stream,
addr,
let res = std::panic::AssertUnwindSafe(connection_handler(
&config, &config,
&mut handler_metadata, &mut handler_metadata,
&port_handler, &port_handler,
&mut stream,
)) ))
.catch_unwind() .catch_unwind()
.await; .await;
@ -261,8 +186,7 @@ fn main() -> anyhow::Result<()> {
Err(err) => { Err(err) => {
let err = err let err = err
.downcast::<String>() .downcast::<String>()
.map(|err| *err) .map_or_else(|_| "?".to_owned(), |err| *err);
.unwrap_or_else(|_| "?".to_owned());
Some(format!("panic at: {err}")) Some(format!("panic at: {err}"))
} }
@ -280,7 +204,7 @@ fn main() -> anyhow::Result<()> {
packet.data.push(0); packet.data.push(0);
packet.header = Header { packet.header = Header {
kind: PacketKind::Error.raw(), kind: PacketKind::Error.raw(),
length: packet.data.len() as u8, length: packet.data.len().try_into().unwrap(), // this will never fail, as we just truncated the vector
}; };
let (_, mut writer) = stream.split(); let (_, mut writer) = stream.split();
@ -316,7 +240,64 @@ fn main() -> anyhow::Result<()> {
sleep(Duration::from_secs(3)).await; sleep(Duration::from_secs(3)).await;
let _ = stream.shutdown().await; let _ = stream.shutdown().await;
}); }
fn main() -> anyhow::Result<()> {
let config = Arc::new(Config::load("config.json")?);
assert!(!config.allowed_ports.is_empty(), "no allowed ports");
TIME_FORMAT.set(config.time_format.clone()).unwrap();
// we need to get this while still single threaded
// as getting the time zone offset in a multithreaded programm
// is UB in some environments
TIME_ZONE_OFFSET
.set(time::UtcOffset::current_local_offset()?)
.unwrap();
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?
.block_on(async move {
setup_tracing(&config);
let cache_path = PathBuf::from("cache.json");
let (change_sender, change_receiver) = tokio::sync::watch::channel(Instant::now());
let mut port_handler = PortHandler::load_or_default(&cache_path, change_sender);
port_handler.update_allowed_ports(&config.allowed_ports);
let port_handler = Arc::new(Mutex::new(port_handler));
spawn(
"cache daemon",
cache_daemon(port_handler.clone(), cache_path, change_receiver),
);
#[cfg(feature = "debug_server")]
if let Some(listen_addr) = config.debug_server_addr {
warn!(%listen_addr, "debug server listening");
spawn(
"debug server",
debug_server(listen_addr, port_handler.clone()),
);
}
let listener = TcpListener::bind(config.listen_addr).await?;
warn!(
listen_addr = %config.listen_addr,
"centralex server listening"
);
while let Ok((stream, addr)) = listener.accept().await {
info!(%addr, "new connection");
spawn(
&format!("connection to {addr}"),
connection_handler(stream, addr, config.clone(), port_handler.clone()),
);
} }
Ok(()) Ok(())

View File

@ -26,7 +26,9 @@ pub enum PacketKind {
Error = 0xff, Error = 0xff,
} }
#[allow(clippy::enum_glob_use)]
impl PacketKind { impl PacketKind {
#[must_use]
fn from_u8(raw: u8) -> Self { fn from_u8(raw: u8) -> Self {
use PacketKind::*; use PacketKind::*;
@ -45,6 +47,7 @@ impl PacketKind {
} }
} }
#[must_use]
pub fn raw(&self) -> u8 { pub fn raw(&self) -> u8 {
use PacketKind::*; use PacketKind::*;
@ -154,6 +157,7 @@ impl Packet {
Ok(()) Ok(())
} }
#[must_use]
pub fn kind(&self) -> PacketKind { pub fn kind(&self) -> PacketKind {
PacketKind::from_u8(self.header.kind) PacketKind::from_u8(self.header.kind)
} }

View File

@ -5,18 +5,23 @@ use std::{
fs::File, fs::File,
io::{BufReader, BufWriter}, io::{BufReader, BufWriter},
ops::RangeInclusive, ops::RangeInclusive,
path::Path, path::{Path, PathBuf},
sync::Arc, sync::Arc,
time::{Duration, SystemTime, UNIX_EPOCH}, time::{Duration, SystemTime, UNIX_EPOCH},
}; };
use anyhow::anyhow; use anyhow::anyhow;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::{net::TcpListener, sync::Mutex, task::JoinHandle, time::Instant}; use tokio::{
net::TcpListener,
sync::{watch::Receiver, Mutex},
task::JoinHandle,
time::Instant,
};
use tracing::{debug, error, info, warn}; use tracing::{debug, error, info, warn};
use crate::{ use crate::{
constants::{PORT_OWNERSHIP_TIMEOUT, PORT_RETRY_TIME}, constants::{CACHE_STORE_INTERVAL, PORT_OWNERSHIP_TIMEOUT, PORT_RETRY_TIME},
packets::Packet, packets::Packet,
spawn, Config, Number, Port, UnixTimestamp, TIME_FORMAT, TIME_ZONE_OFFSET, spawn, Config, Number, Port, UnixTimestamp, TIME_FORMAT, TIME_ZONE_OFFSET,
}; };
@ -32,7 +37,7 @@ pub struct PortHandler {
#[serde(skip)] #[serde(skip)]
port_guards: HashMap<Port, Rejector>, port_guards: HashMap<Port, Rejector>,
allowed_ports: AllowedPorts, allowed_ports: AllowedList,
#[serde(skip)] #[serde(skip)]
free_ports: HashSet<Port>, free_ports: HashSet<Port>,
@ -42,6 +47,38 @@ pub struct PortHandler {
pub port_state: HashMap<Port, PortState>, pub port_state: HashMap<Port, PortState>,
} }
pub async fn cache_daemon(
port_handler: Arc<Mutex<PortHandler>>,
cache_path: PathBuf,
mut change_receiver: Receiver<Instant>,
) {
let mut last_store = Instant::now() - 2 * CACHE_STORE_INTERVAL;
let mut change_timeout = None;
loop {
if let Some(change_timeout) = change_timeout.take() {
tokio::time::timeout(change_timeout, change_receiver.changed())
.await
.unwrap_or(Ok(()))
} else {
change_receiver.changed().await
}
.expect("failed to wait for cache changes");
let time_since_last_store = last_store.elapsed();
if time_since_last_store >= CACHE_STORE_INTERVAL {
let port_handler = port_handler.lock().await;
last_store = Instant::now();
if let Err(err) = port_handler.store(&cache_path) {
error!("failed to store cache: {err:?}");
}
} else {
change_timeout = Some(CACHE_STORE_INTERVAL - time_since_last_store);
}
}
}
#[derive(Hash, PartialEq, Eq)] #[derive(Hash, PartialEq, Eq)]
struct DisplayAsDebug<T: Display>(T); struct DisplayAsDebug<T: Display>(T);
impl<T: Display> Debug for DisplayAsDebug<T> { impl<T: Display> Debug for DisplayAsDebug<T> {
@ -60,7 +97,7 @@ fn duration_in_hours(duration: Duration) -> String {
match (hours > 0, minutes > 0) { match (hours > 0, minutes > 0) {
(true, _) => format!("{hours}h {minutes}min {seconds}s"), (true, _) => format!("{hours}h {minutes}min {seconds}s"),
(false, true) => format!("{minutes}min {seconds}s"), (false, true) => format!("{minutes}min {seconds}s"),
_ => format!("{:.0?}", duration), _ => format!("{duration:.0?}"),
} }
} }
@ -69,7 +106,9 @@ fn format_instant(instant: Instant) -> String {
(|| -> anyhow::Result<_> { (|| -> anyhow::Result<_> {
let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)? - instant.elapsed(); let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)? - instant.elapsed();
let date = time::OffsetDateTime::from_unix_timestamp(timestamp.as_secs() as i64)? let date = time::OffsetDateTime::from_unix_timestamp(
timestamp.as_secs().try_into().expect("timestamp overflow"),
)?
.to_offset(*TIME_ZONE_OFFSET.get().unwrap()) .to_offset(*TIME_ZONE_OFFSET.get().unwrap())
.format(TIME_FORMAT.get().unwrap())?; .format(TIME_FORMAT.get().unwrap())?;
@ -93,7 +132,7 @@ impl Debug for PortHandler {
let mut free_ports = self.free_ports.iter().copied().collect::<Vec<u16>>(); let mut free_ports = self.free_ports.iter().copied().collect::<Vec<u16>>();
free_ports.sort(); free_ports.sort_unstable();
let mut free_ports = free_ports let mut free_ports = free_ports
.into_iter() .into_iter()
@ -123,8 +162,6 @@ impl Debug for PortHandler {
.allocated_ports .allocated_ports
.iter() .iter()
.map(|(&number, &port)| { .map(|(&number, &port)| {
let state = &self.port_state[&port];
#[derive(Debug)] #[derive(Debug)]
#[allow(dead_code)] #[allow(dead_code)]
struct State { struct State {
@ -134,6 +171,8 @@ impl Debug for PortHandler {
last_change: DisplayAsDebug<String>, last_change: DisplayAsDebug<String>,
} }
let state = &self.port_state[&port];
State { State {
state: state.status, state: state.status,
number, number,
@ -145,7 +184,7 @@ impl Debug for PortHandler {
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
allocated_ports.sort_by(|a, b| { allocated_ports.sort_unstable_by(|a, b| {
a.state.cmp(&b.state).then( a.state.cmp(&b.state).then(
self.port_state[&a.port] self.port_state[&a.port]
.last_change .last_change
@ -157,10 +196,10 @@ impl Debug for PortHandler {
writeln!(f, "last update: {last_update}")?; writeln!(f, "last update: {last_update}")?;
writeln!(f, "rejectors: {:#?}", self.port_guards)?; writeln!(f, "rejectors: {:#?}", self.port_guards)?;
writeln!(f, "allowed ports: {:?}", self.allowed_ports.0)?; writeln!(f, "allowed ports: {:?}", self.allowed_ports.0)?;
writeln!(f, "free ports: {:?}", free_ports)?; writeln!(f, "free ports: {free_ports:?}")?;
writeln!(f, "errored ports: {:#?}", errored_ports)?; writeln!(f, "errored ports: {errored_ports:#?}")?;
writeln!(f, "allocated ports: {:#?}", allocated_ports)?; writeln!(f, "allocated ports: {allocated_ports:#?}")?;
Ok(()) Ok(())
} }
@ -210,18 +249,21 @@ impl Default for PortStatus {
} }
#[derive(Default, Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] #[derive(Default, Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct AllowedPorts(Vec<RangeInclusive<u16>>); pub struct AllowedList(Vec<RangeInclusive<u16>>);
impl AllowedPorts { impl AllowedList {
#[must_use]
pub fn is_allowed(&self, port: Port) -> bool { pub fn is_allowed(&self, port: Port) -> bool {
self.0.iter().any(|range| range.contains(&port)) self.0.iter().any(|range| range.contains(&port))
} }
#[must_use]
pub fn is_empty(&self) -> bool { pub fn is_empty(&self) -> bool {
self.0.is_empty() self.0.is_empty()
} }
} }
impl PortHandler { impl PortHandler {
#[must_use]
pub fn status_string(&self) -> String { pub fn status_string(&self) -> String {
format!("{self:#?}\n") format!("{self:#?}\n")
} }
@ -256,6 +298,7 @@ impl PortHandler {
Ok(cache) Ok(cache)
} }
#[must_use]
pub fn load_or_default( pub fn load_or_default(
path: &Path, path: &Path,
change_sender: tokio::sync::watch::Sender<Instant>, change_sender: tokio::sync::watch::Sender<Instant>,
@ -266,7 +309,7 @@ impl PortHandler {
}) })
} }
pub fn update_allowed_ports(&mut self, allowed_ports: &AllowedPorts) { pub fn update_allowed_ports(&mut self, allowed_ports: &AllowedList) {
self.register_update(); self.register_update();
self.allowed_ports = allowed_ports.clone(); self.allowed_ports = allowed_ports.clone();
@ -392,8 +435,7 @@ impl PortHandler {
let already_connected = self let already_connected = self
.port_state .port_state
.get(port) .get(port)
.map(|state| state.status != PortStatus::Disconnected) .map_or(false, |state| state.status != PortStatus::Disconnected);
.unwrap_or(false);
if already_connected { if already_connected {
None None
@ -463,14 +505,11 @@ impl PortHandler {
} }
let removable_entry = self.allocated_ports.iter().find(|(_, port)| { let removable_entry = self.allocated_ports.iter().find(|(_, port)| {
self.port_state self.port_state.get(port).map_or(true, |port_state| {
.get(port)
.map(|port_state| {
port_state.status == PortStatus::Disconnected port_state.status == PortStatus::Disconnected
&& now.saturating_sub(Duration::from_secs(port_state.last_change)) && now.saturating_sub(Duration::from_secs(port_state.last_change))
>= PORT_OWNERSHIP_TIMEOUT >= PORT_OWNERSHIP_TIMEOUT
}) })
.unwrap_or(true)
}); });
if let Some((&old_number, &port)) = removable_entry { if let Some((&old_number, &port)) = removable_entry {