diff --git a/Cargo.lock b/Cargo.lock index 6bb0b3c..5d9afd4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -22,9 +22,6 @@ name = "anyhow" version = "1.0.70" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7de8ce5e0f9f8d88245311066a578d72b7af3e7088f32783804676302df237e4" -dependencies = [ - "backtrace", -] [[package]] name = "async-stream" @@ -179,9 +176,10 @@ checksum = "50d30906286121d95be3d479533b458f87493b30a4b5f79a607db8f5d11aa91f" name = "centralex" version = "0.1.0" dependencies = [ - "anyhow", "bytemuck", + "color-eyre", "console-subscriber", + "eyre", "futures", "hyper", "once_cell", @@ -190,6 +188,7 @@ dependencies = [ "time", "tokio", "tracing", + "tracing-error", "tracing-subscriber", ] @@ -199,6 +198,33 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "color-eyre" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a667583cca8c4f8436db8de46ea8233c42a7d9ae424a82d338f2e4675229204" +dependencies = [ + "backtrace", + "color-spantrace", + "eyre", + "indenter", + "once_cell", + "owo-colors", + "tracing-error", +] + +[[package]] +name = "color-spantrace" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ba75b3d9449ecdccb27ecbc479fdc0b87fa2dd43d2f8298f9bf0e59aacc8dce" +dependencies = [ + "once_cell", + "owo-colors", + "tracing-core", + "tracing-error", +] + [[package]] name = "console-api" version = "0.4.0" @@ -269,6 +295,16 @@ version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7fcaabb2fef8c910e7f4c7ce9f67a1283a1715879a7c230ca9d6d1ae31f16d91" +[[package]] +name = "eyre" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c2b6b5a29c02cdc822728b7d7b8ae1bab3e3b05d44522770ddd49722eeac7eb" +dependencies = [ + "indenter", + "once_cell", +] + [[package]] name = "flate2" version = "1.0.25" @@ -490,6 +526,12 @@ dependencies = [ "tokio-io-timeout", ] +[[package]] +name = "indenter" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce23b50ad8242c51a442f3ff322d56b02f08852c77e4c0b4d3fd684abc89c683" + [[package]] name = "indexmap" version = "1.9.2" @@ -659,6 +701,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "owo-colors" +version = "3.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1b04fb49957986fdce4d6ee7a65027d55d4b6d2265e5848bbb507b58ccfdb6f" + [[package]] name = "percent-encoding" version = "2.2.0" @@ -1119,6 +1167,16 @@ dependencies = [ "valuable", ] +[[package]] +name = "tracing-error" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d686ec1c0f384b1277f097b2f279a2ecc11afe8c133c1aabf036a27cb4cd206e" +dependencies = [ + "tracing", + "tracing-subscriber", +] + [[package]] name = "tracing-futures" version = "0.2.5" diff --git a/Cargo.toml b/Cargo.toml index 71429e7..0368cf6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,21 +3,30 @@ name = "centralex" version = "0.1.0" edition = "2021" +[profile.release] +debug = true + +[profile.dev.package.backtrace] +opt-level = 3 + + # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] tokio = { version = "1.24.2", features = ["macros", "rt-multi-thread", "net", "io-util", "sync", "time"] } -anyhow = { version = "1.0.68", features = ["backtrace"] } +time = { version = "0.3.20", features = ["local-offset", "macros"] } bytemuck = { version = "1.13.0", features = ["derive"] } serde = { version = "1.0.152", features = ["derive"] } serde_json = "1.0.91" hyper = { version = "0.14.24", optional = true, features = ["server", "http1", "tcp"] } futures = { version = "0.3.27", default-features = false, features = ["std"] } -console-subscriber = { version = "0.1.8", optional = true } tracing = "0.1.37" tracing-subscriber = { version = "0.3.16", features = ["time"] } -time = { version = "0.3.20", features = ["local-offset", "macros"] } +console-subscriber = { version = "0.1.8", optional = true } once_cell = "1.17.1" +eyre = "0.6.8" +color-eyre = "0.6.2" +tracing-error = "0.2.0" [features] default = ["debug_server", "tokio_console"] diff --git a/src/auth.rs b/src/auth.rs index b1ed0fa..6df0281 100644 --- a/src/auth.rs +++ b/src/auth.rs @@ -1,19 +1,20 @@ use std::net::SocketAddr; -use anyhow::bail; -use tracing::debug; +use eyre::eyre; +use tracing::{debug, instrument}; use crate::packets::{Header, Packet, PacketKind}; /// # Errors /// - the dyn ip server returns a malformed response or is unreachable /// - the authentication fails +#[instrument] pub async fn dyn_ip_update( server: &SocketAddr, number: u32, pin: u16, port: u16, -) -> anyhow::Result { +) -> eyre::Result { debug!(%number, %port, "starting dyn ip update"); let mut packet = Packet { @@ -41,7 +42,7 @@ pub async fn dyn_ip_update( let result = match packet.kind() { PacketKind::DynIpUpdateResponse => Ok(<[u8; 4]>::try_from(packet.data) .map_err(|err| { - anyhow::anyhow!( + eyre!( "too little data for ip address. Need 4 bytes got {}", err.len() ) @@ -54,13 +55,13 @@ pub async fn dyn_ip_update( .enumerate() .find_map(|(i, x)| (*x == 0).then_some(i)); - bail!( + return Err(eyre!( "{}", std::str::from_utf8(first_zero.map_or(&packet.data, |i| &packet.data[..i]),)? - ) + )); } - _ => bail!("server returned unexpected packet"), + _ => return Err(eyre!("server returned unexpected packet")), }; debug!(?result, "finished dyn ip update"); diff --git a/src/client.rs b/src/client.rs index 96b4e2c..ed4254c 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,4 +1,4 @@ -use anyhow::{anyhow, bail, Context}; +use eyre::eyre; use std::{net::SocketAddr, time::Instant}; use tokio::{ io::AsyncWriteExt, @@ -10,7 +10,7 @@ use tokio::{ sync::Mutex, time::{sleep, timeout}, }; -use tracing::{info, trace}; +use tracing::{info, instrument, trace}; use crate::{ auth::dyn_ip_update, @@ -22,13 +22,14 @@ use crate::{ /// # Errors /// - the client authentication fails +#[instrument(skip(config, port_handler, handler_metadata))] async fn authenticate( config: &Config, port_handler: &Mutex, handler_metadata: &mut HandlerMetadata, number: u32, pin: u16, -) -> anyhow::Result> { +) -> eyre::Result> { let mut authenticated = false; loop { let mut updated_server = false; @@ -44,9 +45,7 @@ async fn authenticate( // make sure the client is authenticated before opening any ports if !authenticated { - let _ip = dyn_ip_update(&config.dyn_ip_server, number, pin, port) - .await - .context("dy-ip update")?; + let _ip = dyn_ip_update(&config.dyn_ip_server, number, pin, port).await?; authenticated = true; updated_server = true; } @@ -68,9 +67,7 @@ async fn authenticate( // we need to update the server here once a port that can be opened // has been found if !updated_server { - let _ip = dyn_ip_update(&config.dyn_ip_server, number, pin, port) - .await - .context("dy-ip update")?; + let _ip = dyn_ip_update(&config.dyn_ip_server, number, pin, port).await?; } port_handler.register_update(); @@ -101,12 +98,13 @@ enum IdleResult { }, } +#[instrument(skip(listener, reader, writer, packet))] async fn idle( listener: &mut TcpListener, mut packet: Packet, reader: &mut ReadHalf<'_>, writer: &mut WriteHalf<'_>, -) -> anyhow::Result> { +) -> eyre::Result> { let mut last_ping_sent_at = Instant::now(); let mut last_ping_received_at = Instant::now(); @@ -132,8 +130,8 @@ async fn idle( let (stream, addr) = caller?; break Ok(Some(IdleResult::Caller { packet, stream, addr })) }, - _ = Packet::peek_packet_kind( reader) => { - packet.recv_into( reader).await?; + _ = Packet::peek_packet_kind(reader) => { + packet.recv_into(reader).await?; if packet.kind() == PacketKind::Ping { trace!("received ping"); @@ -148,6 +146,7 @@ async fn idle( last_ping_sent_at = Instant::now(); } _ = sleep(next_ping_expected_in) => { + writer.write_all(REJECT_TIMEOUT).await?; break Ok(None); } @@ -155,13 +154,14 @@ async fn idle( } } +#[instrument(skip(port_handler, handler_metadata, writer))] async fn notify_or_disconnect( result: IdleResult, handler_metadata: &mut HandlerMetadata, port_handler: &Mutex, port: u16, writer: &mut WriteHalf<'_>, -) -> anyhow::Result> { +) -> eyre::Result> { match result { IdleResult::Disconnect { mut packet } => { if matches!(packet.kind(), PacketKind::End | PacketKind::Reject) { @@ -184,7 +184,7 @@ async fn notify_or_disconnect( ); Ok(None) } else { - Err(anyhow!("unexpected packet: {:?}", packet.kind())) + Err(eyre!("unexpected packet: {:?}", packet.kind())) } } IdleResult::Caller { @@ -213,14 +213,21 @@ async fn notify_or_disconnect( } } +fn print_addr(stream: &TcpStream) -> String { + stream + .peer_addr() + .map_or_else(|_| "?".to_owned(), |addr| format!("{addr}")) +} + +#[instrument(skip(packet, port_handler, handler_metadata, caller, client), fields(client_addr = print_addr(client), caller_addr = print_addr(caller)))] async fn connect( mut packet: Packet, port_handler: &Mutex, port: u16, handler_metadata: &mut HandlerMetadata, - stream: &mut TcpStream, client: &mut TcpStream, -) -> anyhow::Result<()> { + caller: &mut TcpStream, +) -> eyre::Result<()> { packet.header = Header { kind: PacketKind::Reject.raw(), length: 4, @@ -249,10 +256,10 @@ async fn connect( ); } - stream.set_nodelay(true)?; client.set_nodelay(true)?; + caller.set_nodelay(true)?; - let _ = timeout(CALL_TIMEOUT, tokio::io::copy_bidirectional(stream, client)).await; + let _ = timeout(CALL_TIMEOUT, tokio::io::copy_bidirectional(client, caller)).await; { let mut port_handler = port_handler.lock().await; @@ -286,14 +293,15 @@ async fn connect( /// - accepting a tcp connection fails /// - settings tcp socket properties fails /// - the client authentication fails +#[instrument(skip_all)] pub async fn handler( - stream: &mut TcpStream, + client: &mut TcpStream, addr: SocketAddr, config: &Config, handler_metadata: &mut HandlerMetadata, port_handler: &Mutex, -) -> anyhow::Result<()> { - let (mut reader, mut writer) = stream.split(); +) -> eyre::Result<()> { + let (mut reader, mut writer) = client.split(); let mut packet = Packet::default(); @@ -334,7 +342,7 @@ pub async fn handler( return Ok(()); }; - let Some((mut client, mut packet)) = notify_or_disconnect(idle_result, handler_metadata, port_handler, port, &mut writer).await? else { + let Some((mut caller, mut packet)) = notify_or_disconnect(idle_result, handler_metadata, port_handler, port, &mut writer).await? else { return Ok(()); }; @@ -369,12 +377,12 @@ pub async fn handler( port_handler, port, handler_metadata, - stream, - &mut client, + client, + &mut caller, ) .await } - kind => bail!("unexpected packet: {:?}", kind), + kind => Err(eyre!("unexpected packet: {:?}", kind)), } } diff --git a/src/main.rs b/src/main.rs index fa8787a..a776882 100644 --- a/src/main.rs +++ b/src/main.rs @@ -22,7 +22,8 @@ use tokio::{ sync::Mutex, time::{sleep, Instant}, }; -use tracing::{error, info, warn, Level}; +use tracing::{error, info, instrument, warn, Level}; +use tracing_subscriber::fmt::time::FormatTime; use crate::packets::PacketKind; use crate::ports::{cache_daemon, AllowedList, PortHandler, PortStatus}; @@ -112,25 +113,28 @@ impl Config { } } -#[cfg(not(feature = "tokio_console"))] -#[track_caller] -fn spawn( - _name: &str, - future: impl Future + Send + 'static, -) -> tokio::task::JoinHandle { - tokio::spawn(future) -} - -#[cfg(feature = "tokio_console")] #[track_caller] fn spawn( name: &str, future: impl Future + Send + 'static, ) -> tokio::task::JoinHandle { - tokio::task::Builder::new() + use tracing::Instrument; + + let future = future.instrument(tracing::span!( + Level::TRACE, + "spawn", + name = name, + caller = %std::panic::Location::caller().to_string() + )); + + #[cfg(feature = "tokio_console")] + return tokio::task::Builder::new() .name(name) .spawn(future) - .unwrap_or_else(|err| panic!("failed to spawn {name:?}: {err:?}")) + .unwrap_or_else(|err| panic!("failed to spawn {name:?}: {err:?}")); + + #[cfg(not(feature = "tokio_console"))] + return tokio::spawn(future); } static TIME_ZONE_OFFSET: once_cell::sync::OnceCell = @@ -139,8 +143,66 @@ static TIME_ZONE_OFFSET: once_cell::sync::OnceCell = static TIME_FORMAT: once_cell::sync::OnceCell = once_cell::sync::OnceCell::new(); fn setup_tracing(config: &Config) { + use tracing::Subscriber; + use tracing_error::ErrorLayer; use tracing_subscriber::prelude::*; - use tracing_subscriber::{filter, fmt}; + use tracing_subscriber::{ + filter, + fmt::{self, FormatEvent, FormatFields}, + registry::LookupSpan, + }; + + struct EventFormater; + impl FormatEvent for EventFormater + where + S: Subscriber + for<'a> LookupSpan<'a>, + N: for<'a> FormatFields<'a> + 'static, + { + fn format_event( + &self, + ctx: &fmt::FmtContext<'_, S, N>, + mut writer: fmt::format::Writer<'_>, + event: &tracing::Event<'_>, + ) -> std::fmt::Result { + use color_eyre::owo_colors::OwoColorize; + + let meta = event.metadata(); + + fmt::time::OffsetTime::new( + *TIME_ZONE_OFFSET.get().unwrap(), + TIME_FORMAT.get().unwrap(), + ) + .format_time(&mut writer)?; + + // TODO: check writer.has_ansi_escapes() + + let level = *meta.level(); + match level { + Level::TRACE => write!(writer, " {:>5} ", level.purple())?, + Level::DEBUG => write!(writer, " {:>5} ", level.cyan())?, + Level::INFO => write!(writer, " {:>5} ", level.green())?, + Level::WARN => write!(writer, " {:>5} ", level.yellow())?, + Level::ERROR => write!(writer, " {:>5} ", level.red())?, + } + + write!(writer, "{:17}{}", meta.target().dimmed(), ":".bold())?; + + /* + if let Some(filename) = meta.file() { + write!(writer, " {}{}", filename.bold(), ":".dimmed())?; + } + if let Some(line_number) = meta.line() { + write!(writer, "{}{}", line_number.bold(), ":".dimmed())?; + } + */ + + writer.write_char(' ')?; + + ctx.format_fields(writer.by_ref(), event)?; + + writeln!(writer) + } + } // build a `Subscriber` by combining layers with a // `tracing_subscriber::Registry`: @@ -150,13 +212,11 @@ fn setup_tracing(config: &Config) { let registry = registry.with(console_subscriber::spawn()); registry + .with(ErrorLayer::default()) .with( fmt::layer() .with_target(true) - .with_timer(fmt::time::OffsetTime::new( - *TIME_ZONE_OFFSET.get().unwrap(), - TIME_FORMAT.get().unwrap(), - )) + .event_format(EventFormater) .with_filter(filter::LevelFilter::from_level(config.log_level)) .with_filter(tracing_subscriber::filter::filter_fn(|meta| { meta.target().starts_with("centralex") @@ -165,6 +225,7 @@ fn setup_tracing(config: &Config) { .init(); } +#[instrument(skip(stream, config, port_handler))] async fn connection_handler( mut stream: TcpStream, addr: SocketAddr, @@ -186,13 +247,7 @@ async fn connection_handler( .await; let error = match res { - Err(err) => { - let err = err - .downcast::() - .map_or_else(|_| "?".to_owned(), |err| *err); - - Some(format!("panic at: {err}")) - } + Err(_) => Some("internal server error".to_owned()), Ok(Err(err)) => Some(err.to_string()), Ok(Ok(())) => None, }; @@ -241,7 +296,9 @@ async fn connection_handler( let _ = stream.shutdown().await; } -fn main() -> anyhow::Result<()> { +fn main() -> eyre::Result<()> { + color_eyre::install()?; + let config = Arc::new(Config::load("config.json")?); assert!(!config.allowed_ports.is_empty(), "no allowed ports"); @@ -261,7 +318,7 @@ fn main() -> anyhow::Result<()> { .block_on(tokio_main(config)) } -async fn tokio_main(config: Arc) -> anyhow::Result<()> { +async fn tokio_main(config: Arc) -> eyre::Result<()> { setup_tracing(&config); let cache_path = PathBuf::from("cache.json"); diff --git a/src/packets.rs b/src/packets.rs index 038b1f8..183d537 100644 --- a/src/packets.rs +++ b/src/packets.rs @@ -1,7 +1,7 @@ use std::fmt::Debug; -use anyhow::bail; use bytemuck::{Pod, Zeroable}; +use eyre::eyre; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, net::tcp::{ReadHalf, WriteHalf}, @@ -169,16 +169,19 @@ impl Packet { /// # Errors /// the packet must be a `RemConnect` packet and must contain at least 6 bytes of data - pub fn as_rem_connect(&self) -> anyhow::Result { + pub fn as_rem_connect(&self) -> eyre::Result { if self.kind() != PacketKind::RemConnect { - bail!("Unexpected Packet: {:?} expected RemConnect", self.kind()); + return Err(eyre!( + "Unexpected Packet: {:?} expected RemConnect", + self.kind() + )); } if self.data.len() < 6 { - bail!( + return Err(eyre!( "Too little data for RemConnect. Need at least 6 Bytes got {}", self.data.len() - ); + )); } Ok(RemConnect { diff --git a/src/ports.rs b/src/ports.rs index b1ff5ec..eedda49 100644 --- a/src/ports.rs +++ b/src/ports.rs @@ -10,7 +10,7 @@ use std::{ time::{Duration, SystemTime, UNIX_EPOCH}, }; -use anyhow::anyhow; +use eyre::eyre; use serde::{Deserialize, Serialize}; use tokio::{ net::TcpListener, @@ -18,7 +18,7 @@ use tokio::{ task::JoinHandle, time::Instant, }; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error, info, instrument, warn}; use crate::{ constants::{CACHE_STORE_INTERVAL, PORT_OWNERSHIP_TIMEOUT, PORT_RETRY_TIME}, @@ -47,6 +47,7 @@ pub struct PortHandler { pub port_state: HashMap, } +#[instrument(skip(port_handler, change_receiver))] pub async fn cache_daemon( port_handler: Arc>, cache_path: PathBuf, @@ -104,7 +105,7 @@ fn duration_in_hours(duration: Duration) -> String { fn format_instant(instant: Instant) -> String { let when = duration_in_hours(instant.elapsed()) + " ago"; - (|| -> anyhow::Result<_> { + (|| -> eyre::Result<_> { let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)? - instant.elapsed(); let date = time::OffsetDateTime::from_unix_timestamp( timestamp.as_secs().try_into().expect("timestamp overflow"), @@ -279,6 +280,7 @@ impl PortHandler { } #[allow(clippy::missing_errors_doc)] + #[instrument(skip(self))] pub fn store(&self, cache: &Path) -> std::io::Result<()> { debug!("storing cache"); let temp_file = cache.with_extension(".temp"); @@ -290,6 +292,7 @@ impl PortHandler { } #[allow(clippy::missing_errors_doc)] + #[instrument(skip(change_sender))] pub fn load( cache: &Path, change_sender: tokio::sync::watch::Sender, @@ -301,6 +304,7 @@ impl PortHandler { } #[must_use] + #[instrument(skip(change_sender))] pub fn load_or_default( path: &Path, change_sender: tokio::sync::watch::Sender, @@ -346,6 +350,7 @@ impl PortHandler { }); } + #[instrument(skip(self, listener))] pub fn start_rejector(&mut self, port: Port, listener: TcpListener, packet: Packet) { info!(port, ?packet, "starting rejector"); @@ -356,6 +361,7 @@ impl PortHandler { } } + #[instrument(skip(self))] pub async fn stop_rejector(&mut self, port: Port) -> Option<(TcpListener, Packet)> { info!(port, "stopping rejector"); @@ -368,11 +374,11 @@ impl PortHandler { &mut self, port: Port, f: impl FnOnce(&mut Packet), - ) -> anyhow::Result<()> { + ) -> eyre::Result<()> { let (listener, mut packet) = self .stop_rejector(port) .await - .ok_or_else(|| anyhow!("tried to stop rejector that is not running"))?; + .ok_or_else(|| eyre!("tried to stop rejector that is not running"))?; f(&mut packet); @@ -396,6 +402,7 @@ impl Debug for Rejector { } impl Rejector { + #[instrument(skip(listener))] fn start(listener: TcpListener, packet: Packet) -> Self { let port = listener.local_addr().map(|addr| addr.port()).unwrap_or(0); let state = Arc::new((Mutex::new(listener), packet)); @@ -419,6 +426,7 @@ impl Rejector { Self { state, handle } } + #[instrument(skip(self))] async fn stop(self) -> (TcpListener, Packet) { self.handle.abort(); let _ = self.handle.await; @@ -428,6 +436,7 @@ impl Rejector { } impl PortHandler { + #[instrument(skip(self, config))] pub fn allocate_port_for_number(&mut self, config: &Config, number: Number) -> Option { let port = if let Some(port) = self.allocated_ports.get(&number) { let already_connected = self @@ -462,6 +471,7 @@ impl PortHandler { port } + #[instrument(skip(self, config))] fn try_recover_port(&mut self, config: &Config) -> Option { let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); @@ -522,6 +532,7 @@ impl PortHandler { None // TODO } + #[instrument(skip(self))] pub fn mark_port_error(&mut self, number: Number, port: Port) { warn!(port, number, "registering an error on"); self.register_update();