From af5c09060045c53bb785217dd9f40c1f7132348c Mon Sep 17 00:00:00 2001 From: soruh Date: Sun, 11 Jun 2023 02:40:39 +0200 Subject: [PATCH] refactors --- src/http.rs | 175 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 175 insertions(+) create mode 100644 src/http.rs diff --git a/src/http.rs b/src/http.rs new file mode 100644 index 0000000..50024e2 --- /dev/null +++ b/src/http.rs @@ -0,0 +1,175 @@ +use futures::Future; +use tokio_stream::StreamExt; + +use hyper::rt::Executor; +use hyper::service::{make_service_fn, service_fn}; +use hyper::{Body, Method, Response, Server, StatusCode}; +use std::convert::Infallible; +use std::net::SocketAddr; +use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; +use tokio::sync::Mutex; +use tokio_stream::wrappers::{IntervalStream, WatchStream}; +use tracing::error; + +use zerocopy::{AsBytes, FromBytes, LittleEndian, Unaligned}; + +use tracing::{debug, instrument}; + +use crate::constants::DEBUG_SERVER_PING_INTERVAL; +use crate::packets::{Header, Packet}; + +use crate::ports::PortHandler; +use crate::spawn; + +#[derive(Clone)] +struct NamedExecutor; +impl + Send + 'static> Executor for NamedExecutor { + fn execute(&self, fut: Fut) { + spawn("http worker", fut); + } +} + +pub async fn debug_server( + addr: SocketAddr, + port_handler: Arc>, + change_receiver: tokio::sync::watch::Receiver, +) { + let server = Server::bind(&addr) + .executor(NamedExecutor) + .serve(make_service_fn(move |_conn| { + let port_handler = port_handler.clone(); + let change_receiver = change_receiver.clone(); + async move { + Ok::<_, Infallible>(service_fn(move |req| { + let port_handler = port_handler.clone(); + let change_receiver = WatchStream::new(change_receiver.clone()); + async move { + match (req.method(), req.uri().path()) { + (&Method::GET, "/") => Ok(Response::new(Body::from(include_str!( + concat!(env!("OUT_DIR"), "/minified.html") + )))), + + (&Method::GET, "/data") => { + let res = Response::builder().header("Cache-Control", "no-store"); + + match serde_json::to_string(&*port_handler.lock().await) { + Ok(data) => res + .header("Content-Type", "application/json") + .body(Body::from(data)), + Err(err) => { + error!(%err, "failed to serialize data for debug server"); + res.status(StatusCode::INTERNAL_SERVER_ERROR) + .body(Body::from("")) + } + } + } + + (&Method::GET, "/events") => Response::builder() + .status(StatusCode::OK) + .header("Cache-Control", "no-store") + .header("Content-Type", "text/event-stream") + .body(Body::wrap_stream({ + change_receiver + .map(|x| ("change", x)) + .merge( + IntervalStream::new(tokio::time::interval( + DEBUG_SERVER_PING_INTERVAL, + )) + .map(|x| ("ping", x.into_std())), + ) + .filter_map(|(kind, time)| { + let timestamp = (SystemTime::now() + time.elapsed()) + .duration_since(UNIX_EPOCH) + .ok()? + .as_secs(); + + Some(Ok::<_, Infallible>(format!( + "event:{kind}\ndata: {timestamp}\n\n" + ))) + }) + })), + _ => Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Body::empty()), + } + } + })) + } + })); + + if let Err(error) = server.await { + error!(%error, "debug server error"); + } +} + +type U16 = zerocopy::U16; +type U32 = zerocopy::U32; + +#[derive(AsBytes)] +#[repr(transparent)] +#[allow(dead_code)] +struct PeerQuery { + number: U32, +} + +#[derive(FromBytes, Unaligned, Debug)] +#[repr(packed)] +#[allow(dead_code)] +struct PeerReply { + number: U32, + name: [u8; 40], + flags: U16, + kind: u8, + hostname: [u8; 40], + ipaddress: [u8; 4], + port: U16, + extension: u8, + pin: U16, + timestamp: U32, +} + +#[instrument] +pub async fn peer_query(server: &SocketAddr, number: u32) -> eyre::Result> { + debug!(%number, "looking up"); + + let mut packet = Packet { + header: Header { + kind: 3, // Peer Query + length: 4, + }, + data: Vec::new(), + }; + + packet.data.clear(); + packet.data.resize(packet.header.length as usize, 0); + + PeerQuery { + number: number.into(), + } + .write_to(packet.data.as_mut_slice()) + .unwrap(); + + let mut socket = tokio::net::TcpStream::connect(server).await?; + + let (mut reader, mut writer) = socket.split(); + + packet.send(&mut writer).await?; + packet.recv_into(&mut reader).await?; + + Ok(if packet.kind().raw() == 5 { + // PeerReply + PeerReply::read_from(packet.data.as_slice()).and_then(|x| { + let i = x + .name + .iter() + .enumerate() + .find_map(|(i, c)| (*c == 0).then_some(i)) + .unwrap_or(x.name.len()); + + Some(std::str::from_utf8(&x.name[..i]).ok()?.to_owned()) + }) + } else { + None + }) +}