From 50fa67409cc6b8f3bd7e9976f15ab80f0b83329f Mon Sep 17 00:00:00 2001 From: soruh Date: Sun, 11 Jun 2023 02:39:01 +0200 Subject: [PATCH] refactors --- src/auth.rs | 10 +-- src/client.rs | 2 +- src/debug_server.rs | 175 -------------------------------------------- src/main.rs | 8 +- src/packets.rs | 50 ++++++++++--- src/ports.rs | 24 ++++-- web/index.html | 3 + web/main.css | 4 +- web/main.js | 8 +- 9 files changed, 76 insertions(+), 208 deletions(-) delete mode 100644 src/debug_server.rs diff --git a/src/auth.rs b/src/auth.rs index a44a80f..f887a40 100644 --- a/src/auth.rs +++ b/src/auth.rs @@ -64,19 +64,13 @@ pub async fn dyn_ip_update( })? .into()), PacketKind::Error => { - let first_zero = packet - .data - .iter() - .enumerate() - .find_map(|(i, x)| (*x == 0).then_some(i)); - return Err(eyre!( "{}", - std::str::from_utf8(first_zero.map_or(&packet.data, |i| &packet.data[..i]),)? + packet.as_string().unwrap_or("unknown dyn auth error") )); } - _ => return Err(eyre!("server returned unexpected packet")), + _ => return Err(eyre!("auth server returned unexpected packet")), }; debug!(?result, "finished dyn ip update"); diff --git a/src/client.rs b/src/client.rs index e145c68..8d06856 100644 --- a/src/client.rs +++ b/src/client.rs @@ -15,7 +15,7 @@ use tracing::{info, instrument, trace}; use crate::{ auth::dyn_ip_update, constants::{AUTH_TIMEOUT, CALL_ACK_TIMEOUT, CALL_TIMEOUT, PING_TIMEOUT, SEND_PING_INTERVAL}, - debug_server::peer_query, + http::peer_query, packets::{Header, Packet, PacketKind, RemConnect, REJECT_OOP, REJECT_TIMEOUT}, ports::{PortHandler, PortStatus}, Config, HandlerMetadata, diff --git a/src/debug_server.rs b/src/debug_server.rs deleted file mode 100644 index 50024e2..0000000 --- a/src/debug_server.rs +++ /dev/null @@ -1,175 +0,0 @@ -use futures::Future; -use tokio_stream::StreamExt; - -use hyper::rt::Executor; -use hyper::service::{make_service_fn, service_fn}; -use hyper::{Body, Method, Response, Server, StatusCode}; -use std::convert::Infallible; -use std::net::SocketAddr; -use std::sync::Arc; -use std::time::{SystemTime, UNIX_EPOCH}; -use tokio::sync::Mutex; -use tokio_stream::wrappers::{IntervalStream, WatchStream}; -use tracing::error; - -use zerocopy::{AsBytes, FromBytes, LittleEndian, Unaligned}; - -use tracing::{debug, instrument}; - -use crate::constants::DEBUG_SERVER_PING_INTERVAL; -use crate::packets::{Header, Packet}; - -use crate::ports::PortHandler; -use crate::spawn; - -#[derive(Clone)] -struct NamedExecutor; -impl + Send + 'static> Executor for NamedExecutor { - fn execute(&self, fut: Fut) { - spawn("http worker", fut); - } -} - -pub async fn debug_server( - addr: SocketAddr, - port_handler: Arc>, - change_receiver: tokio::sync::watch::Receiver, -) { - let server = Server::bind(&addr) - .executor(NamedExecutor) - .serve(make_service_fn(move |_conn| { - let port_handler = port_handler.clone(); - let change_receiver = change_receiver.clone(); - async move { - Ok::<_, Infallible>(service_fn(move |req| { - let port_handler = port_handler.clone(); - let change_receiver = WatchStream::new(change_receiver.clone()); - async move { - match (req.method(), req.uri().path()) { - (&Method::GET, "/") => Ok(Response::new(Body::from(include_str!( - concat!(env!("OUT_DIR"), "/minified.html") - )))), - - (&Method::GET, "/data") => { - let res = Response::builder().header("Cache-Control", "no-store"); - - match serde_json::to_string(&*port_handler.lock().await) { - Ok(data) => res - .header("Content-Type", "application/json") - .body(Body::from(data)), - Err(err) => { - error!(%err, "failed to serialize data for debug server"); - res.status(StatusCode::INTERNAL_SERVER_ERROR) - .body(Body::from("")) - } - } - } - - (&Method::GET, "/events") => Response::builder() - .status(StatusCode::OK) - .header("Cache-Control", "no-store") - .header("Content-Type", "text/event-stream") - .body(Body::wrap_stream({ - change_receiver - .map(|x| ("change", x)) - .merge( - IntervalStream::new(tokio::time::interval( - DEBUG_SERVER_PING_INTERVAL, - )) - .map(|x| ("ping", x.into_std())), - ) - .filter_map(|(kind, time)| { - let timestamp = (SystemTime::now() + time.elapsed()) - .duration_since(UNIX_EPOCH) - .ok()? - .as_secs(); - - Some(Ok::<_, Infallible>(format!( - "event:{kind}\ndata: {timestamp}\n\n" - ))) - }) - })), - _ => Response::builder() - .status(StatusCode::NOT_FOUND) - .body(Body::empty()), - } - } - })) - } - })); - - if let Err(error) = server.await { - error!(%error, "debug server error"); - } -} - -type U16 = zerocopy::U16; -type U32 = zerocopy::U32; - -#[derive(AsBytes)] -#[repr(transparent)] -#[allow(dead_code)] -struct PeerQuery { - number: U32, -} - -#[derive(FromBytes, Unaligned, Debug)] -#[repr(packed)] -#[allow(dead_code)] -struct PeerReply { - number: U32, - name: [u8; 40], - flags: U16, - kind: u8, - hostname: [u8; 40], - ipaddress: [u8; 4], - port: U16, - extension: u8, - pin: U16, - timestamp: U32, -} - -#[instrument] -pub async fn peer_query(server: &SocketAddr, number: u32) -> eyre::Result> { - debug!(%number, "looking up"); - - let mut packet = Packet { - header: Header { - kind: 3, // Peer Query - length: 4, - }, - data: Vec::new(), - }; - - packet.data.clear(); - packet.data.resize(packet.header.length as usize, 0); - - PeerQuery { - number: number.into(), - } - .write_to(packet.data.as_mut_slice()) - .unwrap(); - - let mut socket = tokio::net::TcpStream::connect(server).await?; - - let (mut reader, mut writer) = socket.split(); - - packet.send(&mut writer).await?; - packet.recv_into(&mut reader).await?; - - Ok(if packet.kind().raw() == 5 { - // PeerReply - PeerReply::read_from(packet.data.as_slice()).and_then(|x| { - let i = x - .name - .iter() - .enumerate() - .find_map(|(i, c)| (*c == 0).then_some(i)) - .unwrap_or(x.name.len()); - - Some(std::str::from_utf8(&x.name[..i]).ok()?.to_owned()) - }) - } else { - None - }) -} diff --git a/src/main.rs b/src/main.rs index 1af5b6c..91d986a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,8 +10,8 @@ use std::{ time::Duration, }; -use debug_server::debug_server; use futures::Future; +use http::debug_server; use packets::{Header, Packet}; use serde::{Deserialize, Deserializer}; use time::format_description::OwnedFormatItem; @@ -31,7 +31,7 @@ pub mod auth; pub mod client; pub mod constants; #[cfg(feature = "debug_server")] -pub mod debug_server; +pub mod http; pub mod packets; pub mod ports; @@ -184,7 +184,7 @@ fn setup_tracing(config: &Config) { Level::ERROR => write!(writer, " {:>5} ", level.red())?, } - write!(writer, "{:23}{}", meta.target().dimmed(), ":".bold())?; + write!(writer, "{:18}{}", meta.target().dimmed(), ":".bold())?; /* if let Some(filename) = meta.file() { @@ -249,8 +249,8 @@ async fn connection_handler( Err(_) => Some("internal server error".to_owned()), Ok(Err(err)) => match err.downcast_ref::() { Some(io_error) if io_error.kind() == std::io::ErrorKind::UnexpectedEof => { - debug!(%addr, "Client dropped their connection"); // don't print an error on dropped connections + debug!(%addr, "Client dropped their connection"); None } _ => Some(err.to_string()), diff --git a/src/packets.rs b/src/packets.rs index f82b56c..0f8aac8 100644 --- a/src/packets.rs +++ b/src/packets.rs @@ -1,4 +1,4 @@ -use std::{ffi::CStr, fmt::Debug}; +use std::fmt::Debug; use bytemuck::{Pod, Zeroable}; use eyre::eyre; @@ -82,19 +82,40 @@ pub struct Packet { pub data: Vec, } +impl Packet { + #[must_use] + pub fn data(&self) -> &[u8] { + &self.data[..self.header.length as usize] + } + + #[must_use] + pub fn as_string(&self) -> Option<&str> { + let data = self.data(); + let nul = data.iter().enumerate().find(|(_i, c)| **c == 0); + + let data = if let Some((i, _)) = nul { + &data[..i] + } else { + data + }; + + std::str::from_utf8(data).ok() + } +} + impl Debug for Packet { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let data = &self.data; - let mut debugger = f.debug_struct("Packet"); debugger.field("kind", &PacketKind::from_u8(self.header.kind)); - let c_str = CStr::from_bytes_until_nul(data).ok(); - if let Some(str_data) = c_str.as_ref().and_then(|x| x.to_str().ok()) { - debugger.field("data", &str_data); - } else { - debugger.field("data", &data); + match self.as_string() { + Some(string) if string.chars().all(|c| !c.is_control()) => { + debugger.field("data", &string); + } + _ => { + debugger.field("data", &self.data()); + } } debugger.finish() @@ -132,7 +153,7 @@ impl Packet { pub async fn recv_into_cancelation_safe( &mut self, stream: &mut ReadHalf<'_>, - ) -> std::io::Result<()> { + ) -> eyre::Result<()> { // Makes sure all data is available before reading let header_bytes = bytemuck::bytes_of_mut(&mut self.header); stream.peek(header_bytes).await?; @@ -144,7 +165,7 @@ impl Packet { } #[allow(clippy::missing_errors_doc)] - pub async fn recv_into(&mut self, stream: &mut ReadHalf<'_>) -> std::io::Result<()> { + pub async fn recv_into(&mut self, stream: &mut ReadHalf<'_>) -> eyre::Result<()> { let header_bytes = bytemuck::bytes_of_mut(&mut self.header); stream.read_exact(header_bytes).await?; @@ -153,13 +174,20 @@ impl Packet { stream.read_exact(&mut self.data).await?; + if self.header.kind == PacketKind::Error.raw() { + return Err(eyre!( + "client reported error: {:?}", + self.as_string().unwrap_or("unknown dyn auth error") + )); + } + Ok(()) } #[allow(clippy::missing_errors_doc)] pub async fn send(&self, stream: &mut WriteHalf<'_>) -> std::io::Result<()> { stream.write_all(bytemuck::bytes_of(&self.header)).await?; - stream.write_all(&self.data).await?; + stream.write_all(self.data()).await?; Ok(()) } diff --git a/src/ports.rs b/src/ports.rs index 633d822..4c4c30e 100644 --- a/src/ports.rs +++ b/src/ports.rs @@ -21,7 +21,7 @@ use tracing::{debug, error, info, instrument, warn}; use crate::{ constants::{CACHE_STORE_INTERVAL, PORT_OWNERSHIP_TIMEOUT, PORT_RETRY_TIME}, - packets::Packet, + packets::{Packet, PacketKind}, spawn, Config, Number, Port, UnixTimestamp, }; @@ -35,7 +35,7 @@ pub struct PortHandler { pub change_sender: Option>, #[serde(skip_deserializing)] - port_guards: HashMap, + rejectors: HashMap, allowed_ports: AllowedList, @@ -176,7 +176,7 @@ impl PortHandler { let value_object = value.as_object_mut().unwrap(); - value_object.remove("port_guards").unwrap(); + value_object.remove("rejectors").unwrap(); value_object.remove("last_update").unwrap(); serde_json::to_writer(BufWriter::new(File::create(&temp_file)?), &value)?; @@ -248,7 +248,7 @@ impl PortHandler { let port_guard = Rejector::start(listener, packet); - if self.port_guards.insert(port, port_guard).is_some() { + if self.rejectors.insert(port, port_guard).is_some() { unreachable!("Tried to start rejector that is already running. This should have been impossible since it requires two listeners on the same port."); } } @@ -257,7 +257,7 @@ impl PortHandler { pub async fn stop_rejector(&mut self, port: Port) -> Option<(TcpListener, Packet)> { info!(port, "stopping rejector"); - Some(self.port_guards.remove(&port)?.stop().await) + Some(self.rejectors.remove(&port)?.stop().await) } /// # Errors @@ -290,7 +290,19 @@ impl Serialize for Rejector { where S: Serializer, { - self.state.1.serialize(serializer) + let packet = &self.state.1; + let kind = match packet.kind() { + PacketKind::End => "end", + PacketKind::Reject => "reject", + _ => unreachable!(), + }; + + match packet.as_string() { + Some(string) if string.chars().all(|c| !c.is_control()) => { + (kind, string).serialize(serializer) + } + _ => (kind, packet.data()).serialize(serializer), + } } } diff --git a/web/index.html b/web/index.html index 1f133b7..7023650 100644 --- a/web/index.html +++ b/web/index.html @@ -9,6 +9,9 @@ + +

+


     
 
 
diff --git a/web/main.css b/web/main.css
index 3fb8bab..48ebbfc 100644
--- a/web/main.css
+++ b/web/main.css
@@ -1,3 +1,3 @@
-.body {
-    background-color: gray;
+body {
+    background-color: #eee;
 }
\ No newline at end of file
diff --git a/web/main.js b/web/main.js
index 6bcbb22..bafe094 100644
--- a/web/main.js
+++ b/web/main.js
@@ -1,19 +1,25 @@
 window.onload = () => {
     const evtSource = new EventSource("/events");
+    const data = document.getElementById("data");
+    const last_update = document.getElementById("last_update");
     evtSource.addEventListener("change", event => {
         console.log(event);
 
+        last_update.innerText = `last update at ${new Date(+event.data * 1000)}`;
+
         const newElement = document.createElement("li");
         const eventList = document.getElementById("list");
         newElement.textContent = `change at ${+event.data}`;
         eventList.appendChild(newElement);
 
-        fetch("/data").then(res => res.json().then(res => console.log(res)));
+        fetch("/data").then(res => res.json().then(res => data.innerText = JSON.stringify(res, null, 1)));
     });
 
     evtSource.addEventListener("ping", event => {
         console.log(event);
 
+        last_update.innerText = `last update at ${new Date(+event.data * 1000)}`;
+
         const newElement = document.createElement("li");
         const eventList = document.getElementById("list");
         newElement.textContent = `ping at ${+event.data}`;