From e17547ad8388ad105eb699fd1bf479629ed8b87e Mon Sep 17 00:00:00 2001 From: soruh Date: Sat, 18 Mar 2023 20:47:16 +0100 Subject: [PATCH] move logging to tracing --- Cargo.lock | 302 +++++++++------------------------------ Cargo.toml | 7 +- config-template.json | 2 + src/debug_server.rs | 5 +- src/main.rs | 331 +++++++++++++++++++++++++------------------ src/packets.rs | 9 +- src/ports.rs | 39 ++--- 7 files changed, 300 insertions(+), 395 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5d13aeb..8f5ee38 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,15 +17,6 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" -[[package]] -name = "android_system_properties" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" -dependencies = [ - "libc", -] - [[package]] name = "anyhow" version = "1.0.69" @@ -146,12 +137,6 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" -[[package]] -name = "bumpalo" -version = "3.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d261e256854913907f67ed06efbc3338dfe6179796deefc1ff763fc1aee5535" - [[package]] name = "bytemuck" version = "1.13.1" @@ -196,13 +181,15 @@ version = "0.1.0" dependencies = [ "anyhow", "bytemuck", - "chrono", "console-subscriber", "futures", "hyper", "serde", "serde_json", + "time", "tokio", + "tracing", + "tracing-subscriber", ] [[package]] @@ -211,31 +198,6 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" -[[package]] -name = "chrono" -version = "0.4.24" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e3c5919066adf22df73762e50cffcde3a758f2a848b113b586d1f86728b673b" -dependencies = [ - "iana-time-zone", - "js-sys", - "num-integer", - "num-traits", - "time", - "wasm-bindgen", - "winapi", -] - -[[package]] -name = "codespan-reporting" -version = "0.11.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3538270d33cc669650c4b093848450d380def10c331d38c768e34cac80576e6e" -dependencies = [ - "termcolor", - "unicode-width", -] - [[package]] name = "console-api" version = "0.4.0" @@ -272,12 +234,6 @@ dependencies = [ "tracing-subscriber", ] -[[package]] -name = "core-foundation-sys" -version = "0.8.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc" - [[package]] name = "crc32fast" version = "1.3.2" @@ -306,50 +262,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "cxx" -version = "1.0.92" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a140f260e6f3f79013b8bfc65e7ce630c9ab4388c6a89c71e07226f49487b72" -dependencies = [ - "cc", - "cxxbridge-flags", - "cxxbridge-macro", - "link-cplusplus", -] - -[[package]] -name = "cxx-build" -version = "1.0.92" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da6383f459341ea689374bf0a42979739dc421874f112ff26f829b8040b8e613" -dependencies = [ - "cc", - "codespan-reporting", - "once_cell", - "proc-macro2", - "quote", - "scratch", - "syn 1.0.109", -] - -[[package]] -name = "cxxbridge-flags" -version = "1.0.92" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90201c1a650e95ccff1c8c0bb5a343213bdd317c6e600a93075bca2eff54ec97" - -[[package]] -name = "cxxbridge-macro" -version = "1.0.92" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b75aed41bb2e6367cae39e6326ef817a851db13c13e4f3263714ca3cfb8de56" -dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "either" version = "1.8.1" @@ -445,7 +357,7 @@ checksum = "c05aeb6a22b8f62540c194aac980f2115af067bfe15a0734d7277a768d396b31" dependencies = [ "cfg-if", "libc", - "wasi 0.11.0+wasi-snapshot-preview1", + "wasi", ] [[package]] @@ -577,30 +489,6 @@ dependencies = [ "tokio-io-timeout", ] -[[package]] -name = "iana-time-zone" -version = "0.1.53" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64c122667b287044802d6ce17ee2ddf13207ed924c712de9a66a5814d5b64765" -dependencies = [ - "android_system_properties", - "core-foundation-sys", - "iana-time-zone-haiku", - "js-sys", - "wasm-bindgen", - "winapi", -] - -[[package]] -name = "iana-time-zone-haiku" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0703ae284fc167426161c2e3f1da3ea71d94b21bedbcc9494e92b28e334e3dca" -dependencies = [ - "cxx", - "cxx-build", -] - [[package]] name = "indexmap" version = "1.9.2" @@ -626,15 +514,6 @@ version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "453ad9f582a441959e5f0d088b02ce04cfe8d51a8eaf077f12ac6d3e94164ca6" -[[package]] -name = "js-sys" -version = "0.3.61" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "445dde2150c55e483f3d8416706b97ec8e8237c307e5b7b4b8dd15e6af2a0730" -dependencies = [ - "wasm-bindgen", -] - [[package]] name = "lazy_static" version = "1.4.0" @@ -647,15 +526,6 @@ version = "0.2.140" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "99227334921fae1a979cf0bfdfcc6b3e5ce376ef57e16fb6fb3ea2ed6095f80c" -[[package]] -name = "link-cplusplus" -version = "1.0.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ecd207c9c713c34f95a097a5b029ac2ce6010530c7b49d7fea24d977dede04f5" -dependencies = [ - "cc", -] - [[package]] name = "log" version = "0.4.17" @@ -715,7 +585,7 @@ checksum = "5b9d9a46eff5b4ff64b45a9e316a6d1e0bc719ef429cbec4dc630684212bfdf9" dependencies = [ "libc", "log", - "wasi 0.11.0+wasi-snapshot-preview1", + "wasi", "windows-sys", ] @@ -730,13 +600,13 @@ dependencies = [ ] [[package]] -name = "num-integer" -version = "0.1.45" +name = "nu-ansi-term" +version = "0.46.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "225d3389fb3509a24c93f5c29eb6bde2586b98d9f016636dff58d7c6f7569cd9" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" dependencies = [ - "autocfg", - "num-traits", + "overload", + "winapi", ] [[package]] @@ -758,6 +628,15 @@ dependencies = [ "libc", ] +[[package]] +name = "num_threads" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2819ce041d2ee131036f4fc9d6ae7ae125a3a40e97ba64d04fe799ad9dabbb44" +dependencies = [ + "libc", +] + [[package]] name = "object" version = "0.30.3" @@ -773,6 +652,12 @@ version = "1.17.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b7e5500299e16ebb147ae15a00a942af264cf3688f47923b8fc2cd5858f23ad3" +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "percent-encoding" version = "2.2.0" @@ -939,12 +824,6 @@ version = "1.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f91339c0467de62360649f8d3e185ca8de4224ff281f66000de5eb2a77a79041" -[[package]] -name = "scratch" -version = "1.0.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1792db035ce95be60c3f8853017b3999209281c24e2ba5bc8e59bf97a0c590c1" - [[package]] name = "serde" version = "1.0.156" @@ -994,6 +873,12 @@ dependencies = [ "autocfg", ] +[[package]] +name = "smallvec" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0" + [[package]] name = "socket2" version = "0.4.9" @@ -1032,15 +917,6 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" -[[package]] -name = "termcolor" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be55cf8942feac5c765c2c993422806843c9a9a45d4d5c407ad6dd2ea95eb9b6" -dependencies = [ - "winapi-util", -] - [[package]] name = "thread_local" version = "1.1.7" @@ -1053,13 +929,31 @@ dependencies = [ [[package]] name = "time" -version = "0.1.45" +version = "0.3.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b797afad3f312d1c66a56d11d0316f916356d11bd158fbc6ca6389ff6bf805a" +checksum = "cd0cbfecb4d19b5ea75bb31ad904eb5b9fa13f21079c3b92017ebdf4999a5890" dependencies = [ + "itoa", "libc", - "wasi 0.10.0+wasi-snapshot-preview1", - "winapi", + "num_threads", + "serde", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e153e1f1acaef8acc537e68b44906d2db6436e2b35ac2c6b42640fff91f00fd" + +[[package]] +name = "time-macros" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd80a657e71da814b8e5d60d3374fc6d35045062245d80224748ae522dd76f36" +dependencies = [ + "time-core", ] [[package]] @@ -1234,6 +1128,17 @@ dependencies = [ "tracing", ] +[[package]] +name = "tracing-log" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78ddad33d2d10b1ed7eb9d1f518a5674713876e97e5bb9b7345a7984fbb4f922" +dependencies = [ + "lazy_static", + "log", + "tracing-core", +] + [[package]] name = "tracing-subscriber" version = "0.3.16" @@ -1241,12 +1146,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a6176eae26dd70d0c919749377897b54a9276bd7061339665dd68777926b5a70" dependencies = [ "matchers", + "nu-ansi-term", "once_cell", "regex", "sharded-slab", + "smallvec", "thread_local", + "time", "tracing", "tracing-core", + "tracing-log", ] [[package]] @@ -1261,12 +1170,6 @@ version = "1.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5464a87b239f13a63a501f2701565754bae92d243d4bb7eb12f6d57d2269bf4" -[[package]] -name = "unicode-width" -version = "0.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0edd1e5b14653f783770bce4a4dabb4a5108a5370a5f5d8cfe8710c361f6c8b" - [[package]] name = "valuable" version = "0.1.0" @@ -1283,72 +1186,12 @@ dependencies = [ "try-lock", ] -[[package]] -name = "wasi" -version = "0.10.0+wasi-snapshot-preview1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" - [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" -[[package]] -name = "wasm-bindgen" -version = "0.2.84" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31f8dcbc21f30d9b8f2ea926ecb58f6b91192c17e9d33594b3df58b2007ca53b" -dependencies = [ - "cfg-if", - "wasm-bindgen-macro", -] - -[[package]] -name = "wasm-bindgen-backend" -version = "0.2.84" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95ce90fd5bcc06af55a641a86428ee4229e44e07033963a2290a8e241607ccb9" -dependencies = [ - "bumpalo", - "log", - "once_cell", - "proc-macro2", - "quote", - "syn 1.0.109", - "wasm-bindgen-shared", -] - -[[package]] -name = "wasm-bindgen-macro" -version = "0.2.84" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c21f77c0bedc37fd5dc21f897894a5ca01e7bb159884559461862ae90c0b4c5" -dependencies = [ - "quote", - "wasm-bindgen-macro-support", -] - -[[package]] -name = "wasm-bindgen-macro-support" -version = "0.2.84" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2aff81306fcac3c7515ad4e177f521b5c9a15f2b08f4e32d823066102f35a5f6" -dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.109", - "wasm-bindgen-backend", - "wasm-bindgen-shared", -] - -[[package]] -name = "wasm-bindgen-shared" -version = "0.2.84" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0046fef7e28c3804e5e38bfa31ea2a0f73905319b677e57ebe37e49358989b5d" - [[package]] name = "winapi" version = "0.3.9" @@ -1365,15 +1208,6 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" -[[package]] -name = "winapi-util" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" -dependencies = [ - "winapi", -] - [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index 98f9896..c6d67f7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,12 +12,13 @@ bytemuck = { version = "1.13.0", features = ["derive"] } serde = { version = "1.0.152", features = ["derive"] } serde_json = "1.0.91" hyper = { version = "0.14.24", optional = true, features = ["server", "http1", "tcp"] } -chrono = { version = "0.4.23", optional = true } futures = { version = "0.3.27", default-features = false, features = ["std"] } console-subscriber = { version = "0.1.8", optional = true } +tracing = "0.1.37" +tracing-subscriber = { version = "0.3.16", features = ["time"] } +time = { version = "0.3.20", features = ["local-offset", "macros"] } [features] -default = ["debug_server", "chrono"] -chrono = ["dep:chrono"] +default = ["debug_server"] debug_server = ["dep:hyper"] tokio_console = ["dep:console-subscriber"] diff --git a/config-template.json b/config-template.json index 43471ef..d7f2744 100644 --- a/config-template.json +++ b/config-template.json @@ -1,4 +1,6 @@ { + "time_format": "[year]:[month]:[day] [hour]:[minute]:[second]", + "log_level": "info", "dyn_ip_server": "127.0.0.1:11811", "listen_addr": "0.0.0.0:11820", "debug_server_addr": "0.0.0.0:4885", diff --git a/src/debug_server.rs b/src/debug_server.rs index 5b50c57..e6a5cc2 100644 --- a/src/debug_server.rs +++ b/src/debug_server.rs @@ -6,6 +6,7 @@ use std::convert::Infallible; use std::net::SocketAddr; use std::sync::Arc; use tokio::sync::Mutex; +use tracing::error; use crate::ports::PortHandler; use crate::spawn; @@ -36,7 +37,7 @@ pub async fn debug_server(addr: SocketAddr, port_handler: Arc })); // Run this server for... forever! - if let Err(e) = server.await { - eprintln!("server error: {}", e); + if let Err(error) = server.await { + error!(%error, "debug server error"); } } diff --git a/src/main.rs b/src/main.rs index 0b2c832..3b38f3a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,6 +13,7 @@ use debug_server::debug_server; use futures::Future; use packets::{Header, Packet, RemConnect}; use serde::{Deserialize, Deserializer}; +use time::format_description::OwnedFormatItem; use tokio::{ io::AsyncWriteExt, net::{TcpListener, TcpStream}, @@ -20,6 +21,7 @@ use tokio::{ sync::Mutex, time::{sleep, timeout, Instant}, }; +use tracing::{debug, error, info, warn, Level}; use crate::packets::{dyn_ip_update, PacketKind, REJECT_OOP, REJECT_TIMEOUT}; use crate::ports::{AllowedPorts, PortHandler, PortStatus}; @@ -54,6 +56,29 @@ pub struct Config { #[serde(deserialize_with = "maybe_parse_socket_addr")] #[serde(default)] debug_server_addr: Option, + + #[serde(deserialize_with = "parse_time_format")] + time_format: OwnedFormatItem, + + #[serde(deserialize_with = "parse_log_level")] + log_level: Level, +} + +fn parse_log_level<'de, D: Deserializer<'de>>(deserializer: D) -> Result { + use serde::de::Error; + + String::deserialize(deserializer)? + .parse() + .map_err(|err| D::Error::custom(err)) +} + +fn parse_time_format<'de, D: Deserializer<'de>>( + deserializer: D, +) -> Result { + use serde::de::Error; + + time::format_description::parse_owned::<2>(&String::deserialize(deserializer)?) + .map_err(|err| D::Error::custom(err)) } fn maybe_parse_socket_addr<'de, D: Deserializer<'de>>( @@ -87,7 +112,7 @@ fn parse_socket_addr<'de, D: Deserializer<'de>>(deserializer: D) -> Result) -> std::io::Result { - println!("loading config"); + info!("loading config"); Ok(serde_json::from_reader(BufReader::new(File::open(path)?))?) } } @@ -113,154 +138,182 @@ fn spawn( .unwrap_or_else(|err| panic!("failed to spawn {name:?}: {err:?}")) } -#[tokio::main] -async fn main() -> anyhow::Result<()> { - #[cfg(feature = "tokio_console")] - console_subscriber::init(); - +fn main() -> anyhow::Result<()> { let config = Arc::new(Config::load("config.json")?); if config.allowed_ports.is_empty() { panic!("no allowed ports"); } - let cache_path = PathBuf::from("cache.json"); + let local_offset = time::UtcOffset::local_offset_at(time::OffsetDateTime::UNIX_EPOCH)?; - let (change_sender, mut change_receiver) = tokio::sync::watch::channel(Instant::now()); + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build()? + .block_on(async move { + { + use tracing_subscriber::prelude::*; + use tracing_subscriber::*; - let mut port_handler = PortHandler::load_or_default(&cache_path, change_sender); - port_handler.update_allowed_ports(&config.allowed_ports); + // build a `Subscriber` by combining layers with a + // `tracing_subscriber::Registry`: + let registry = tracing_subscriber::registry(); - let port_handler = Arc::new(Mutex::new(port_handler)); + #[cfg(feature = "tokio_console")] + let registry = registry.with(console_subscriber::spawn()); - { - let port_handler = port_handler.clone(); - spawn("cache daemon", async move { - let mut last_store = Instant::now() - 2 * CACHE_STORE_INTERVAL; - let mut change_timeout = None; - loop { - if let Some(change_timeout) = change_timeout.take() { - tokio::time::timeout(change_timeout, change_receiver.changed()) - .await - .unwrap_or(Ok(())) - } else { - change_receiver.changed().await - } - .expect("failed to wait for cache changes"); + registry + .with( + fmt::layer() + .with_target(false) + .with_timer(fmt::time::OffsetTime::new( + local_offset, + config.time_format.clone(), + )) + .with_filter(filter::LevelFilter::from_level(config.log_level)), + ) + .init(); + } - let time_since_last_store = last_store.elapsed(); + let cache_path = PathBuf::from("cache.json"); - if time_since_last_store > CACHE_STORE_INTERVAL { - let port_handler = port_handler.lock().await; + let (change_sender, mut change_receiver) = tokio::sync::watch::channel(Instant::now()); - last_store = Instant::now(); - if let Err(err) = port_handler.store(&cache_path) { - println!("failed to store cache: {err:?}"); + let mut port_handler = PortHandler::load_or_default(&cache_path, change_sender); + port_handler.update_allowed_ports(&config.allowed_ports); + + let port_handler = Arc::new(Mutex::new(port_handler)); + + { + let port_handler = port_handler.clone(); + spawn("cache daemon", async move { + let mut last_store = Instant::now() - 2 * CACHE_STORE_INTERVAL; + let mut change_timeout = None; + loop { + if let Some(change_timeout) = change_timeout.take() { + tokio::time::timeout(change_timeout, change_receiver.changed()) + .await + .unwrap_or(Ok(())) + } else { + change_receiver.changed().await + } + .expect("failed to wait for cache changes"); + + let time_since_last_store = last_store.elapsed(); + + if time_since_last_store > CACHE_STORE_INTERVAL { + let port_handler = port_handler.lock().await; + + last_store = Instant::now(); + if let Err(err) = port_handler.store(&cache_path) { + error!("failed to store cache: {err:?}"); + } + } else { + change_timeout = Some(CACHE_STORE_INTERVAL - time_since_last_store); + } } - } else { - change_timeout = Some(CACHE_STORE_INTERVAL - time_since_last_store); - } - } - }); - } - - #[cfg(feature = "debug_server")] - if let Some(debug_server_addr) = config.debug_server_addr { - println!("starting debug server on {debug_server_addr:?}"); - spawn( - "debug server", - debug_server(debug_server_addr, port_handler.clone()), - ); - } - - let listener = TcpListener::bind(config.listen_addr).await?; - println!("listening on {}", config.listen_addr); - - while let Ok((mut stream, addr)) = listener.accept().await { - println!("connection from {addr}"); - - let port_handler = port_handler.clone(); - let config = config.clone(); - - let mut handler_metadata = HandlerMetadata::default(); - - spawn(&format!("connection to {addr}"), async move { - use futures::future::FutureExt; - - let res = std::panic::AssertUnwindSafe(connection_handler( - &config, - &mut handler_metadata, - &port_handler, - &mut stream, - )) - .catch_unwind() - .await; - - let error = match res { - Err(err) => { - let err = err - .downcast::() - .map(|err| *err) - .unwrap_or_else(|_| "?".to_owned()); - - Some(format!("panic at: {err}")) - } - Ok(Err(err)) => Some(err.to_string()), - Ok(Ok(())) => None, - }; - - if let Some(err) = error { - println!("client at {addr} had an error: {err}"); - - let mut packet = Packet::default(); - - packet.data.extend_from_slice(err.as_bytes()); - packet.data.truncate((u8::MAX - 1) as usize); - packet.data.push(0); - packet.header = Header { - kind: PacketKind::Error.raw(), - length: packet.data.len() as u8, - }; - - let (_, mut writer) = stream.split(); - let _ = packet.send(&mut writer).await; + }); } - if let Some(port) = handler_metadata.port { - let mut port_handler = port_handler.lock().await; + #[cfg(feature = "debug_server")] + if let Some(listen_addr) = config.debug_server_addr { + warn!(%listen_addr, "debug server listening"); + spawn( + "debug server", + debug_server(listen_addr, port_handler.clone()), + ); + } - if let Some(port_state) = port_handler.port_state.get_mut(&port) { - port_state.new_state(PortStatus::Disconnected); - port_handler.register_update(); - } + let listener = TcpListener::bind(config.listen_addr).await?; + warn!( + listen_addr = %config.listen_addr, + "centralex server listening" + ); - if let Some(listener) = handler_metadata.listener.take() { - let res = port_handler.start_rejector( - port, - listener, - Packet { - header: Header { - kind: PacketKind::Reject.raw(), - length: 3, - }, - data: b"nc\0".to_vec(), - }, - ); + while let Ok((mut stream, addr)) = listener.accept().await { + info!(%addr, "new connection"); - if let Err(err) = res { - println!( - "failed to start rejector on port {port} after client error: {err}" - ); + let port_handler = port_handler.clone(); + let config = config.clone(); + + let mut handler_metadata = HandlerMetadata::default(); + + spawn(&format!("connection to {addr}"), async move { + use futures::future::FutureExt; + + let res = std::panic::AssertUnwindSafe(connection_handler( + &config, + &mut handler_metadata, + &port_handler, + &mut stream, + )) + .catch_unwind() + .await; + + let error = match res { + Err(err) => { + let err = err + .downcast::() + .map(|err| *err) + .unwrap_or_else(|_| "?".to_owned()); + + Some(format!("panic at: {err}")) + } + Ok(Err(err)) => Some(err.to_string()), + Ok(Ok(())) => None, + }; + + if let Some(error) = error { + error!(%addr, %error, "Client had an error"); + + let mut packet = Packet::default(); + + packet.data.extend_from_slice(error.as_bytes()); + packet.data.truncate((u8::MAX - 1) as usize); + packet.data.push(0); + packet.header = Header { + kind: PacketKind::Error.raw(), + length: packet.data.len() as u8, + }; + + let (_, mut writer) = stream.split(); + let _ = packet.send(&mut writer).await; } - } + + if let Some(port) = handler_metadata.port { + let mut port_handler = port_handler.lock().await; + + if let Some(port_state) = port_handler.port_state.get_mut(&port) { + port_state.new_state(PortStatus::Disconnected); + port_handler.register_update(); + } + + if let Some(listener) = handler_metadata.listener.take() { + let res = port_handler.start_rejector( + port, + listener, + Packet { + header: Header { + kind: PacketKind::Reject.raw(), + length: 3, + }, + data: b"nc\0".to_vec(), + }, + ); + + if let Err(error) = res { + error!(%port, %error, "failed to start rejector"); + } + } + } + + sleep(Duration::from_secs(3)).await; + let _ = stream.shutdown().await; + }); } - sleep(Duration::from_secs(3)).await; - let _ = stream.shutdown().await; - }); - } - - Ok(()) + Ok(()) + }) } #[derive(Debug, Default)] @@ -301,7 +354,7 @@ async fn connection_handler( .await .allocate_port_for_number(config, number); - println!("allocated port: {:?}", port); + info!(port, "allocated port"); let Some(port) = port else { writer.write_all(REJECT_OOP).await?; @@ -383,8 +436,18 @@ async fn connection_handler( let mut last_ping_received_at = Instant::now(); let result = loop { - // println!("next ping in {:?}s", SEND_PING_INTERVAL.saturating_sub(now.saturating_duration_since(last_ping_sent_at)).as_secs()); - // println!("will timeout in in {:?}s", PING_TIMEOUT.saturating_sub(now.saturating_duration_since(last_ping_received_at)).as_secs()); + debug!( + seconds = SEND_PING_INTERVAL + .saturating_sub(last_ping_sent_at.elapsed()) + .as_secs(), + "next ping in" + ); + debug!( + seconds = PING_TIMEOUT + .saturating_sub(last_ping_received_at.elapsed()) + .as_secs(), + "timeout in", + ); let send_next_ping_in = SEND_PING_INTERVAL.saturating_sub(last_ping_sent_at.elapsed()); let next_ping_expected_in = PING_TIMEOUT.saturating_sub(last_ping_received_at.elapsed()); @@ -398,14 +461,14 @@ async fn connection_handler( packet.recv_into(&mut reader).await?; if packet.kind() == PacketKind::Ping { - // println!("received ping"); + debug!("received ping"); last_ping_received_at = Instant::now(); } else { break Result::Packet { packet } } }, _ = sleep(send_next_ping_in) => { - // println!("sending ping"); + debug!("sending ping"); writer.write_all(bytemuck::bytes_of(& Header { kind: PacketKind::Ping.raw(), length: 0 })).await?; last_ping_sent_at = Instant::now(); } @@ -422,7 +485,7 @@ async fn connection_handler( packet.kind(), packets::PacketKind::End | packets::PacketKind::Reject ) { - println!("got disconnect packet: {packet:?}"); + info!(?packet, "got disconnect packet"); if packet.kind() == packets::PacketKind::End { packet.header.kind = packets::PacketKind::Reject.raw(); @@ -449,7 +512,7 @@ async fn connection_handler( stream, addr, } => { - println!("got caller from: {addr}"); + info!(%addr, "got caller from"); packet.data.clear(); /* The I-Telex Clients can't handle data in this packet due to a bug diff --git a/src/packets.rs b/src/packets.rs index cfcebab..c81fc9b 100644 --- a/src/packets.rs +++ b/src/packets.rs @@ -6,6 +6,7 @@ use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, net::tcp::{ReadHalf, WriteHalf}, }; +use tracing::info; pub const REJECT_OOP: &[u8; 6] = b"\x04\x04oop\x00"; pub const REJECT_TIMEOUT: &[u8; 10] = b"\x04\x08timeout\x00"; @@ -183,7 +184,7 @@ pub async fn dyn_ip_update( pin: u16, port: u16, ) -> anyhow::Result { - println!("starting dyn ip update for number={number} port={port}..."); + info!(%number, %port, "starting dyn ip update"); let mut packet = Packet::default(); packet.header = Header { @@ -205,7 +206,7 @@ pub async fn dyn_ip_update( packet.recv_into(&mut reader).await?; - let res = match packet.kind() { + let result = match packet.kind() { PacketKind::DynIpUpdateResponse => Ok(<[u8; 4]>::try_from(packet.data) .map_err(|err| { anyhow::anyhow!( @@ -234,7 +235,7 @@ pub async fn dyn_ip_update( _ => bail!("server returned unexpected packet"), }; - println!("finished dyn ip update result: {res:?}"); + info!(?result, "finished dyn ip update"); - res + result } diff --git a/src/ports.rs b/src/ports.rs index 351db40..76cb792 100644 --- a/src/ports.rs +++ b/src/ports.rs @@ -13,6 +13,7 @@ use std::{ use anyhow::anyhow; use serde::{Deserialize, Serialize}; use tokio::{net::TcpListener, sync::Mutex, task::JoinHandle, time::Instant}; +use tracing::{error, info, warn}; use crate::{ packets::Packet, spawn, Config, Number, Port, UnixTimestamp, PORT_OWNERSHIP_TIMEOUT, @@ -65,6 +66,8 @@ fn duration_in_hours(duration: Duration) -> String { fn format_instant(instant: Instant) -> String { let when = duration_in_hours(instant.elapsed()) + " ago"; + todo!(); + #[cfg(feature = "chrono")] let when = (|| -> anyhow::Result<_> { use chrono::{Local, TimeZone}; @@ -208,7 +211,7 @@ impl PortHandler { } pub fn store(&self, cache: &Path) -> anyhow::Result<()> { - println!("storing cache"); + info!("storing cache"); let temp_file = cache.with_extension(".temp"); serde_json::to_writer(BufWriter::new(File::create(&temp_file)?), self)?; @@ -221,18 +224,18 @@ impl PortHandler { cache: &Path, change_sender: tokio::sync::watch::Sender, ) -> std::io::Result { - println!("loading cache"); + info!("loading cache"); let mut cache: Self = serde_json::from_reader(BufReader::new(File::open(cache)?))?; cache.change_sender = Some(change_sender); Ok(cache) } pub fn load_or_default( - cache: &Path, + path: &Path, change_sender: tokio::sync::watch::Sender, ) -> Self { - Self::load(cache, change_sender).unwrap_or_else(|err| { - println!("failed to parse cache file at {cache:?} using empty cache. error: {err}"); + Self::load(path, change_sender).unwrap_or_else(|error| { + error!(?path, %error, "failed to parse cache file"); Self::default() }) } @@ -280,7 +283,7 @@ impl PortHandler { listener: TcpListener, packet: Packet, ) -> anyhow::Result<()> { - println!("starting rejector: for port {port} with {packet:?}"); + info!(port, ?packet, "starting rejector"); let port_guard = Rejector::start(listener, packet); @@ -293,7 +296,7 @@ impl PortHandler { } pub async fn stop_rejector(&mut self, port: Port) -> Option<(TcpListener, Packet)> { - println!("stopping rejector: for port {port}"); + info!(port, "stopping rejector"); Some(self.port_guards.remove(&port)?.stop().await) } @@ -394,10 +397,10 @@ impl PortHandler { if recovered_port.is_none() && now.saturating_sub(Duration::from_secs(timestamp)) >= PORT_RETRY_TIME { - println!( - " trying port: {port} at -{:?}", - Duration::from_secs(now.as_secs()) - .saturating_sub(Duration::from_secs(timestamp)) + info!( + port, + last_try = ?Duration::from_secs(now.as_secs()).saturating_sub(Duration::from_secs(timestamp)), + "retrying errored port", ); match std::net::TcpListener::bind((config.listen_addr.ip(), port)) { @@ -408,10 +411,10 @@ impl PortHandler { Err(_) => timestamp = now.as_secs(), } } else { - println!( - "skipped port: {port} at -{:?}", - Duration::from_secs(now.as_secs()) - .saturating_sub(Duration::from_secs(timestamp)) + info!( + port, + last_try = ?Duration::from_secs(now.as_secs()).saturating_sub(Duration::from_secs(timestamp)), + "skipped retrying errored port", ); } @@ -421,7 +424,7 @@ impl PortHandler { if let Some((_, port)) = recovered_port { self.register_update(); - println!("recovered_port: {port}"); + info!(port, "recovered port"); return Some(port); } @@ -440,7 +443,7 @@ impl PortHandler { if let Some((&old_number, &port)) = removable_entry { self.register_update(); - println!("reused port {port} which used to be allocated to {old_number} which wasn't connected in a long time"); + info!(port, old_number, "reused port"); assert!(self.allocated_ports.remove(&old_number).is_some()); return Some(port); } @@ -449,7 +452,7 @@ impl PortHandler { } pub fn mark_port_error(&mut self, number: Number, port: Port) { - println!("registering an error on port {port} for number {number}"); + warn!(port, number, "registering an error on"); self.register_update(); self.errored_ports.insert((