use bytes::{Bytes, BytesMut}; use futures::Future; use http_body_util::combinators::UnsyncBoxBody; use http_body_util::BodyExt; use http_body_util::StreamBody; use hyper::body::{Body, Frame}; use hyper::header::{ACCEPT_ENCODING, CACHE_CONTROL, CONTENT_ENCODING, CONTENT_TYPE}; use hyper::rt::Executor; use hyper::service::service_fn; use hyper::{Method, Request, Response, StatusCode}; use std::io::Read; use std::net::SocketAddr; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use tokio::net::TcpListener; use tokio::sync::Mutex; use tokio_stream::wrappers::{IntervalStream, WatchStream}; use tokio_stream::StreamExt; use tracing::error; use tracing::{debug, instrument}; use zerocopy::{AsBytes, FromBytes, FromZeroes, LittleEndian, Unaligned}; use crate::constants::DEBUG_SERVER_PING_INTERVAL; use crate::packets::{Header, Packet}; use crate::ports::PortHandler; use crate::spawn; mod tokio_io { use std::{ pin::Pin, task::{Context, Poll}, }; #[derive(Debug)] #[pin_project::pin_project] pub struct TokioIo { #[pin] inner: T, } impl TokioIo { pub fn new(inner: T) -> Self { Self { inner } } } impl hyper::rt::Read for TokioIo where T: tokio::io::AsyncRead, { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, mut buf: hyper::rt::ReadBufCursor<'_>, ) -> Poll> { let n = unsafe { let mut temp_buf = tokio::io::ReadBuf::uninit(buf.as_mut()); match tokio::io::AsyncRead::poll_read(self.project().inner, cx, &mut temp_buf) { Poll::Ready(Ok(())) => temp_buf.filled().len(), other => return other, } }; unsafe { buf.advance(n); } Poll::Ready(Ok(())) } } impl hyper::rt::Write for TokioIo where T: tokio::io::AsyncWrite, { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { tokio::io::AsyncWrite::poll_write(self.project().inner, cx, buf) } fn poll_flush( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { tokio::io::AsyncWrite::poll_flush(self.project().inner, cx) } fn poll_shutdown( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { tokio::io::AsyncWrite::poll_shutdown(self.project().inner, cx) } fn is_write_vectored(&self) -> bool { tokio::io::AsyncWrite::is_write_vectored(&self.inner) } fn poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[std::io::IoSlice<'_>], ) -> Poll> { tokio::io::AsyncWrite::poll_write_vectored(self.project().inner, cx, bufs) } } } use self::tokio_io::TokioIo; #[derive(Clone)] struct NamedExecutor; impl + Send + 'static> Executor for NamedExecutor { fn execute(&self, fut: Fut) { spawn("http worker", fut); } } const COMPRESSED_HTML: &[u8] = include_bytes!(concat!(env!("OUT_DIR"), "/minified.html.gz")); type ResponseBody = UnsyncBoxBody; type RouteResponse = Result, hyper::http::Error>; fn full(buf: B) -> ResponseBody where http_body_util::Full: From + BodyExt + Body, D: Send + 'static, { http_body_util::Full::from(buf) .map_err(|_| unreachable!()) .boxed_unsync() } fn empty() -> ResponseBody { http_body_util::Empty::new() .map_err(|_| unreachable!()) .boxed_unsync() } fn index(req: &Request) -> RouteResponse { let response = Response::builder(); let accepts_gzip = req .headers() .get(ACCEPT_ENCODING) .map_or(false, |accept_encoding| { accept_encoding .as_bytes() .split(|x| *x == b',') .filter_map(|x| x.split(|x| *x == b';').next()) .filter_map(|x| std::str::from_utf8(x).ok()) .any(|x| x.trim() == "gzip") }); if accepts_gzip { response .header(CONTENT_ENCODING, "gzip") .header(CONTENT_TYPE, "text/html") .body(full(COMPRESSED_HTML)) } else { let (sender, mut receiver) = tokio::sync::mpsc::channel(1); spawn("gunzip task", async move { let mut decoder = flate2::bufread::GzDecoder::new(std::io::Cursor::new(COMPRESSED_HTML)); let mut done = false; while !done { let mut chunk = BytesMut::zeroed(256); let mut i = 0; loop { let dst = &mut chunk.as_bytes_mut()[i..]; if dst.is_empty() { break; // we are done } match decoder.read(dst) { Ok(n) => { if n == 0 { done = true; break; } i += n; } Err(err) => unreachable!("failed to read from gzip decode: {err}"), } } chunk.truncate(i); if sender.send(chunk.freeze()).await.is_err() { break; } } }); response.body( StreamBody::new(async_stream::stream! { while let Some(item) = receiver.recv().await { yield Ok(Frame::data(item)); } }) .boxed_unsync(), ) } } async fn data( _req: &Request, port_handler: &Mutex, ) -> RouteResponse { let res = Response::builder().header(CACHE_CONTROL, "no-store"); match serde_json::to_string(&*port_handler.lock().await) { Ok(data) => res .header(CONTENT_TYPE, "application/json") .body(full(data)), Err(err) => { error!(%err, "failed to serialize data for debug server"); res.status(StatusCode::INTERNAL_SERVER_ERROR).body(empty()) } } } fn events( _req: &Request, change_receiver: tokio::sync::watch::Receiver, ) -> RouteResponse { let stream = WatchStream::new(change_receiver) .map(|x| ("change", x)) .merge( IntervalStream::new(tokio::time::interval(DEBUG_SERVER_PING_INTERVAL)) .map(|x| ("ping", x.into_std())), ) .filter_map(|(kind, time)| { let timestamp = (SystemTime::now() + time.elapsed()) .duration_since(UNIX_EPOCH) .ok()? .as_secs(); Some(Ok::, _>(Frame::data(Bytes::from(format!( "event: {kind}\ndata: {timestamp}\n\n" ))))) }); Response::builder() .status(StatusCode::OK) .header(CACHE_CONTROL, "no-store") .header(CONTENT_TYPE, "text/event-stream") .body(StreamBody::new(stream).boxed_unsync()) } async fn debug_server_routes( req: Request, port_handler: Arc>, change_receiver: tokio::sync::watch::Receiver, ) -> RouteResponse { match (req.method(), req.uri().path()) { (&Method::GET, "/") => index(&req), (&Method::GET, "/data") => Ok(data(&req, &port_handler).await?), (&Method::GET, "/events") => events(&req, change_receiver), _ => Ok(Response::builder() .status(StatusCode::NOT_FOUND) .body(empty())?), } } pub async fn debug_server( addr: SocketAddr, port_handler: Arc>, change_receiver: tokio::sync::watch::Receiver, ) { let listener = TcpListener::bind(addr).await; let listener = match listener { Ok(listener) => listener, Err(error) => { error!(%error, %addr, "failed to bind debug server"); return; } }; loop { let conn = listener.accept().await; let stream = match conn { Ok((stream, _)) => stream, Err(error) => { error!(%error, "failed accept debug server connection"); continue; } }; let io = TokioIo::new(stream); let port_handler = port_handler.clone(); let change_receiver = change_receiver.clone(); tokio::task::spawn(async move { if let Err(error) = hyper::server::conn::http1::Builder::new() .serve_connection( io, service_fn(move |req| { debug_server_routes(req, port_handler.clone(), change_receiver.clone()) }), ) .await { if !error.is_incomplete_message() { error!(%error, "Failed to serve debug server connection"); } } }); } } type U16 = zerocopy::U16; type U32 = zerocopy::U32; #[derive(AsBytes)] #[repr(transparent)] #[allow(dead_code)] struct PeerQuery { number: U32, } #[derive(FromBytes, FromZeroes, Unaligned, Debug)] #[repr(packed)] #[allow(dead_code)] struct PeerReply { number: U32, name: [u8; 40], flags: U16, kind: u8, hostname: [u8; 40], ipaddress: [u8; 4], port: U16, extension: u8, pin: U16, timestamp: U32, } #[instrument] pub async fn peer_query(server: &SocketAddr, number: u32) -> eyre::Result> { debug!(%number, "looking up"); let mut packet = Packet { header: Header { kind: 3, // Peer Query length: 4, }, ..Default::default() }; packet.data.clear(); packet.data.resize(packet.header.length as usize, 0); PeerQuery { number: number.into(), } .write_to(packet.data.as_mut_slice()) .unwrap(); let mut socket = tokio::net::TcpStream::connect(server).await?; let (mut reader, mut writer) = socket.split(); packet.send(&mut writer).await?; packet.recv_into(&mut reader).await?; Ok(if packet.kind().raw() == 5 { // PeerReply PeerReply::read_from(packet.data.as_slice()).and_then(|x| { let i = x .name .iter() .enumerate() .find_map(|(i, c)| (*c == 0).then_some(i)) .unwrap_or(x.name.len()); Some(std::str::from_utf8(&x.name[..i]).ok()?.to_owned()) }) } else { None }) }