From 1ae573dd761917002d06659e32ee06158bfb4c59 Mon Sep 17 00:00:00 2001 From: soruh Date: Sun, 11 Jun 2023 00:12:02 +0200 Subject: [PATCH] start on new debug server --- Cargo.lock | 6 +- Cargo.toml | 6 +- src/constants.rs | 2 + src/debug_server.rs | 96 +++++++++++++++++++++++++-- src/main.rs | 9 ++- src/ports.rs | 153 ++------------------------------------------ 6 files changed, 107 insertions(+), 165 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d59ee31..c70648e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -187,6 +187,7 @@ dependencies = [ "serde_json", "time", "tokio", + "tokio-stream", "tracing", "tracing-error", "tracing-subscriber", @@ -1048,13 +1049,14 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.12" +version = "0.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8fb52b74f05dbf495a8fba459fdc331812b96aa086d9eb78101fa0d4569c3313" +checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842" dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index b94661d..2a418f3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,9 +6,6 @@ edition = "2021" [profile.release] debug = true -[profile.dev.package.backtrace] -opt-level = 3 - # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -18,7 +15,7 @@ time = { version = "0.3.20", features = ["local-offset", "macros"] } bytemuck = { version = "1.13.0", features = ["derive"] } serde = { version = "1.0.152", features = ["derive"] } serde_json = "1.0.91" -hyper = { version = "0.14.24", optional = true, features = ["server", "http1", "tcp"] } +hyper = { version = "0.14.24", optional = true, features = ["server", "http1", "tcp", "stream"] } futures = { version = "0.3.27", default-features = false, features = ["std"] } tracing = "0.1.37" tracing-subscriber = { version = "0.3.16", features = ["time"] } @@ -28,6 +25,7 @@ eyre = "0.6.8" color-eyre = "0.6.2" tracing-error = "0.2.0" zerocopy = "0.6.1" +tokio-stream = { version = "0.1.14", features = ["sync"] } [features] default = ["debug_server"] diff --git a/src/constants.rs b/src/constants.rs index 189ea9c..e3ae6a5 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -9,3 +9,5 @@ pub const PING_TIMEOUT: Duration = Duration::from_secs(30); pub const SEND_PING_INTERVAL: Duration = Duration::from_secs(20); pub const CACHE_STORE_INTERVAL: Duration = Duration::from_secs(5); + +pub const DEBUG_SERVER_PING_INTERVAL: Duration = Duration::from_secs(5); diff --git a/src/debug_server.rs b/src/debug_server.rs index 9a85d95..0fd3bc2 100644 --- a/src/debug_server.rs +++ b/src/debug_server.rs @@ -1,16 +1,22 @@ use futures::Future; +use tokio_stream::StreamExt; + use hyper::rt::Executor; use hyper::service::{make_service_fn, service_fn}; -use hyper::{Body, Response, Server}; +use hyper::{Body, Method, Response, Server, StatusCode}; use std::convert::Infallible; use std::net::SocketAddr; use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; use tokio::sync::Mutex; +use tokio_stream::wrappers::{IntervalStream, WatchStream}; use tracing::error; + use zerocopy::{AsBytes, FromBytes, LittleEndian, Unaligned}; use tracing::{debug, instrument}; +use crate::constants::DEBUG_SERVER_PING_INTERVAL; use crate::packets::{Header, Packet}; use crate::ports::PortHandler; @@ -24,24 +30,100 @@ impl + Send + 'static> Executor } } -pub async fn debug_server(addr: SocketAddr, port_handler: Arc>) { +pub async fn debug_server( + addr: SocketAddr, + port_handler: Arc>, + change_receiver: tokio::sync::watch::Receiver, +) { let server = Server::bind(&addr) .executor(NamedExecutor) .serve(make_service_fn(move |_conn| { let port_handler = port_handler.clone(); + let change_receiver = change_receiver.clone(); async move { - Ok::<_, Infallible>(service_fn(move |_req| { + Ok::<_, Infallible>(service_fn(move |req| { let port_handler = port_handler.clone(); + let change_receiver = WatchStream::new(change_receiver.clone()); async move { - Ok::<_, Infallible>(Response::new(Body::from( - port_handler.lock().await.status_string(), - ))) + match (req.method(), req.uri().path()) { + (&Method::GET, "/") => Ok(Response::new(Body::from( + r#" + + + + + "#, + ))), + + (&Method::GET, "/data") => { + let res = Response::builder().header("Cache-Control", "no-store"); + + match serde_json::to_string(&*port_handler.lock().await) { + Ok(data) => res + .header("Content-Type", "application/json") + .body(Body::from(data)), + Err(err) => { + error!(%err, "failed to serialize data for debug server"); + res.status(StatusCode::INTERNAL_SERVER_ERROR) + .body(Body::from("")) + } + } + } + + (&Method::GET, "/events") => Response::builder() + .status(StatusCode::OK) + .header("Cache-Control", "no-store") + .header("Content-Type", "text/event-stream") + .body(Body::wrap_stream({ + change_receiver + .map(|x| ("change", x)) + .merge( + IntervalStream::new(tokio::time::interval( + DEBUG_SERVER_PING_INTERVAL, + )) + .map(|x| ("ping", x.into_std())), + ) + .filter_map(|(kind, time)| { + let timestamp = (SystemTime::now() + time.elapsed()) + .duration_since(UNIX_EPOCH) + .ok()? + .as_secs(); + + Some(Ok::<_, Infallible>(format!( + "event:{kind}\ndata: {timestamp}\n\n" + ))) + }) + })), + _ => Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Body::empty()), + } } })) } })); - // Run this server for... forever! if let Err(error) = server.await { error!(%error, "debug server error"); } diff --git a/src/main.rs b/src/main.rs index 4c0b6a6..3136c5a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,4 @@ #![warn(clippy::pedantic)] -// #![allow(clippy::missing_errors_doc)] use std::{ fmt::Debug, @@ -20,7 +19,7 @@ use tokio::{ io::AsyncWriteExt, net::{TcpListener, TcpStream}, sync::Mutex, - time::{sleep, Instant}, + time::sleep, }; use tracing::{error, info, instrument, warn, Level}; use tracing_subscriber::fmt::time::FormatTime; @@ -323,7 +322,7 @@ async fn tokio_main(config: Arc) -> eyre::Result<()> { let cache_path = PathBuf::from("cache.json"); - let (change_sender, change_receiver) = tokio::sync::watch::channel(Instant::now()); + let (change_sender, change_receiver) = tokio::sync::watch::channel(std::time::Instant::now()); let mut port_handler = PortHandler::load_or_default(&cache_path, change_sender); port_handler.update_allowed_ports(&config.allowed_ports); @@ -332,7 +331,7 @@ async fn tokio_main(config: Arc) -> eyre::Result<()> { spawn( "cache daemon", - cache_daemon(port_handler.clone(), cache_path, change_receiver), + cache_daemon(port_handler.clone(), cache_path, change_receiver.clone()), ); #[cfg(feature = "debug_server")] @@ -340,7 +339,7 @@ async fn tokio_main(config: Arc) -> eyre::Result<()> { warn!(%listen_addr, "debug server listening"); spawn( "debug server", - debug_server(listen_addr, port_handler.clone()), + debug_server(listen_addr, port_handler.clone(), change_receiver), ); } diff --git a/src/ports.rs b/src/ports.rs index 0c7fe68..af1effd 100644 --- a/src/ports.rs +++ b/src/ports.rs @@ -1,5 +1,4 @@ use std::{ - borrow::Cow, collections::{BTreeSet, HashMap, HashSet}, fmt::{Debug, Display}, fs::File, @@ -23,16 +22,16 @@ use tracing::{debug, error, info, instrument, warn}; use crate::{ constants::{CACHE_STORE_INTERVAL, PORT_OWNERSHIP_TIMEOUT, PORT_RETRY_TIME}, packets::Packet, - spawn, Config, Number, Port, UnixTimestamp, TIME_FORMAT, TIME_ZONE_OFFSET, + spawn, Config, Number, Port, UnixTimestamp, }; #[derive(Default, Serialize, Deserialize)] pub struct PortHandler { #[serde(skip)] - pub last_update: Option, + pub last_update: Option, #[serde(skip)] - pub change_sender: Option>, + pub change_sender: Option>, #[serde(skip)] port_guards: HashMap, @@ -55,7 +54,7 @@ pub struct PortHandler { pub async fn cache_daemon( port_handler: Arc>, cache_path: PathBuf, - mut change_receiver: Receiver, + mut change_receiver: Receiver, ) { let mut last_store = Instant::now() - 2 * CACHE_STORE_INTERVAL; let mut change_timeout = None; @@ -92,129 +91,6 @@ impl Debug for DisplayAsDebug { } } -fn duration_string(duration: Duration) -> String { - let seconds_elapsed = duration.as_secs(); - - let days = seconds_elapsed / (60 * 60 * 24); - let hours = seconds_elapsed / (60 * 60) % 24; - let minutes = (seconds_elapsed / 60) % 60; - let seconds = seconds_elapsed % 60; - - match (days > 0, hours > 0, minutes > 0) { - (true, _, _) => format!("{days}d {hours}h {minutes}min {seconds}s"), - (false, true, _) => format!("{hours}h {minutes}min {seconds}s"), - (false, false, true) => format!("{minutes}min {seconds}s"), - _ => format!("{duration:.0?}"), - } -} - -fn format_instant(instant: Instant) -> String { - let when = duration_string(instant.elapsed()) + " ago"; - - (|| -> eyre::Result<_> { - let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)? - instant.elapsed(); - let date = time::OffsetDateTime::from_unix_timestamp( - timestamp.as_secs().try_into().expect("timestamp overflow"), - )? - .to_offset(*TIME_ZONE_OFFSET.get().unwrap()) - .format(TIME_FORMAT.get().unwrap())?; - - Ok(format!("{date} ({when})")) - })() - .unwrap_or(when) -} - -fn instant_from_timestamp(timestamp: UnixTimestamp) -> Instant { - Instant::now() - UNIX_EPOCH.elapsed().unwrap() + Duration::from_secs(timestamp) -} - -#[cfg(feature = "debug_server")] -impl Debug for PortHandler { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - const SHOW_N_FREE_PORTS: usize = 10; - - let last_update = self - .last_update - .map(|last_update| Cow::from(format_instant(last_update))) - .unwrap_or(Cow::from("?")); - - let mut free_ports = self.free_ports.iter().copied().collect::>(); - - free_ports.sort_unstable(); - - let mut free_ports = free_ports - .into_iter() - .take(SHOW_N_FREE_PORTS) - .map(|x| DisplayAsDebug(x.to_string())) - .collect::>(); - - if let Some(n_not_shown) = self.free_ports.len().checked_sub(SHOW_N_FREE_PORTS) { - if n_not_shown > 0 { - free_ports.push(DisplayAsDebug(format!("[{n_not_shown} more]"))); - } - } - - let errored_ports = self - .errored_ports - .iter() - .rev() - .map(|&(since, port)| { - DisplayAsDebug(format!( - "{port:5}: {}", - format_instant(instant_from_timestamp(since)) - )) - }) - .collect::>(); - - let mut allocated_ports = self - .allocated_ports - .iter() - .map(|(&number, &port)| { - #[derive(Debug)] - #[allow(dead_code)] - struct State<'n> { - state: PortStatus, - name: &'n str, - number: u32, - port: u16, - last_change: DisplayAsDebug, - } - - let state = &self.port_state[&port]; - - State { - state: state.status, - number, - port, - name: self.names.get(&number).map_or("?", |x| x.as_str()), - last_change: DisplayAsDebug(format_instant(instant_from_timestamp( - state.last_change, - ))), - } - }) - .collect::>(); - - allocated_ports.sort_unstable_by(|a, b| { - a.state.cmp(&b.state).then( - self.port_state[&a.port] - .last_change - .cmp(&self.port_state[&b.port].last_change) - .reverse(), - ) - }); - - writeln!(f, "last update: {last_update}")?; - writeln!(f, "rejectors: {:#?}", self.port_guards)?; - writeln!(f, "allowed ports: {:?}", self.allowed_ports.0)?; - writeln!(f, "free ports: {free_ports:?}")?; - - writeln!(f, "errored ports: {errored_ports:#?}")?; - writeln!(f, "allocated ports: {allocated_ports:#?}")?; - - Ok(()) - } -} - #[derive(Default, Serialize, Deserialize)] pub struct PortState { last_change: UnixTimestamp, @@ -222,18 +98,6 @@ pub struct PortState { status: PortStatus, } -impl Debug for PortState { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("PortState") - .field( - "last_change", - &DisplayAsDebug(format_instant(instant_from_timestamp(self.last_change))), - ) - .field("status", &self.status) - .finish() - } -} - impl PortState { pub fn new_state(&mut self, status: PortStatus) { self.last_change = SystemTime::now() @@ -273,13 +137,8 @@ impl AllowedList { } impl PortHandler { - #[must_use] - pub fn status_string(&self) -> String { - format!("{self:#?}\n") - } - pub fn register_update(&mut self) { - let now = Instant::now(); + let now = std::time::Instant::now(); self.last_update = Some(now); self.change_sender .as_ref() @@ -310,7 +169,7 @@ impl PortHandler { #[instrument(skip(change_sender))] pub fn load_or_default( path: &Path, - change_sender: tokio::sync::watch::Sender, + change_sender: tokio::sync::watch::Sender, ) -> Self { let mut this = Self::load(path).unwrap_or_else(|error| { error!(?path, %error, "failed to parse cache file");