From 96033a07961b3bfc79eb54b70c72433300b271b4 Mon Sep 17 00:00:00 2001 From: soruh Date: Sun, 19 Mar 2023 17:17:31 +0100 Subject: [PATCH] turn on most of clippy::pedantic --- src/auth.rs | 6 +- src/client.rs | 333 ++++++++++++++++++++++++++++--------------------- src/main.rs | 271 +++++++++++++++++++--------------------- src/packets.rs | 4 + src/ports.rs | 95 +++++++++----- 5 files changed, 391 insertions(+), 318 deletions(-) diff --git a/src/auth.rs b/src/auth.rs index 27f8d6e..5a2e017 100644 --- a/src/auth.rs +++ b/src/auth.rs @@ -53,11 +53,7 @@ pub async fn dyn_ip_update( bail!( "{}", - std::str::from_utf8( - first_zero - .map(|i| &packet.data[..i]) - .unwrap_or(&packet.data), - )? + std::str::from_utf8(first_zero.map_or(&packet.data, |i| &packet.data[..i]),)? ) } diff --git a/src/client.rs b/src/client.rs index f9416af..224b47a 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,8 +1,11 @@ -use anyhow::{bail, Context}; +use anyhow::{anyhow, bail, Context}; use std::{net::SocketAddr, time::Instant}; use tokio::{ io::AsyncWriteExt, - net::{TcpListener, TcpStream}, + net::{ + tcp::{ReadHalf, WriteHalf}, + TcpListener, TcpStream, + }, select, sync::Mutex, time::{sleep, timeout}, @@ -17,32 +20,15 @@ use crate::{ Config, HandlerMetadata, }; -pub async fn connection_handler( +async fn authenticate( config: &Config, - handler_metadata: &mut HandlerMetadata, port_handler: &Mutex, - stream: &mut TcpStream, -) -> anyhow::Result<()> { - let addr = stream.peer_addr()?; - - let (mut reader, mut writer) = stream.split(); - - let mut packet = Packet::default(); - - match timeout(AUTH_TIMEOUT, packet.recv_into_cancelation_safe(&mut reader)).await { - Ok(res) => res?, - Err(_) => { - writer.write_all(REJECT_TIMEOUT).await?; - return Ok(()); - } - } - - let RemConnect { number, pin } = packet.as_rem_connect()?; - - handler_metadata.number = Some(number); - + handler_metadata: &mut HandlerMetadata, + number: u32, + pin: u16, +) -> anyhow::Result> { let mut authenticated = false; - let port = loop { + loop { let mut updated_server = false; let port = port_handler @@ -51,8 +37,7 @@ pub async fn connection_handler( .allocate_port_for_number(config, number); let Some(port) = port else { - writer.write_all(REJECT_OOP).await?; - return Ok(()); + return Ok(None); }; // make sure the client is authenticated before opening any ports @@ -96,42 +81,38 @@ pub async fn connection_handler( handler_metadata.port = Some(port); - break port; + break Ok(Some(port)); } Err(_err) => { port_handler.mark_port_error(number, port); continue; } }; - }; - - info!(%addr, number, port, "authenticated"); - - let listener = handler_metadata.listener.as_mut().unwrap(); // we only break from the loop if this is set - - packet.header = Header { - kind: PacketKind::RemConfirm.raw(), - length: 0, - }; - packet.data.clear(); - packet.send(&mut writer).await?; - - #[derive(Debug)] - enum Result { - Caller { - packet: Packet, - stream: TcpStream, - addr: SocketAddr, - }, - Packet { - packet: Packet, - }, } +} +#[derive(Debug)] +enum IdleResult { + Caller { + packet: Packet, + stream: TcpStream, + addr: SocketAddr, + }, + Disconnect { + packet: Packet, + }, +} + +async fn idle( + listener: &mut TcpListener, + mut packet: Packet, + reader: &mut ReadHalf<'_>, + writer: &mut WriteHalf<'_>, +) -> anyhow::Result> { let mut last_ping_sent_at = Instant::now(); let mut last_ping_received_at = Instant::now(); - let result = loop { + loop { trace!( seconds = SEND_PING_INTERVAL .saturating_sub(last_ping_sent_at.elapsed()) @@ -151,16 +132,16 @@ pub async fn connection_handler( select! { caller = listener.accept() => { let (stream, addr) = caller?; - break Result::Caller { packet, stream, addr } + break Ok(Some(IdleResult::Caller { packet, stream, addr })) }, - _ = Packet::peek_packet_kind(&mut reader) => { - packet.recv_into(&mut reader).await?; + _ = Packet::peek_packet_kind( reader) => { + packet.recv_into( reader).await?; if packet.kind() == PacketKind::Ping { trace!("received ping"); last_ping_received_at = Instant::now(); } else { - break Result::Packet { packet } + break Ok(Some(IdleResult::Disconnect { packet })) } }, _ = sleep(send_next_ping_in) => { @@ -170,13 +151,21 @@ pub async fn connection_handler( } _ = sleep(next_ping_expected_in) => { writer.write_all(REJECT_TIMEOUT).await?; - return Ok(()); + break Ok(None); } } - }; + } +} - let (mut client, mut packet) = match result { - Result::Packet { mut packet } => { +async fn notify_or_disconnect( + result: IdleResult, + handler_metadata: &mut HandlerMetadata, + port_handler: &Mutex, + port: u16, + writer: &mut WriteHalf<'_>, +) -> anyhow::Result> { + match result { + IdleResult::Disconnect { mut packet } => { if matches!(packet.kind(), PacketKind::End | PacketKind::Reject) { info!(?packet, "got disconnect packet"); @@ -184,7 +173,7 @@ pub async fn connection_handler( if packet.data.is_empty() { packet.data.extend_from_slice(b"nc\0"); - packet.header.length = packet.data.len() as u8; + packet.header.length = packet.data.len().try_into().unwrap(); } port_handler.lock().await.start_rejector( @@ -195,12 +184,12 @@ pub async fn connection_handler( .expect("tried to start rejector twice"), packet, )?; - return Ok(()); + Ok(None) } else { - bail!("unexpected packet: {:?}", packet.kind()) + Err(anyhow!("unexpected packet: {:?}", packet.kind())) } } - Result::Caller { + IdleResult::Caller { mut packet, stream, addr, @@ -216,27 +205,143 @@ pub async fn connection_handler( */ packet.header = Header { kind: PacketKind::RemCall.raw(), - length: packet.data.len() as u8, + length: packet.data.len().try_into().unwrap(), // ip addresses are less then 255 bytes long }; - packet.send(&mut writer).await?; + packet.send(writer).await?; - (stream, packet) - } - }; - - match timeout( - CALL_ACK_TIMEOUT, - packet.recv_into_cancelation_safe(&mut reader), - ) - .await - { - Ok(res) => res?, - Err(_) => { - writer.write_all(REJECT_TIMEOUT).await?; - return Ok(()); + Ok(Some((stream, packet))) } } +} + +async fn connect( + mut packet: Packet, + port_handler: &Mutex, + port: u16, + handler_metadata: &mut HandlerMetadata, + stream: &mut TcpStream, + client: &mut TcpStream, +) -> anyhow::Result<()> { + packet.header = Header { + kind: PacketKind::Reject.raw(), + length: 4, + }; + packet.data.clear(); + packet.data.extend_from_slice(b"occ"); + packet.data.push(0); + + { + let mut port_handler = port_handler.lock().await; + + port_handler.register_update(); + port_handler + .port_state + .entry(port) + .or_default() + .new_state(PortStatus::InCall); + + port_handler.start_rejector( + port, + handler_metadata + .listener + .take() + .expect("tried to start rejector twice"), + packet, + )?; + } + + stream.set_nodelay(true)?; + client.set_nodelay(true)?; + + let _ = timeout(CALL_TIMEOUT, tokio::io::copy_bidirectional(stream, client)).await; + + { + let mut port_handler = port_handler.lock().await; + + port_handler.register_update(); + port_handler + .port_state + .entry(port) + .or_default() + .new_state(PortStatus::Disconnected); + + port_handler + .change_rejector(port, |packet| { + packet.data.clear(); + packet.data.extend_from_slice(b"nc"); + packet.data.push(0); + packet.header = Header { + kind: PacketKind::Reject.raw(), + length: packet.data.len().try_into().unwrap(), + }; + }) + .await?; + } + + Ok(()) +} + +pub async fn handler( + stream: &mut TcpStream, + addr: SocketAddr, + config: &Config, + handler_metadata: &mut HandlerMetadata, + port_handler: &Mutex, +) -> anyhow::Result<()> { + let (mut reader, mut writer) = stream.split(); + + let mut packet = Packet::default(); + + let Ok(res) = timeout(AUTH_TIMEOUT, packet.recv_into_cancelation_safe(&mut reader)).await else { + writer.write_all(REJECT_TIMEOUT).await?; + return Ok(()); + }; + res?; + + let RemConnect { number, pin } = packet.as_rem_connect()?; + + handler_metadata.number = Some(number); + + let Some(port) = authenticate(config, port_handler, handler_metadata, number, pin).await? else { + writer.write_all(REJECT_OOP).await?; + return Ok(()); + }; + + info!(%addr, number, port, "authenticated"); + + let listener = handler_metadata.listener.as_mut().unwrap(); // we are only authenticated if this is set + + packet.header = Header { + kind: PacketKind::RemConfirm.raw(), + length: 0, + }; + packet.data.clear(); + packet.send(&mut writer).await?; + + let Some(idle_result) = idle( + listener, + packet, + &mut reader, + &mut writer, + ).await? else { + return Ok(()); + }; + + let Some((mut client, mut packet)) = notify_or_disconnect(idle_result, handler_metadata, port_handler, port, &mut writer).await? else { + return Ok(()); + }; + + let recv = timeout( + CALL_ACK_TIMEOUT, + packet.recv_into_cancelation_safe(&mut reader), + ); + + let Ok(res) = recv.await else { + writer.write_all(REJECT_TIMEOUT).await?; + return Ok(()); + }; + res?; match packet.kind() { PacketKind::End | PacketKind::Reject => { @@ -253,67 +358,15 @@ pub async fn connection_handler( } PacketKind::RemAck => { - packet.header = Header { - kind: PacketKind::Reject.raw(), - length: 4, - }; - packet.data.clear(); - packet.data.extend_from_slice(b"occ"); - packet.data.push(0); - - { - let mut port_handler = port_handler.lock().await; - - port_handler.register_update(); - port_handler - .port_state - .entry(port) - .or_default() - .new_state(PortStatus::InCall); - - port_handler.start_rejector( - port, - handler_metadata - .listener - .take() - .expect("tried to start rejector twice"), - packet, - )?; - } - - stream.set_nodelay(true)?; - client.set_nodelay(true)?; - - let _ = timeout( - CALL_TIMEOUT, - tokio::io::copy_bidirectional(stream, &mut client), + connect( + packet, + port_handler, + port, + handler_metadata, + stream, + &mut client, ) - .await; - - { - let mut port_handler = port_handler.lock().await; - - port_handler.register_update(); - port_handler - .port_state - .entry(port) - .or_default() - .new_state(PortStatus::Disconnected); - - port_handler - .change_rejector(port, |packet| { - packet.data.clear(); - packet.data.extend_from_slice(b"nc"); - packet.data.push(0); - packet.header = Header { - kind: PacketKind::Reject.raw(), - length: packet.data.len() as u8, - }; - }) - .await?; - } - - Ok(()) + .await } kind => bail!("unexpected packet: {:?}", kind), diff --git a/src/main.rs b/src/main.rs index 74e6e81..136915a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,6 @@ +#![warn(clippy::pedantic)] +#![allow(clippy::missing_errors_doc, clippy::missing_panics_doc)] + use std::{ fmt::Debug, fs::File, @@ -15,17 +18,14 @@ use serde::{Deserialize, Deserializer}; use time::format_description::OwnedFormatItem; use tokio::{ io::AsyncWriteExt, - net::TcpListener, + net::{TcpListener, TcpStream}, sync::Mutex, time::{sleep, Instant}, }; use tracing::{error, info, warn, Level}; -use crate::{ - client::connection_handler, - ports::{AllowedPorts, PortHandler, PortStatus}, -}; -use crate::{constants::CACHE_STORE_INTERVAL, packets::PacketKind}; +use crate::packets::PacketKind; +use crate::ports::{cache_daemon, AllowedList, PortHandler, PortStatus}; pub mod auth; pub mod client; @@ -41,7 +41,7 @@ type UnixTimestamp = u64; #[derive(Debug, Deserialize)] pub struct Config { - allowed_ports: AllowedPorts, + allowed_ports: AllowedList, #[serde(deserialize_with = "parse_socket_addr")] listen_addr: SocketAddr, #[serde(deserialize_with = "parse_socket_addr")] @@ -135,12 +135,117 @@ static TIME_ZONE_OFFSET: once_cell::sync::OnceCell = static TIME_FORMAT: once_cell::sync::OnceCell = once_cell::sync::OnceCell::new(); +fn setup_tracing(config: &Config) { + use tracing_subscriber::prelude::*; + use tracing_subscriber::{filter, fmt}; + + // build a `Subscriber` by combining layers with a + // `tracing_subscriber::Registry`: + let registry = tracing_subscriber::registry(); + + #[cfg(feature = "tokio_console")] + let registry = registry.with(console_subscriber::spawn()); + + registry + .with( + fmt::layer() + .with_target(true) + .with_timer(fmt::time::OffsetTime::new( + *TIME_ZONE_OFFSET.get().unwrap(), + TIME_FORMAT.get().unwrap(), + )) + .with_filter(filter::LevelFilter::from_level(config.log_level)) + .with_filter(tracing_subscriber::filter::filter_fn(|meta| { + meta.target().starts_with("centralex") + })), + ) + .init(); +} + +async fn connection_handler( + mut stream: TcpStream, + addr: SocketAddr, + config: Arc, + port_handler: Arc>, +) { + use futures::future::FutureExt; + + let mut handler_metadata = HandlerMetadata::default(); + + let res = std::panic::AssertUnwindSafe(client::handler( + &mut stream, + addr, + &config, + &mut handler_metadata, + &port_handler, + )) + .catch_unwind() + .await; + + let error = match res { + Err(err) => { + let err = err + .downcast::() + .map_or_else(|_| "?".to_owned(), |err| *err); + + Some(format!("panic at: {err}")) + } + Ok(Err(err)) => Some(err.to_string()), + Ok(Ok(())) => None, + }; + + if let Some(error) = error { + error!(%addr, %error, "Client had an error"); + + let mut packet = Packet::default(); + + packet.data.extend_from_slice(error.as_bytes()); + packet.data.truncate((u8::MAX - 1) as usize); + packet.data.push(0); + packet.header = Header { + kind: PacketKind::Error.raw(), + length: packet.data.len().try_into().unwrap(), // this will never fail, as we just truncated the vector + }; + + let (_, mut writer) = stream.split(); + let _ = packet.send(&mut writer).await; + } + + if let Some(port) = handler_metadata.port { + let mut port_handler = port_handler.lock().await; + + if let Some(port_state) = port_handler.port_state.get_mut(&port) { + port_state.new_state(PortStatus::Disconnected); + port_handler.register_update(); + } + + if let Some(listener) = handler_metadata.listener.take() { + let res = port_handler.start_rejector( + port, + listener, + Packet { + header: Header { + kind: PacketKind::Reject.raw(), + length: 3, + }, + data: b"nc\0".to_vec(), + }, + ); + + if let Err(error) = res { + error!(%port, %error, "failed to start rejector"); + } + } + } + + sleep(Duration::from_secs(3)).await; + let _ = stream.shutdown().await; +} + fn main() -> anyhow::Result<()> { let config = Arc::new(Config::load("config.json")?); - if config.allowed_ports.is_empty() { - panic!("no allowed ports"); - } + assert!(!config.allowed_ports.is_empty(), "no allowed ports"); TIME_FORMAT.set(config.time_format.clone()).unwrap(); @@ -155,72 +260,21 @@ fn main() -> anyhow::Result<()> { .enable_all() .build()? .block_on(async move { - { - use tracing_subscriber::prelude::*; - use tracing_subscriber::*; - - // build a `Subscriber` by combining layers with a - // `tracing_subscriber::Registry`: - let registry = tracing_subscriber::registry(); - - #[cfg(feature = "tokio_console")] - let registry = registry.with(console_subscriber::spawn()); - - registry - .with( - fmt::layer() - .with_target(true) - .with_timer(fmt::time::OffsetTime::new( - *TIME_ZONE_OFFSET.get().unwrap(), - TIME_FORMAT.get().unwrap(), - )) - .with_filter(filter::LevelFilter::from_level(config.log_level)) - .with_filter(tracing_subscriber::filter::filter_fn(|meta| { - meta.target().starts_with("centralex") - })), - ) - .init(); - } + setup_tracing(&config); let cache_path = PathBuf::from("cache.json"); - let (change_sender, mut change_receiver) = tokio::sync::watch::channel(Instant::now()); + let (change_sender, change_receiver) = tokio::sync::watch::channel(Instant::now()); let mut port_handler = PortHandler::load_or_default(&cache_path, change_sender); port_handler.update_allowed_ports(&config.allowed_ports); let port_handler = Arc::new(Mutex::new(port_handler)); - { - let port_handler = port_handler.clone(); - spawn("cache daemon", async move { - let mut last_store = Instant::now() - 2 * CACHE_STORE_INTERVAL; - let mut change_timeout = None; - loop { - if let Some(change_timeout) = change_timeout.take() { - tokio::time::timeout(change_timeout, change_receiver.changed()) - .await - .unwrap_or(Ok(())) - } else { - change_receiver.changed().await - } - .expect("failed to wait for cache changes"); - - let time_since_last_store = last_store.elapsed(); - - if time_since_last_store >= CACHE_STORE_INTERVAL { - let port_handler = port_handler.lock().await; - - last_store = Instant::now(); - if let Err(err) = port_handler.store(&cache_path) { - error!("failed to store cache: {err:?}"); - } - } else { - change_timeout = Some(CACHE_STORE_INTERVAL - time_since_last_store); - } - } - }); - } + spawn( + "cache daemon", + cache_daemon(port_handler.clone(), cache_path, change_receiver), + ); #[cfg(feature = "debug_server")] if let Some(listen_addr) = config.debug_server_addr { @@ -237,86 +291,13 @@ fn main() -> anyhow::Result<()> { "centralex server listening" ); - while let Ok((mut stream, addr)) = listener.accept().await { + while let Ok((stream, addr)) = listener.accept().await { info!(%addr, "new connection"); - let port_handler = port_handler.clone(); - let config = config.clone(); - - let mut handler_metadata = HandlerMetadata::default(); - - spawn(&format!("connection to {addr}"), async move { - use futures::future::FutureExt; - - let res = std::panic::AssertUnwindSafe(connection_handler( - &config, - &mut handler_metadata, - &port_handler, - &mut stream, - )) - .catch_unwind() - .await; - - let error = match res { - Err(err) => { - let err = err - .downcast::() - .map(|err| *err) - .unwrap_or_else(|_| "?".to_owned()); - - Some(format!("panic at: {err}")) - } - Ok(Err(err)) => Some(err.to_string()), - Ok(Ok(())) => None, - }; - - if let Some(error) = error { - error!(%addr, %error, "Client had an error"); - - let mut packet = Packet::default(); - - packet.data.extend_from_slice(error.as_bytes()); - packet.data.truncate((u8::MAX - 1) as usize); - packet.data.push(0); - packet.header = Header { - kind: PacketKind::Error.raw(), - length: packet.data.len() as u8, - }; - - let (_, mut writer) = stream.split(); - let _ = packet.send(&mut writer).await; - } - - if let Some(port) = handler_metadata.port { - let mut port_handler = port_handler.lock().await; - - if let Some(port_state) = port_handler.port_state.get_mut(&port) { - port_state.new_state(PortStatus::Disconnected); - port_handler.register_update(); - } - - if let Some(listener) = handler_metadata.listener.take() { - let res = port_handler.start_rejector( - port, - listener, - Packet { - header: Header { - kind: PacketKind::Reject.raw(), - length: 3, - }, - data: b"nc\0".to_vec(), - }, - ); - - if let Err(error) = res { - error!(%port, %error, "failed to start rejector"); - } - } - } - - sleep(Duration::from_secs(3)).await; - let _ = stream.shutdown().await; - }); + spawn( + &format!("connection to {addr}"), + connection_handler(stream, addr, config.clone(), port_handler.clone()), + ); } Ok(()) diff --git a/src/packets.rs b/src/packets.rs index 2750440..accf76f 100644 --- a/src/packets.rs +++ b/src/packets.rs @@ -26,7 +26,9 @@ pub enum PacketKind { Error = 0xff, } +#[allow(clippy::enum_glob_use)] impl PacketKind { + #[must_use] fn from_u8(raw: u8) -> Self { use PacketKind::*; @@ -45,6 +47,7 @@ impl PacketKind { } } + #[must_use] pub fn raw(&self) -> u8 { use PacketKind::*; @@ -154,6 +157,7 @@ impl Packet { Ok(()) } + #[must_use] pub fn kind(&self) -> PacketKind { PacketKind::from_u8(self.header.kind) } diff --git a/src/ports.rs b/src/ports.rs index bcff304..36b0816 100644 --- a/src/ports.rs +++ b/src/ports.rs @@ -5,18 +5,23 @@ use std::{ fs::File, io::{BufReader, BufWriter}, ops::RangeInclusive, - path::Path, + path::{Path, PathBuf}, sync::Arc, time::{Duration, SystemTime, UNIX_EPOCH}, }; use anyhow::anyhow; use serde::{Deserialize, Serialize}; -use tokio::{net::TcpListener, sync::Mutex, task::JoinHandle, time::Instant}; +use tokio::{ + net::TcpListener, + sync::{watch::Receiver, Mutex}, + task::JoinHandle, + time::Instant, +}; use tracing::{debug, error, info, warn}; use crate::{ - constants::{PORT_OWNERSHIP_TIMEOUT, PORT_RETRY_TIME}, + constants::{CACHE_STORE_INTERVAL, PORT_OWNERSHIP_TIMEOUT, PORT_RETRY_TIME}, packets::Packet, spawn, Config, Number, Port, UnixTimestamp, TIME_FORMAT, TIME_ZONE_OFFSET, }; @@ -32,7 +37,7 @@ pub struct PortHandler { #[serde(skip)] port_guards: HashMap, - allowed_ports: AllowedPorts, + allowed_ports: AllowedList, #[serde(skip)] free_ports: HashSet, @@ -42,6 +47,38 @@ pub struct PortHandler { pub port_state: HashMap, } +pub async fn cache_daemon( + port_handler: Arc>, + cache_path: PathBuf, + mut change_receiver: Receiver, +) { + let mut last_store = Instant::now() - 2 * CACHE_STORE_INTERVAL; + let mut change_timeout = None; + loop { + if let Some(change_timeout) = change_timeout.take() { + tokio::time::timeout(change_timeout, change_receiver.changed()) + .await + .unwrap_or(Ok(())) + } else { + change_receiver.changed().await + } + .expect("failed to wait for cache changes"); + + let time_since_last_store = last_store.elapsed(); + + if time_since_last_store >= CACHE_STORE_INTERVAL { + let port_handler = port_handler.lock().await; + + last_store = Instant::now(); + if let Err(err) = port_handler.store(&cache_path) { + error!("failed to store cache: {err:?}"); + } + } else { + change_timeout = Some(CACHE_STORE_INTERVAL - time_since_last_store); + } + } +} + #[derive(Hash, PartialEq, Eq)] struct DisplayAsDebug(T); impl Debug for DisplayAsDebug { @@ -60,7 +97,7 @@ fn duration_in_hours(duration: Duration) -> String { match (hours > 0, minutes > 0) { (true, _) => format!("{hours}h {minutes}min {seconds}s"), (false, true) => format!("{minutes}min {seconds}s"), - _ => format!("{:.0?}", duration), + _ => format!("{duration:.0?}"), } } @@ -69,9 +106,11 @@ fn format_instant(instant: Instant) -> String { (|| -> anyhow::Result<_> { let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)? - instant.elapsed(); - let date = time::OffsetDateTime::from_unix_timestamp(timestamp.as_secs() as i64)? - .to_offset(*TIME_ZONE_OFFSET.get().unwrap()) - .format(TIME_FORMAT.get().unwrap())?; + let date = time::OffsetDateTime::from_unix_timestamp( + timestamp.as_secs().try_into().expect("timestamp overflow"), + )? + .to_offset(*TIME_ZONE_OFFSET.get().unwrap()) + .format(TIME_FORMAT.get().unwrap())?; Ok(format!("{date} ({when})")) })() @@ -93,7 +132,7 @@ impl Debug for PortHandler { let mut free_ports = self.free_ports.iter().copied().collect::>(); - free_ports.sort(); + free_ports.sort_unstable(); let mut free_ports = free_ports .into_iter() @@ -123,8 +162,6 @@ impl Debug for PortHandler { .allocated_ports .iter() .map(|(&number, &port)| { - let state = &self.port_state[&port]; - #[derive(Debug)] #[allow(dead_code)] struct State { @@ -134,6 +171,8 @@ impl Debug for PortHandler { last_change: DisplayAsDebug, } + let state = &self.port_state[&port]; + State { state: state.status, number, @@ -145,7 +184,7 @@ impl Debug for PortHandler { }) .collect::>(); - allocated_ports.sort_by(|a, b| { + allocated_ports.sort_unstable_by(|a, b| { a.state.cmp(&b.state).then( self.port_state[&a.port] .last_change @@ -157,10 +196,10 @@ impl Debug for PortHandler { writeln!(f, "last update: {last_update}")?; writeln!(f, "rejectors: {:#?}", self.port_guards)?; writeln!(f, "allowed ports: {:?}", self.allowed_ports.0)?; - writeln!(f, "free ports: {:?}", free_ports)?; + writeln!(f, "free ports: {free_ports:?}")?; - writeln!(f, "errored ports: {:#?}", errored_ports)?; - writeln!(f, "allocated ports: {:#?}", allocated_ports)?; + writeln!(f, "errored ports: {errored_ports:#?}")?; + writeln!(f, "allocated ports: {allocated_ports:#?}")?; Ok(()) } @@ -210,18 +249,21 @@ impl Default for PortStatus { } #[derive(Default, Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] -pub struct AllowedPorts(Vec>); +pub struct AllowedList(Vec>); -impl AllowedPorts { +impl AllowedList { + #[must_use] pub fn is_allowed(&self, port: Port) -> bool { self.0.iter().any(|range| range.contains(&port)) } + #[must_use] pub fn is_empty(&self) -> bool { self.0.is_empty() } } impl PortHandler { + #[must_use] pub fn status_string(&self) -> String { format!("{self:#?}\n") } @@ -256,6 +298,7 @@ impl PortHandler { Ok(cache) } + #[must_use] pub fn load_or_default( path: &Path, change_sender: tokio::sync::watch::Sender, @@ -266,7 +309,7 @@ impl PortHandler { }) } - pub fn update_allowed_ports(&mut self, allowed_ports: &AllowedPorts) { + pub fn update_allowed_ports(&mut self, allowed_ports: &AllowedList) { self.register_update(); self.allowed_ports = allowed_ports.clone(); @@ -392,8 +435,7 @@ impl PortHandler { let already_connected = self .port_state .get(port) - .map(|state| state.status != PortStatus::Disconnected) - .unwrap_or(false); + .map_or(false, |state| state.status != PortStatus::Disconnected); if already_connected { None @@ -463,14 +505,11 @@ impl PortHandler { } let removable_entry = self.allocated_ports.iter().find(|(_, port)| { - self.port_state - .get(port) - .map(|port_state| { - port_state.status == PortStatus::Disconnected - && now.saturating_sub(Duration::from_secs(port_state.last_change)) - >= PORT_OWNERSHIP_TIMEOUT - }) - .unwrap_or(true) + self.port_state.get(port).map_or(true, |port_state| { + port_state.status == PortStatus::Disconnected + && now.saturating_sub(Duration::from_secs(port_state.last_change)) + >= PORT_OWNERSHIP_TIMEOUT + }) }); if let Some((&old_number, &port)) = removable_entry {