refactors
This commit is contained in:
parent
50fa67409c
commit
af5c090600
175
src/http.rs
Normal file
175
src/http.rs
Normal file
@ -0,0 +1,175 @@
|
|||||||
|
use futures::Future;
|
||||||
|
use tokio_stream::StreamExt;
|
||||||
|
|
||||||
|
use hyper::rt::Executor;
|
||||||
|
use hyper::service::{make_service_fn, service_fn};
|
||||||
|
use hyper::{Body, Method, Response, Server, StatusCode};
|
||||||
|
use std::convert::Infallible;
|
||||||
|
use std::net::SocketAddr;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::time::{SystemTime, UNIX_EPOCH};
|
||||||
|
use tokio::sync::Mutex;
|
||||||
|
use tokio_stream::wrappers::{IntervalStream, WatchStream};
|
||||||
|
use tracing::error;
|
||||||
|
|
||||||
|
use zerocopy::{AsBytes, FromBytes, LittleEndian, Unaligned};
|
||||||
|
|
||||||
|
use tracing::{debug, instrument};
|
||||||
|
|
||||||
|
use crate::constants::DEBUG_SERVER_PING_INTERVAL;
|
||||||
|
use crate::packets::{Header, Packet};
|
||||||
|
|
||||||
|
use crate::ports::PortHandler;
|
||||||
|
use crate::spawn;
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
struct NamedExecutor;
|
||||||
|
impl<T: Send + 'static, Fut: Future<Output = T> + Send + 'static> Executor<Fut> for NamedExecutor {
|
||||||
|
fn execute(&self, fut: Fut) {
|
||||||
|
spawn("http worker", fut);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn debug_server(
|
||||||
|
addr: SocketAddr,
|
||||||
|
port_handler: Arc<Mutex<PortHandler>>,
|
||||||
|
change_receiver: tokio::sync::watch::Receiver<std::time::Instant>,
|
||||||
|
) {
|
||||||
|
let server = Server::bind(&addr)
|
||||||
|
.executor(NamedExecutor)
|
||||||
|
.serve(make_service_fn(move |_conn| {
|
||||||
|
let port_handler = port_handler.clone();
|
||||||
|
let change_receiver = change_receiver.clone();
|
||||||
|
async move {
|
||||||
|
Ok::<_, Infallible>(service_fn(move |req| {
|
||||||
|
let port_handler = port_handler.clone();
|
||||||
|
let change_receiver = WatchStream::new(change_receiver.clone());
|
||||||
|
async move {
|
||||||
|
match (req.method(), req.uri().path()) {
|
||||||
|
(&Method::GET, "/") => Ok(Response::new(Body::from(include_str!(
|
||||||
|
concat!(env!("OUT_DIR"), "/minified.html")
|
||||||
|
)))),
|
||||||
|
|
||||||
|
(&Method::GET, "/data") => {
|
||||||
|
let res = Response::builder().header("Cache-Control", "no-store");
|
||||||
|
|
||||||
|
match serde_json::to_string(&*port_handler.lock().await) {
|
||||||
|
Ok(data) => res
|
||||||
|
.header("Content-Type", "application/json")
|
||||||
|
.body(Body::from(data)),
|
||||||
|
Err(err) => {
|
||||||
|
error!(%err, "failed to serialize data for debug server");
|
||||||
|
res.status(StatusCode::INTERNAL_SERVER_ERROR)
|
||||||
|
.body(Body::from(""))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
(&Method::GET, "/events") => Response::builder()
|
||||||
|
.status(StatusCode::OK)
|
||||||
|
.header("Cache-Control", "no-store")
|
||||||
|
.header("Content-Type", "text/event-stream")
|
||||||
|
.body(Body::wrap_stream({
|
||||||
|
change_receiver
|
||||||
|
.map(|x| ("change", x))
|
||||||
|
.merge(
|
||||||
|
IntervalStream::new(tokio::time::interval(
|
||||||
|
DEBUG_SERVER_PING_INTERVAL,
|
||||||
|
))
|
||||||
|
.map(|x| ("ping", x.into_std())),
|
||||||
|
)
|
||||||
|
.filter_map(|(kind, time)| {
|
||||||
|
let timestamp = (SystemTime::now() + time.elapsed())
|
||||||
|
.duration_since(UNIX_EPOCH)
|
||||||
|
.ok()?
|
||||||
|
.as_secs();
|
||||||
|
|
||||||
|
Some(Ok::<_, Infallible>(format!(
|
||||||
|
"event:{kind}\ndata: {timestamp}\n\n"
|
||||||
|
)))
|
||||||
|
})
|
||||||
|
})),
|
||||||
|
_ => Response::builder()
|
||||||
|
.status(StatusCode::NOT_FOUND)
|
||||||
|
.body(Body::empty()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
|
if let Err(error) = server.await {
|
||||||
|
error!(%error, "debug server error");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type U16 = zerocopy::U16<LittleEndian>;
|
||||||
|
type U32 = zerocopy::U32<LittleEndian>;
|
||||||
|
|
||||||
|
#[derive(AsBytes)]
|
||||||
|
#[repr(transparent)]
|
||||||
|
#[allow(dead_code)]
|
||||||
|
struct PeerQuery {
|
||||||
|
number: U32,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(FromBytes, Unaligned, Debug)]
|
||||||
|
#[repr(packed)]
|
||||||
|
#[allow(dead_code)]
|
||||||
|
struct PeerReply {
|
||||||
|
number: U32,
|
||||||
|
name: [u8; 40],
|
||||||
|
flags: U16,
|
||||||
|
kind: u8,
|
||||||
|
hostname: [u8; 40],
|
||||||
|
ipaddress: [u8; 4],
|
||||||
|
port: U16,
|
||||||
|
extension: u8,
|
||||||
|
pin: U16,
|
||||||
|
timestamp: U32,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[instrument]
|
||||||
|
pub async fn peer_query(server: &SocketAddr, number: u32) -> eyre::Result<Option<String>> {
|
||||||
|
debug!(%number, "looking up");
|
||||||
|
|
||||||
|
let mut packet = Packet {
|
||||||
|
header: Header {
|
||||||
|
kind: 3, // Peer Query
|
||||||
|
length: 4,
|
||||||
|
},
|
||||||
|
data: Vec::new(),
|
||||||
|
};
|
||||||
|
|
||||||
|
packet.data.clear();
|
||||||
|
packet.data.resize(packet.header.length as usize, 0);
|
||||||
|
|
||||||
|
PeerQuery {
|
||||||
|
number: number.into(),
|
||||||
|
}
|
||||||
|
.write_to(packet.data.as_mut_slice())
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let mut socket = tokio::net::TcpStream::connect(server).await?;
|
||||||
|
|
||||||
|
let (mut reader, mut writer) = socket.split();
|
||||||
|
|
||||||
|
packet.send(&mut writer).await?;
|
||||||
|
packet.recv_into(&mut reader).await?;
|
||||||
|
|
||||||
|
Ok(if packet.kind().raw() == 5 {
|
||||||
|
// PeerReply
|
||||||
|
PeerReply::read_from(packet.data.as_slice()).and_then(|x| {
|
||||||
|
let i = x
|
||||||
|
.name
|
||||||
|
.iter()
|
||||||
|
.enumerate()
|
||||||
|
.find_map(|(i, c)| (*c == 0).then_some(i))
|
||||||
|
.unwrap_or(x.name.len());
|
||||||
|
|
||||||
|
Some(std::str::from_utf8(&x.name[..i]).ok()?.to_owned())
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
})
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user