start on new debug server
This commit is contained in:
parent
15672536f6
commit
1ae573dd76
6
Cargo.lock
generated
6
Cargo.lock
generated
@ -187,6 +187,7 @@ dependencies = [
|
||||
"serde_json",
|
||||
"time",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tracing",
|
||||
"tracing-error",
|
||||
"tracing-subscriber",
|
||||
@ -1048,13 +1049,14 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tokio-stream"
|
||||
version = "0.1.12"
|
||||
version = "0.1.14"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8fb52b74f05dbf495a8fba459fdc331812b96aa086d9eb78101fa0d4569c3313"
|
||||
checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"pin-project-lite",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -6,9 +6,6 @@ edition = "2021"
|
||||
[profile.release]
|
||||
debug = true
|
||||
|
||||
[profile.dev.package.backtrace]
|
||||
opt-level = 3
|
||||
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
@ -18,7 +15,7 @@ time = { version = "0.3.20", features = ["local-offset", "macros"] }
|
||||
bytemuck = { version = "1.13.0", features = ["derive"] }
|
||||
serde = { version = "1.0.152", features = ["derive"] }
|
||||
serde_json = "1.0.91"
|
||||
hyper = { version = "0.14.24", optional = true, features = ["server", "http1", "tcp"] }
|
||||
hyper = { version = "0.14.24", optional = true, features = ["server", "http1", "tcp", "stream"] }
|
||||
futures = { version = "0.3.27", default-features = false, features = ["std"] }
|
||||
tracing = "0.1.37"
|
||||
tracing-subscriber = { version = "0.3.16", features = ["time"] }
|
||||
@ -28,6 +25,7 @@ eyre = "0.6.8"
|
||||
color-eyre = "0.6.2"
|
||||
tracing-error = "0.2.0"
|
||||
zerocopy = "0.6.1"
|
||||
tokio-stream = { version = "0.1.14", features = ["sync"] }
|
||||
|
||||
[features]
|
||||
default = ["debug_server"]
|
||||
|
@ -9,3 +9,5 @@ pub const PING_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
pub const SEND_PING_INTERVAL: Duration = Duration::from_secs(20);
|
||||
|
||||
pub const CACHE_STORE_INTERVAL: Duration = Duration::from_secs(5);
|
||||
|
||||
pub const DEBUG_SERVER_PING_INTERVAL: Duration = Duration::from_secs(5);
|
||||
|
@ -1,16 +1,22 @@
|
||||
use futures::Future;
|
||||
use tokio_stream::StreamExt;
|
||||
|
||||
use hyper::rt::Executor;
|
||||
use hyper::service::{make_service_fn, service_fn};
|
||||
use hyper::{Body, Response, Server};
|
||||
use hyper::{Body, Method, Response, Server, StatusCode};
|
||||
use std::convert::Infallible;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
use tokio::sync::Mutex;
|
||||
use tokio_stream::wrappers::{IntervalStream, WatchStream};
|
||||
use tracing::error;
|
||||
|
||||
use zerocopy::{AsBytes, FromBytes, LittleEndian, Unaligned};
|
||||
|
||||
use tracing::{debug, instrument};
|
||||
|
||||
use crate::constants::DEBUG_SERVER_PING_INTERVAL;
|
||||
use crate::packets::{Header, Packet};
|
||||
|
||||
use crate::ports::PortHandler;
|
||||
@ -24,24 +30,100 @@ impl<T: Send + 'static, Fut: Future<Output = T> + Send + 'static> Executor<Fut>
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn debug_server(addr: SocketAddr, port_handler: Arc<Mutex<PortHandler>>) {
|
||||
pub async fn debug_server(
|
||||
addr: SocketAddr,
|
||||
port_handler: Arc<Mutex<PortHandler>>,
|
||||
change_receiver: tokio::sync::watch::Receiver<std::time::Instant>,
|
||||
) {
|
||||
let server = Server::bind(&addr)
|
||||
.executor(NamedExecutor)
|
||||
.serve(make_service_fn(move |_conn| {
|
||||
let port_handler = port_handler.clone();
|
||||
let change_receiver = change_receiver.clone();
|
||||
async move {
|
||||
Ok::<_, Infallible>(service_fn(move |_req| {
|
||||
Ok::<_, Infallible>(service_fn(move |req| {
|
||||
let port_handler = port_handler.clone();
|
||||
let change_receiver = WatchStream::new(change_receiver.clone());
|
||||
async move {
|
||||
Ok::<_, Infallible>(Response::new(Body::from(
|
||||
port_handler.lock().await.status_string(),
|
||||
match (req.method(), req.uri().path()) {
|
||||
(&Method::GET, "/") => Ok(Response::new(Body::from(
|
||||
r#"
|
||||
<script>
|
||||
window.onload = () => {
|
||||
evtSource = new EventSource("/events");
|
||||
evtSource.addEventListener("change", event => {
|
||||
console.log(event);
|
||||
|
||||
const newElement = document.createElement("li");
|
||||
const eventList = document.getElementById("list");
|
||||
newElement.textContent = `change at ${+event.data}`;
|
||||
eventList.appendChild(newElement);
|
||||
});
|
||||
|
||||
evtSource.addEventListener("ping", event => {
|
||||
console.log(event);
|
||||
|
||||
const newElement = document.createElement("li");
|
||||
const eventList = document.getElementById("list");
|
||||
newElement.textContent = `ping at ${+event.data}`;
|
||||
eventList.appendChild(newElement);
|
||||
});
|
||||
};
|
||||
</script>
|
||||
|
||||
<list id="list" />
|
||||
|
||||
"#,
|
||||
))),
|
||||
|
||||
(&Method::GET, "/data") => {
|
||||
let res = Response::builder().header("Cache-Control", "no-store");
|
||||
|
||||
match serde_json::to_string(&*port_handler.lock().await) {
|
||||
Ok(data) => res
|
||||
.header("Content-Type", "application/json")
|
||||
.body(Body::from(data)),
|
||||
Err(err) => {
|
||||
error!(%err, "failed to serialize data for debug server");
|
||||
res.status(StatusCode::INTERNAL_SERVER_ERROR)
|
||||
.body(Body::from(""))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
(&Method::GET, "/events") => Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.header("Cache-Control", "no-store")
|
||||
.header("Content-Type", "text/event-stream")
|
||||
.body(Body::wrap_stream({
|
||||
change_receiver
|
||||
.map(|x| ("change", x))
|
||||
.merge(
|
||||
IntervalStream::new(tokio::time::interval(
|
||||
DEBUG_SERVER_PING_INTERVAL,
|
||||
))
|
||||
.map(|x| ("ping", x.into_std())),
|
||||
)
|
||||
.filter_map(|(kind, time)| {
|
||||
let timestamp = (SystemTime::now() + time.elapsed())
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.ok()?
|
||||
.as_secs();
|
||||
|
||||
Some(Ok::<_, Infallible>(format!(
|
||||
"event:{kind}\ndata: {timestamp}\n\n"
|
||||
)))
|
||||
})
|
||||
})),
|
||||
_ => Response::builder()
|
||||
.status(StatusCode::NOT_FOUND)
|
||||
.body(Body::empty()),
|
||||
}
|
||||
}
|
||||
}))
|
||||
}
|
||||
}));
|
||||
|
||||
// Run this server for... forever!
|
||||
if let Err(error) = server.await {
|
||||
error!(%error, "debug server error");
|
||||
}
|
||||
|
@ -1,5 +1,4 @@
|
||||
#![warn(clippy::pedantic)]
|
||||
// #![allow(clippy::missing_errors_doc)]
|
||||
|
||||
use std::{
|
||||
fmt::Debug,
|
||||
@ -20,7 +19,7 @@ use tokio::{
|
||||
io::AsyncWriteExt,
|
||||
net::{TcpListener, TcpStream},
|
||||
sync::Mutex,
|
||||
time::{sleep, Instant},
|
||||
time::sleep,
|
||||
};
|
||||
use tracing::{error, info, instrument, warn, Level};
|
||||
use tracing_subscriber::fmt::time::FormatTime;
|
||||
@ -323,7 +322,7 @@ async fn tokio_main(config: Arc<Config>) -> eyre::Result<()> {
|
||||
|
||||
let cache_path = PathBuf::from("cache.json");
|
||||
|
||||
let (change_sender, change_receiver) = tokio::sync::watch::channel(Instant::now());
|
||||
let (change_sender, change_receiver) = tokio::sync::watch::channel(std::time::Instant::now());
|
||||
|
||||
let mut port_handler = PortHandler::load_or_default(&cache_path, change_sender);
|
||||
port_handler.update_allowed_ports(&config.allowed_ports);
|
||||
@ -332,7 +331,7 @@ async fn tokio_main(config: Arc<Config>) -> eyre::Result<()> {
|
||||
|
||||
spawn(
|
||||
"cache daemon",
|
||||
cache_daemon(port_handler.clone(), cache_path, change_receiver),
|
||||
cache_daemon(port_handler.clone(), cache_path, change_receiver.clone()),
|
||||
);
|
||||
|
||||
#[cfg(feature = "debug_server")]
|
||||
@ -340,7 +339,7 @@ async fn tokio_main(config: Arc<Config>) -> eyre::Result<()> {
|
||||
warn!(%listen_addr, "debug server listening");
|
||||
spawn(
|
||||
"debug server",
|
||||
debug_server(listen_addr, port_handler.clone()),
|
||||
debug_server(listen_addr, port_handler.clone(), change_receiver),
|
||||
);
|
||||
}
|
||||
|
||||
|
153
src/ports.rs
153
src/ports.rs
@ -1,5 +1,4 @@
|
||||
use std::{
|
||||
borrow::Cow,
|
||||
collections::{BTreeSet, HashMap, HashSet},
|
||||
fmt::{Debug, Display},
|
||||
fs::File,
|
||||
@ -23,16 +22,16 @@ use tracing::{debug, error, info, instrument, warn};
|
||||
use crate::{
|
||||
constants::{CACHE_STORE_INTERVAL, PORT_OWNERSHIP_TIMEOUT, PORT_RETRY_TIME},
|
||||
packets::Packet,
|
||||
spawn, Config, Number, Port, UnixTimestamp, TIME_FORMAT, TIME_ZONE_OFFSET,
|
||||
spawn, Config, Number, Port, UnixTimestamp,
|
||||
};
|
||||
|
||||
#[derive(Default, Serialize, Deserialize)]
|
||||
pub struct PortHandler {
|
||||
#[serde(skip)]
|
||||
pub last_update: Option<Instant>,
|
||||
pub last_update: Option<std::time::Instant>,
|
||||
|
||||
#[serde(skip)]
|
||||
pub change_sender: Option<tokio::sync::watch::Sender<Instant>>,
|
||||
pub change_sender: Option<tokio::sync::watch::Sender<std::time::Instant>>,
|
||||
|
||||
#[serde(skip)]
|
||||
port_guards: HashMap<Port, Rejector>,
|
||||
@ -55,7 +54,7 @@ pub struct PortHandler {
|
||||
pub async fn cache_daemon(
|
||||
port_handler: Arc<Mutex<PortHandler>>,
|
||||
cache_path: PathBuf,
|
||||
mut change_receiver: Receiver<Instant>,
|
||||
mut change_receiver: Receiver<std::time::Instant>,
|
||||
) {
|
||||
let mut last_store = Instant::now() - 2 * CACHE_STORE_INTERVAL;
|
||||
let mut change_timeout = None;
|
||||
@ -92,129 +91,6 @@ impl<T: Display> Debug for DisplayAsDebug<T> {
|
||||
}
|
||||
}
|
||||
|
||||
fn duration_string(duration: Duration) -> String {
|
||||
let seconds_elapsed = duration.as_secs();
|
||||
|
||||
let days = seconds_elapsed / (60 * 60 * 24);
|
||||
let hours = seconds_elapsed / (60 * 60) % 24;
|
||||
let minutes = (seconds_elapsed / 60) % 60;
|
||||
let seconds = seconds_elapsed % 60;
|
||||
|
||||
match (days > 0, hours > 0, minutes > 0) {
|
||||
(true, _, _) => format!("{days}d {hours}h {minutes}min {seconds}s"),
|
||||
(false, true, _) => format!("{hours}h {minutes}min {seconds}s"),
|
||||
(false, false, true) => format!("{minutes}min {seconds}s"),
|
||||
_ => format!("{duration:.0?}"),
|
||||
}
|
||||
}
|
||||
|
||||
fn format_instant(instant: Instant) -> String {
|
||||
let when = duration_string(instant.elapsed()) + " ago";
|
||||
|
||||
(|| -> eyre::Result<_> {
|
||||
let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)? - instant.elapsed();
|
||||
let date = time::OffsetDateTime::from_unix_timestamp(
|
||||
timestamp.as_secs().try_into().expect("timestamp overflow"),
|
||||
)?
|
||||
.to_offset(*TIME_ZONE_OFFSET.get().unwrap())
|
||||
.format(TIME_FORMAT.get().unwrap())?;
|
||||
|
||||
Ok(format!("{date} ({when})"))
|
||||
})()
|
||||
.unwrap_or(when)
|
||||
}
|
||||
|
||||
fn instant_from_timestamp(timestamp: UnixTimestamp) -> Instant {
|
||||
Instant::now() - UNIX_EPOCH.elapsed().unwrap() + Duration::from_secs(timestamp)
|
||||
}
|
||||
|
||||
#[cfg(feature = "debug_server")]
|
||||
impl Debug for PortHandler {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
const SHOW_N_FREE_PORTS: usize = 10;
|
||||
|
||||
let last_update = self
|
||||
.last_update
|
||||
.map(|last_update| Cow::from(format_instant(last_update)))
|
||||
.unwrap_or(Cow::from("?"));
|
||||
|
||||
let mut free_ports = self.free_ports.iter().copied().collect::<Vec<u16>>();
|
||||
|
||||
free_ports.sort_unstable();
|
||||
|
||||
let mut free_ports = free_ports
|
||||
.into_iter()
|
||||
.take(SHOW_N_FREE_PORTS)
|
||||
.map(|x| DisplayAsDebug(x.to_string()))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
if let Some(n_not_shown) = self.free_ports.len().checked_sub(SHOW_N_FREE_PORTS) {
|
||||
if n_not_shown > 0 {
|
||||
free_ports.push(DisplayAsDebug(format!("[{n_not_shown} more]")));
|
||||
}
|
||||
}
|
||||
|
||||
let errored_ports = self
|
||||
.errored_ports
|
||||
.iter()
|
||||
.rev()
|
||||
.map(|&(since, port)| {
|
||||
DisplayAsDebug(format!(
|
||||
"{port:5}: {}",
|
||||
format_instant(instant_from_timestamp(since))
|
||||
))
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let mut allocated_ports = self
|
||||
.allocated_ports
|
||||
.iter()
|
||||
.map(|(&number, &port)| {
|
||||
#[derive(Debug)]
|
||||
#[allow(dead_code)]
|
||||
struct State<'n> {
|
||||
state: PortStatus,
|
||||
name: &'n str,
|
||||
number: u32,
|
||||
port: u16,
|
||||
last_change: DisplayAsDebug<String>,
|
||||
}
|
||||
|
||||
let state = &self.port_state[&port];
|
||||
|
||||
State {
|
||||
state: state.status,
|
||||
number,
|
||||
port,
|
||||
name: self.names.get(&number).map_or("?", |x| x.as_str()),
|
||||
last_change: DisplayAsDebug(format_instant(instant_from_timestamp(
|
||||
state.last_change,
|
||||
))),
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
allocated_ports.sort_unstable_by(|a, b| {
|
||||
a.state.cmp(&b.state).then(
|
||||
self.port_state[&a.port]
|
||||
.last_change
|
||||
.cmp(&self.port_state[&b.port].last_change)
|
||||
.reverse(),
|
||||
)
|
||||
});
|
||||
|
||||
writeln!(f, "last update: {last_update}")?;
|
||||
writeln!(f, "rejectors: {:#?}", self.port_guards)?;
|
||||
writeln!(f, "allowed ports: {:?}", self.allowed_ports.0)?;
|
||||
writeln!(f, "free ports: {free_ports:?}")?;
|
||||
|
||||
writeln!(f, "errored ports: {errored_ports:#?}")?;
|
||||
writeln!(f, "allocated ports: {allocated_ports:#?}")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default, Serialize, Deserialize)]
|
||||
pub struct PortState {
|
||||
last_change: UnixTimestamp,
|
||||
@ -222,18 +98,6 @@ pub struct PortState {
|
||||
status: PortStatus,
|
||||
}
|
||||
|
||||
impl Debug for PortState {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("PortState")
|
||||
.field(
|
||||
"last_change",
|
||||
&DisplayAsDebug(format_instant(instant_from_timestamp(self.last_change))),
|
||||
)
|
||||
.field("status", &self.status)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl PortState {
|
||||
pub fn new_state(&mut self, status: PortStatus) {
|
||||
self.last_change = SystemTime::now()
|
||||
@ -273,13 +137,8 @@ impl AllowedList {
|
||||
}
|
||||
|
||||
impl PortHandler {
|
||||
#[must_use]
|
||||
pub fn status_string(&self) -> String {
|
||||
format!("{self:#?}\n")
|
||||
}
|
||||
|
||||
pub fn register_update(&mut self) {
|
||||
let now = Instant::now();
|
||||
let now = std::time::Instant::now();
|
||||
self.last_update = Some(now);
|
||||
self.change_sender
|
||||
.as_ref()
|
||||
@ -310,7 +169,7 @@ impl PortHandler {
|
||||
#[instrument(skip(change_sender))]
|
||||
pub fn load_or_default(
|
||||
path: &Path,
|
||||
change_sender: tokio::sync::watch::Sender<Instant>,
|
||||
change_sender: tokio::sync::watch::Sender<std::time::Instant>,
|
||||
) -> Self {
|
||||
let mut this = Self::load(path).unwrap_or_else(|error| {
|
||||
error!(?path, %error, "failed to parse cache file");
|
||||
|
Loading…
Reference in New Issue
Block a user