Compare commits
1 Commits
master
...
soruh-patc
Author | SHA1 | Date | |
---|---|---|---|
497c27248c |
@ -1,2 +1,3 @@
|
||||
[build]
|
||||
# rustflags = [ "--cfg", "tokio_unstable" ]
|
||||
rustflags = [ "--cfg", "tokio_unstable" ]
|
||||
|
||||
|
797
Cargo.lock
generated
797
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
47
Cargo.toml
47
Cargo.toml
@ -1,59 +1,34 @@
|
||||
[package]
|
||||
name = "centralex"
|
||||
version = "1.0.1"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[profile.release]
|
||||
strip = true
|
||||
lto = true
|
||||
codegen-units = 1
|
||||
debug = true
|
||||
|
||||
[profile.dev.package.backtrace]
|
||||
opt-level = 3
|
||||
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
tokio = { version = "1.24.2", features = [
|
||||
"macros",
|
||||
"rt-multi-thread",
|
||||
"net",
|
||||
"io-util",
|
||||
"sync",
|
||||
"time",
|
||||
] }
|
||||
tokio = { version = "1.24.2", features = ["macros", "rt-multi-thread", "net", "io-util", "sync", "time"] }
|
||||
time = { version = "0.3.20", features = ["local-offset", "macros"] }
|
||||
bytemuck = { version = "1.13.0", features = ["derive"] }
|
||||
serde = { version = "1.0.152", features = ["derive"] }
|
||||
serde_json = "1.0.91"
|
||||
hyper = { version = "1.0.1", optional = true, features = ["server", "http1"] }
|
||||
hyper = { version = "0.14.24", optional = true, features = ["server", "http1", "tcp"] }
|
||||
futures = { version = "0.3.27", default-features = false, features = ["std"] }
|
||||
tracing = "0.1.37"
|
||||
tracing-subscriber = { version = "0.3.16", features = ["time"] }
|
||||
console-subscriber = { version = "0.2.0", optional = true }
|
||||
console-subscriber = { version = "0.1.8", optional = true }
|
||||
once_cell = "1.17.1"
|
||||
eyre = "0.6.8"
|
||||
color-eyre = "0.6.2"
|
||||
tracing-error = "0.2.0"
|
||||
zerocopy = { version = "0.7.26", features = ["derive"] }
|
||||
tokio-stream = { version = "0.1.14", features = ["sync"] }
|
||||
flate2 = { version = "1.0.26", optional = true }
|
||||
bytes = "1.4.0"
|
||||
smallvec = { version = "1.10.0", features = ["serde", "const_generics"] }
|
||||
http-body-util = "0.1.0"
|
||||
pin-project = "1.1.3"
|
||||
async-stream = "0.3.5"
|
||||
|
||||
[build-dependencies]
|
||||
minify-js = { version = "0.5.6", optional = true }
|
||||
minify-html = { version = "0.11.1", optional = true }
|
||||
css-minify = { version = "0.3.1", optional = true }
|
||||
flate2 = { version = "1.0.26", optional = true }
|
||||
|
||||
[features]
|
||||
default = ["debug_server"]
|
||||
debug_server = [
|
||||
"dep:hyper",
|
||||
"dep:minify-html",
|
||||
"dep:minify-js",
|
||||
"dep:css-minify",
|
||||
"dep:flate2",
|
||||
]
|
||||
default = ["debug_server", "tokio_console"]
|
||||
debug_server = ["dep:hyper"]
|
||||
tokio_console = ["dep:console-subscriber"]
|
||||
|
65
build.rs
65
build.rs
@ -1,65 +0,0 @@
|
||||
fn main() {
|
||||
#[cfg(feature = "debug_server")]
|
||||
pack_debug_page().unwrap();
|
||||
|
||||
println!("cargo:rerun-if-changed=web/main.js");
|
||||
println!("cargo:rerun-if-changed=web/index.html");
|
||||
println!("cargo:rerun-if-changed=web/main.css");
|
||||
println!("cargo:rerun-if-changed=web/connected.svg");
|
||||
}
|
||||
|
||||
#[cfg(feature = "debug_server")]
|
||||
fn pack_debug_page() -> Result<(), Box<dyn std::error::Error>> {
|
||||
use flate2::{write::GzEncoder, Compression};
|
||||
use std::io::Write;
|
||||
|
||||
use css_minify::optimizations::{Level, Minifier};
|
||||
|
||||
let js = std::fs::read_to_string("web/main.js").unwrap();
|
||||
let html = std::fs::read_to_string("web/index.html").unwrap();
|
||||
let css = std::fs::read_to_string("web/main.css").unwrap();
|
||||
let svg = std::fs::read_to_string("web/connected.svg").unwrap();
|
||||
|
||||
let mut out = Vec::new();
|
||||
minify_js::minify(
|
||||
&minify_js::Session::new(),
|
||||
minify_js::TopLevelMode::Global,
|
||||
js.as_bytes(),
|
||||
&mut out,
|
||||
)
|
||||
.unwrap();
|
||||
let js = std::str::from_utf8(&out)?;
|
||||
|
||||
let css = Minifier::default().minify(&css, Level::One).unwrap();
|
||||
|
||||
let (start, end) = html
|
||||
.split_once("<!--INSERT SVG HERE-->")
|
||||
.expect("did not find svg split point in html");
|
||||
|
||||
let html = format!("{start}{svg}{end}");
|
||||
|
||||
let (head, body) = html
|
||||
.split_once("<!--INSERT HEAD CONTENT HERE-->")
|
||||
.expect("did not find head split point in html");
|
||||
|
||||
let (body_a, body_b) = body
|
||||
.split_once("#INSERT VERSION HERE#")
|
||||
.expect("did not find version split point in html");
|
||||
|
||||
let version = env!("CARGO_PKG_VERSION");
|
||||
|
||||
let html = minify_html::minify(
|
||||
format!("{head}<style>{css}</style><script>{js}</script>{body_a}{version}{body_b}")
|
||||
.as_bytes(),
|
||||
&minify_html::Cfg::spec_compliant(),
|
||||
);
|
||||
|
||||
let mut encoder = GzEncoder::new(
|
||||
std::fs::File::create(std::env::var("OUT_DIR").unwrap() + "/minified.html.gz")?,
|
||||
Compression::best(),
|
||||
);
|
||||
|
||||
encoder.write_all(&html)?;
|
||||
|
||||
Ok(())
|
||||
}
|
18
install.sh
18
install.sh
@ -24,8 +24,8 @@ then
|
||||
exit 1
|
||||
fi
|
||||
|
||||
user_name="itelex"
|
||||
install_dir="/home/itelex/"
|
||||
user_name="centralex"
|
||||
install_dir="/var/lib/centralex"
|
||||
service_file="/etc/systemd/system/centralex.service"
|
||||
|
||||
if [ $# -lt 1 ]; then
|
||||
@ -52,7 +52,7 @@ case "$step" in
|
||||
rm rustup.sh
|
||||
|
||||
echo "cloning source code..."
|
||||
git clone https://gitea.h.glsys.de/soruh/centralex centralex
|
||||
git clone https://github.com/soruh/centralex centralex
|
||||
|
||||
echo "creating default config..."
|
||||
cp centralex/config-template.json centralex/config.json
|
||||
@ -63,8 +63,8 @@ case "$step" in
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# echo "creating user $user_name..."
|
||||
# useradd -s /usr/sbin/nologin --create-home --home-dir "$install_dir" "$user_name"
|
||||
echo "creating user $user_name..."
|
||||
useradd -s /usr/sbin/nologin --create-home --home-dir "$install_dir" "$user_name"
|
||||
|
||||
echo "creating service file..."
|
||||
cat > "$service_file" << EOF
|
||||
@ -72,10 +72,10 @@ case "$step" in
|
||||
Description=Centralex
|
||||
|
||||
[Service]
|
||||
Environment=RUST_BACKTRACE=1
|
||||
Enviroment=RUST_BACKTRACE=1
|
||||
ExecStart=$install_dir/.cargo/bin/cargo run --release
|
||||
Type=simple
|
||||
User=$user_name
|
||||
User=centralex
|
||||
WorkingDirectory=$install_dir/centralex
|
||||
|
||||
[Install]
|
||||
@ -100,6 +100,4 @@ EOF
|
||||
;;
|
||||
esac
|
||||
|
||||
echo "success"
|
||||
|
||||
exit 0
|
||||
echo "installation complete"
|
||||
|
45
src/auth.rs
45
src/auth.rs
@ -2,21 +2,9 @@ use std::net::SocketAddr;
|
||||
|
||||
use eyre::eyre;
|
||||
use tracing::{debug, instrument};
|
||||
use zerocopy::{AsBytes, LittleEndian};
|
||||
|
||||
use crate::packets::{Header, Packet, PacketKind};
|
||||
|
||||
type U16 = zerocopy::U16<LittleEndian>;
|
||||
type U32 = zerocopy::U32<LittleEndian>;
|
||||
|
||||
#[derive(AsBytes)]
|
||||
#[repr(C)]
|
||||
struct DynIpUpdate {
|
||||
number: U32,
|
||||
pin: U16,
|
||||
port: U16,
|
||||
}
|
||||
|
||||
/// # Errors
|
||||
/// - the dyn ip server returns a malformed response or is unreachable
|
||||
/// - the authentication fails
|
||||
@ -34,43 +22,46 @@ pub async fn dyn_ip_update(
|
||||
kind: PacketKind::DynIpUpdate.raw(),
|
||||
length: 8,
|
||||
},
|
||||
..Default::default()
|
||||
data: Vec::new(),
|
||||
};
|
||||
|
||||
packet.data.resize(packet.header.length as usize, 0);
|
||||
|
||||
DynIpUpdate {
|
||||
number: number.into(),
|
||||
pin: pin.into(),
|
||||
port: port.into(),
|
||||
}
|
||||
.write_to(packet.data.as_mut_slice())
|
||||
.unwrap();
|
||||
packet.data.clear();
|
||||
packet.data.reserve(packet.header.length as usize);
|
||||
packet.data.extend_from_slice(&number.to_le_bytes());
|
||||
packet.data.extend_from_slice(&pin.to_le_bytes());
|
||||
packet.data.extend_from_slice(&port.to_le_bytes());
|
||||
|
||||
let mut socket = tokio::net::TcpStream::connect(server).await?;
|
||||
|
||||
let (mut reader, mut writer) = socket.split();
|
||||
|
||||
packet.send(&mut writer).await?;
|
||||
|
||||
packet.recv_into(&mut reader).await?;
|
||||
|
||||
let result = match packet.kind() {
|
||||
PacketKind::DynIpUpdateResponse => Ok(<[u8; 4]>::try_from(packet.data.as_slice())
|
||||
.map_err(|_| {
|
||||
PacketKind::DynIpUpdateResponse => Ok(<[u8; 4]>::try_from(packet.data)
|
||||
.map_err(|err| {
|
||||
eyre!(
|
||||
"too little data for ip address. Need 4 bytes got {}",
|
||||
packet.data.len()
|
||||
err.len()
|
||||
)
|
||||
})?
|
||||
.into()),
|
||||
PacketKind::Error => {
|
||||
let first_zero = packet
|
||||
.data
|
||||
.iter()
|
||||
.enumerate()
|
||||
.find_map(|(i, x)| (*x == 0).then_some(i));
|
||||
|
||||
return Err(eyre!(
|
||||
"{}",
|
||||
packet.as_string().unwrap_or("unknown dyn auth error")
|
||||
std::str::from_utf8(first_zero.map_or(&packet.data, |i| &packet.data[..i]),)?
|
||||
));
|
||||
}
|
||||
|
||||
_ => return Err(eyre!("auth server returned unexpected packet")),
|
||||
_ => return Err(eyre!("server returned unexpected packet")),
|
||||
};
|
||||
|
||||
debug!(?result, "finished dyn ip update");
|
||||
|
@ -10,25 +10,16 @@ use tokio::{
|
||||
sync::Mutex,
|
||||
time::{sleep, timeout},
|
||||
};
|
||||
use tracing::{info, instrument, trace, warn};
|
||||
use tracing::{info, instrument, trace};
|
||||
|
||||
use crate::{
|
||||
auth::dyn_ip_update,
|
||||
constants::{AUTH_TIMEOUT, CALL_ACK_TIMEOUT, CALL_TIMEOUT, PING_TIMEOUT, SEND_PING_INTERVAL},
|
||||
http::peer_query,
|
||||
packets::{
|
||||
Header, Packet, PacketKind, RemConnect, REJECT_OOP, REJECT_TIMEOUT, REJECT_UNKNOWN_CLIENT,
|
||||
},
|
||||
packets::{Header, Packet, PacketKind, RemConnect, REJECT_OOP, REJECT_TIMEOUT},
|
||||
ports::{PortHandler, PortStatus},
|
||||
Config, HandlerMetadata,
|
||||
};
|
||||
|
||||
pub enum AuthResult {
|
||||
OutOfPorts,
|
||||
UnknownClient,
|
||||
Success { port: u16 },
|
||||
}
|
||||
|
||||
/// # Errors
|
||||
/// - the client authentication fails
|
||||
#[instrument(skip(config, port_handler, handler_metadata))]
|
||||
@ -38,16 +29,7 @@ async fn authenticate(
|
||||
handler_metadata: &mut HandlerMetadata,
|
||||
number: u32,
|
||||
pin: u16,
|
||||
) -> eyre::Result<AuthResult> {
|
||||
// TODO: do we want to cache this? If so, for how long?
|
||||
let name = peer_query(&config.dyn_ip_server, number).await?;
|
||||
|
||||
let Some(name) = name else {
|
||||
return Ok(AuthResult::UnknownClient);
|
||||
};
|
||||
|
||||
info!(%name, "found client");
|
||||
|
||||
) -> eyre::Result<Option<u16>> {
|
||||
let mut authenticated = false;
|
||||
loop {
|
||||
let mut updated_server = false;
|
||||
@ -58,11 +40,9 @@ async fn authenticate(
|
||||
.allocate_port_for_number(config, number);
|
||||
|
||||
let Some(port) = port else {
|
||||
return Ok(AuthResult::OutOfPorts);
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
handler_metadata.port = Some(port);
|
||||
|
||||
// make sure the client is authenticated before opening any ports
|
||||
if !authenticated {
|
||||
let _ip = dyn_ip_update(&config.dyn_ip_server, number, pin, port).await?;
|
||||
@ -70,9 +50,9 @@ async fn authenticate(
|
||||
updated_server = true;
|
||||
}
|
||||
|
||||
let listener = if let Some((listener, _packet)) =
|
||||
port_handler.lock().await.stop_rejector(port).await
|
||||
{
|
||||
let mut port_handler = port_handler.lock().await;
|
||||
|
||||
let listener = if let Some((listener, _packet)) = port_handler.stop_rejector(port).await {
|
||||
Ok(listener)
|
||||
} else {
|
||||
TcpListener::bind((config.listen_addr.ip(), port)).await
|
||||
@ -90,20 +70,19 @@ async fn authenticate(
|
||||
let _ip = dyn_ip_update(&config.dyn_ip_server, number, pin, port).await?;
|
||||
}
|
||||
|
||||
let mut port_handler = port_handler.lock().await;
|
||||
port_handler.register_update();
|
||||
|
||||
port_handler.names.insert(number, name);
|
||||
port_handler
|
||||
.port_state
|
||||
.entry(port)
|
||||
.or_default()
|
||||
.new_state(PortStatus::Idle);
|
||||
|
||||
break Ok(AuthResult::Success { port });
|
||||
handler_metadata.port = Some(port);
|
||||
|
||||
break Ok(Some(port));
|
||||
}
|
||||
|
||||
port_handler.lock().await.mark_port_error(number, port);
|
||||
port_handler.mark_port_error(number, port);
|
||||
}
|
||||
}
|
||||
|
||||
@ -161,12 +140,12 @@ async fn idle(
|
||||
break Ok(Some(IdleResult::Disconnect { packet }))
|
||||
}
|
||||
},
|
||||
() = sleep(send_next_ping_in) => {
|
||||
_ = sleep(send_next_ping_in) => {
|
||||
trace!("sending ping");
|
||||
writer.write_all(bytemuck::bytes_of(& Header { kind: PacketKind::Ping.raw(), length: 0 })).await?;
|
||||
last_ping_sent_at = Instant::now();
|
||||
}
|
||||
() = sleep(next_ping_expected_in) => {
|
||||
_ = sleep(next_ping_expected_in) => {
|
||||
|
||||
writer.write_all(REJECT_TIMEOUT).await?;
|
||||
break Ok(None);
|
||||
@ -286,7 +265,7 @@ async fn connect(
|
||||
client.set_nodelay(true)?;
|
||||
caller.set_nodelay(true)?;
|
||||
|
||||
_ = timeout(CALL_TIMEOUT, tokio::io::copy_bidirectional(client, caller)).await;
|
||||
let _ = timeout(CALL_TIMEOUT, tokio::io::copy_bidirectional(client, caller)).await;
|
||||
|
||||
{
|
||||
let mut port_handler = port_handler.lock().await;
|
||||
@ -332,33 +311,19 @@ pub async fn handler(
|
||||
|
||||
let mut packet = Packet::default();
|
||||
|
||||
let Ok(res) = timeout(AUTH_TIMEOUT, packet.recv_into_cancelation_safe(&mut reader)).await
|
||||
else {
|
||||
let Ok(res) = timeout(AUTH_TIMEOUT, packet.recv_into_cancelation_safe(&mut reader)).await else {
|
||||
writer.write_all(REJECT_TIMEOUT).await?;
|
||||
return Ok(());
|
||||
};
|
||||
res?;
|
||||
|
||||
if packet.kind() != PacketKind::RemConnect {
|
||||
let kind = packet.kind();
|
||||
warn!(%addr, ?kind, "client sent unexpected packet instead of RemConnect");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let RemConnect { number, pin } = packet.as_rem_connect()?;
|
||||
|
||||
handler_metadata.number = Some(number);
|
||||
|
||||
let port = match authenticate(config, port_handler, handler_metadata, number, pin).await? {
|
||||
AuthResult::OutOfPorts => {
|
||||
writer.write_all(REJECT_OOP).await?;
|
||||
return Ok(());
|
||||
}
|
||||
AuthResult::UnknownClient => {
|
||||
writer.write_all(REJECT_UNKNOWN_CLIENT).await?;
|
||||
return Ok(());
|
||||
}
|
||||
AuthResult::Success { port } => port,
|
||||
let Some(port) = authenticate(config, port_handler, handler_metadata, number, pin).await? else {
|
||||
writer.write_all(REJECT_OOP).await?;
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
info!(%addr, number, port, "authenticated");
|
||||
@ -374,21 +339,18 @@ pub async fn handler(
|
||||
packet.data.clear();
|
||||
packet.send(&mut writer).await?;
|
||||
|
||||
let Some(idle_result) = idle(listener, packet, &mut reader, &mut writer).await? else {
|
||||
let Some(idle_result) = idle(
|
||||
listener,
|
||||
packet,
|
||||
&mut reader,
|
||||
&mut writer,
|
||||
).await? else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let Some((mut caller, mut packet)) = notify_or_disconnect(
|
||||
idle_result,
|
||||
handler_metadata,
|
||||
port_handler,
|
||||
port,
|
||||
&mut writer,
|
||||
)
|
||||
.await?
|
||||
else {
|
||||
let Some((mut caller, mut packet)) = notify_or_disconnect(idle_result, handler_metadata, port_handler, port, &mut writer).await? else {
|
||||
return Ok(());
|
||||
};
|
||||
};
|
||||
|
||||
let notify_at = Instant::now();
|
||||
|
||||
@ -399,9 +361,9 @@ pub async fn handler(
|
||||
);
|
||||
|
||||
let Ok(res) = recv.await else {
|
||||
writer.write_all(REJECT_TIMEOUT).await?;
|
||||
return Ok(());
|
||||
};
|
||||
writer.write_all(REJECT_TIMEOUT).await?;
|
||||
return Ok(());
|
||||
};
|
||||
res?;
|
||||
|
||||
match packet.kind() {
|
||||
|
@ -9,5 +9,3 @@ pub const PING_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
pub const SEND_PING_INTERVAL: Duration = Duration::from_secs(20);
|
||||
|
||||
pub const CACHE_STORE_INTERVAL: Duration = Duration::from_secs(5);
|
||||
|
||||
pub const DEBUG_SERVER_PING_INTERVAL: Duration = Duration::from_secs(5);
|
||||
|
43
src/debug_server.rs
Normal file
43
src/debug_server.rs
Normal file
@ -0,0 +1,43 @@
|
||||
use futures::Future;
|
||||
use hyper::rt::Executor;
|
||||
use hyper::service::{make_service_fn, service_fn};
|
||||
use hyper::{Body, Response, Server};
|
||||
use std::convert::Infallible;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Mutex;
|
||||
use tracing::error;
|
||||
|
||||
use crate::ports::PortHandler;
|
||||
use crate::spawn;
|
||||
|
||||
#[derive(Clone)]
|
||||
struct NamedExecutor;
|
||||
impl<T: Send + 'static, Fut: Future<Output = T> + Send + 'static> Executor<Fut> for NamedExecutor {
|
||||
fn execute(&self, fut: Fut) {
|
||||
spawn("http worker", fut);
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn debug_server(addr: SocketAddr, port_handler: Arc<Mutex<PortHandler>>) {
|
||||
let server = Server::bind(&addr)
|
||||
.executor(NamedExecutor)
|
||||
.serve(make_service_fn(move |_conn| {
|
||||
let port_handler = port_handler.clone();
|
||||
async move {
|
||||
Ok::<_, Infallible>(service_fn(move |_req| {
|
||||
let port_handler = port_handler.clone();
|
||||
async move {
|
||||
Ok::<_, Infallible>(Response::new(Body::from(
|
||||
port_handler.lock().await.status_string(),
|
||||
)))
|
||||
}
|
||||
}))
|
||||
}
|
||||
}));
|
||||
|
||||
// Run this server for... forever!
|
||||
if let Err(error) = server.await {
|
||||
error!(%error, "debug server error");
|
||||
}
|
||||
}
|
390
src/http.rs
390
src/http.rs
@ -1,390 +0,0 @@
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use futures::Future;
|
||||
use http_body_util::combinators::UnsyncBoxBody;
|
||||
use http_body_util::BodyExt;
|
||||
use http_body_util::StreamBody;
|
||||
use hyper::body::{Body, Frame};
|
||||
use hyper::header::{ACCEPT_ENCODING, CACHE_CONTROL, CONTENT_ENCODING, CONTENT_TYPE};
|
||||
use hyper::rt::Executor;
|
||||
use hyper::service::service_fn;
|
||||
use hyper::{Method, Request, Response, StatusCode};
|
||||
use std::io::Read;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio_stream::wrappers::{IntervalStream, WatchStream};
|
||||
use tokio_stream::StreamExt;
|
||||
use tracing::error;
|
||||
use tracing::{debug, instrument};
|
||||
use zerocopy::{AsBytes, FromBytes, FromZeroes, LittleEndian, Unaligned};
|
||||
|
||||
use crate::constants::DEBUG_SERVER_PING_INTERVAL;
|
||||
use crate::packets::{Header, Packet};
|
||||
use crate::ports::PortHandler;
|
||||
use crate::spawn;
|
||||
|
||||
mod tokio_io {
|
||||
use std::{
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
#[derive(Debug)]
|
||||
#[pin_project::pin_project]
|
||||
pub struct TokioIo<T> {
|
||||
#[pin]
|
||||
inner: T,
|
||||
}
|
||||
|
||||
impl<T> TokioIo<T> {
|
||||
pub fn new(inner: T) -> Self {
|
||||
Self { inner }
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> hyper::rt::Read for TokioIo<T>
|
||||
where
|
||||
T: tokio::io::AsyncRead,
|
||||
{
|
||||
fn poll_read(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
mut buf: hyper::rt::ReadBufCursor<'_>,
|
||||
) -> Poll<Result<(), std::io::Error>> {
|
||||
let n = unsafe {
|
||||
let mut temp_buf = tokio::io::ReadBuf::uninit(buf.as_mut());
|
||||
match tokio::io::AsyncRead::poll_read(self.project().inner, cx, &mut temp_buf) {
|
||||
Poll::Ready(Ok(())) => temp_buf.filled().len(),
|
||||
other => return other,
|
||||
}
|
||||
};
|
||||
|
||||
unsafe {
|
||||
buf.advance(n);
|
||||
}
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> hyper::rt::Write for TokioIo<T>
|
||||
where
|
||||
T: tokio::io::AsyncWrite,
|
||||
{
|
||||
fn poll_write(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &[u8],
|
||||
) -> Poll<Result<usize, std::io::Error>> {
|
||||
tokio::io::AsyncWrite::poll_write(self.project().inner, cx, buf)
|
||||
}
|
||||
|
||||
fn poll_flush(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Result<(), std::io::Error>> {
|
||||
tokio::io::AsyncWrite::poll_flush(self.project().inner, cx)
|
||||
}
|
||||
|
||||
fn poll_shutdown(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Result<(), std::io::Error>> {
|
||||
tokio::io::AsyncWrite::poll_shutdown(self.project().inner, cx)
|
||||
}
|
||||
|
||||
fn is_write_vectored(&self) -> bool {
|
||||
tokio::io::AsyncWrite::is_write_vectored(&self.inner)
|
||||
}
|
||||
|
||||
fn poll_write_vectored(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
bufs: &[std::io::IoSlice<'_>],
|
||||
) -> Poll<Result<usize, std::io::Error>> {
|
||||
tokio::io::AsyncWrite::poll_write_vectored(self.project().inner, cx, bufs)
|
||||
}
|
||||
}
|
||||
}
|
||||
use self::tokio_io::TokioIo;
|
||||
|
||||
#[derive(Clone)]
|
||||
struct NamedExecutor;
|
||||
impl<T: Send + 'static, Fut: Future<Output = T> + Send + 'static> Executor<Fut> for NamedExecutor {
|
||||
fn execute(&self, fut: Fut) {
|
||||
spawn("http worker", fut);
|
||||
}
|
||||
}
|
||||
|
||||
const COMPRESSED_HTML: &[u8] = include_bytes!(concat!(env!("OUT_DIR"), "/minified.html.gz"));
|
||||
|
||||
type ResponseBody = UnsyncBoxBody<Bytes, hyper::Error>;
|
||||
type RouteResponse = Result<Response<ResponseBody>, hyper::http::Error>;
|
||||
|
||||
fn full<D, B>(buf: B) -> ResponseBody
|
||||
where
|
||||
http_body_util::Full<D>: From<B> + BodyExt + Body<Data = Bytes>,
|
||||
D: Send + 'static,
|
||||
{
|
||||
http_body_util::Full::from(buf)
|
||||
.map_err(|_| unreachable!())
|
||||
.boxed_unsync()
|
||||
}
|
||||
|
||||
fn empty() -> ResponseBody {
|
||||
http_body_util::Empty::new()
|
||||
.map_err(|_| unreachable!())
|
||||
.boxed_unsync()
|
||||
}
|
||||
|
||||
fn index(req: &Request<hyper::body::Incoming>) -> RouteResponse {
|
||||
let response = Response::builder();
|
||||
|
||||
let accepts_gzip = req
|
||||
.headers()
|
||||
.get(ACCEPT_ENCODING)
|
||||
.map_or(false, |accept_encoding| {
|
||||
accept_encoding
|
||||
.as_bytes()
|
||||
.split(|x| *x == b',')
|
||||
.filter_map(|x| x.split(|x| *x == b';').next())
|
||||
.filter_map(|x| std::str::from_utf8(x).ok())
|
||||
.any(|x| x.trim() == "gzip")
|
||||
});
|
||||
|
||||
if accepts_gzip {
|
||||
response
|
||||
.header(CONTENT_ENCODING, "gzip")
|
||||
.header(CONTENT_TYPE, "text/html")
|
||||
.body(full(COMPRESSED_HTML))
|
||||
} else {
|
||||
let (sender, mut receiver) = tokio::sync::mpsc::channel(1);
|
||||
|
||||
spawn("gunzip task", async move {
|
||||
let mut decoder =
|
||||
flate2::bufread::GzDecoder::new(std::io::Cursor::new(COMPRESSED_HTML));
|
||||
|
||||
let mut done = false;
|
||||
while !done {
|
||||
let mut chunk = BytesMut::zeroed(256);
|
||||
|
||||
let mut i = 0;
|
||||
|
||||
loop {
|
||||
let dst = &mut chunk.as_bytes_mut()[i..];
|
||||
|
||||
if dst.is_empty() {
|
||||
break; // we are done
|
||||
}
|
||||
|
||||
match decoder.read(dst) {
|
||||
Ok(n) => {
|
||||
if n == 0 {
|
||||
done = true;
|
||||
break;
|
||||
}
|
||||
|
||||
i += n;
|
||||
}
|
||||
Err(err) => unreachable!("failed to read from gzip decode: {err}"),
|
||||
}
|
||||
}
|
||||
|
||||
chunk.truncate(i);
|
||||
|
||||
if sender.send(chunk.freeze()).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
response.body(
|
||||
StreamBody::new(async_stream::stream! {
|
||||
while let Some(item) = receiver.recv().await {
|
||||
yield Ok(Frame::data(item));
|
||||
}
|
||||
})
|
||||
.boxed_unsync(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
async fn data(
|
||||
_req: &Request<hyper::body::Incoming>,
|
||||
port_handler: &Mutex<PortHandler>,
|
||||
) -> RouteResponse {
|
||||
let res = Response::builder().header(CACHE_CONTROL, "no-store");
|
||||
|
||||
match serde_json::to_string(&*port_handler.lock().await) {
|
||||
Ok(data) => res
|
||||
.header(CONTENT_TYPE, "application/json")
|
||||
.body(full(data)),
|
||||
Err(err) => {
|
||||
error!(%err, "failed to serialize data for debug server");
|
||||
res.status(StatusCode::INTERNAL_SERVER_ERROR).body(empty())
|
||||
}
|
||||
}
|
||||
}
|
||||
fn events(
|
||||
_req: &Request<hyper::body::Incoming>,
|
||||
change_receiver: tokio::sync::watch::Receiver<std::time::Instant>,
|
||||
) -> RouteResponse {
|
||||
let stream = WatchStream::new(change_receiver)
|
||||
.map(|x| ("change", x))
|
||||
.merge(
|
||||
IntervalStream::new(tokio::time::interval(DEBUG_SERVER_PING_INTERVAL))
|
||||
.map(|x| ("ping", x.into_std())),
|
||||
)
|
||||
.filter_map(|(kind, time)| {
|
||||
let timestamp = (SystemTime::now() + time.elapsed())
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.ok()?
|
||||
.as_secs();
|
||||
|
||||
Some(Ok::<Frame<Bytes>, _>(Frame::data(Bytes::from(format!(
|
||||
"event: {kind}\ndata: {timestamp}\n\n"
|
||||
)))))
|
||||
});
|
||||
|
||||
Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.header(CACHE_CONTROL, "no-store")
|
||||
.header(CONTENT_TYPE, "text/event-stream")
|
||||
.body(StreamBody::new(stream).boxed_unsync())
|
||||
}
|
||||
|
||||
async fn debug_server_routes(
|
||||
req: Request<hyper::body::Incoming>,
|
||||
port_handler: Arc<Mutex<PortHandler>>,
|
||||
change_receiver: tokio::sync::watch::Receiver<std::time::Instant>,
|
||||
) -> RouteResponse {
|
||||
match (req.method(), req.uri().path()) {
|
||||
(&Method::GET, "/") => index(&req),
|
||||
(&Method::GET, "/data") => Ok(data(&req, &port_handler).await?),
|
||||
(&Method::GET, "/events") => events(&req, change_receiver),
|
||||
_ => Ok(Response::builder()
|
||||
.status(StatusCode::NOT_FOUND)
|
||||
.body(empty())?),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn debug_server(
|
||||
addr: SocketAddr,
|
||||
port_handler: Arc<Mutex<PortHandler>>,
|
||||
change_receiver: tokio::sync::watch::Receiver<std::time::Instant>,
|
||||
) {
|
||||
let listener = TcpListener::bind(addr).await;
|
||||
|
||||
let listener = match listener {
|
||||
Ok(listener) => listener,
|
||||
Err(error) => {
|
||||
error!(%error, %addr, "failed to bind debug server");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
loop {
|
||||
let conn = listener.accept().await;
|
||||
|
||||
let stream = match conn {
|
||||
Ok((stream, _)) => stream,
|
||||
Err(error) => {
|
||||
error!(%error, "failed accept debug server connection");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let io = TokioIo::new(stream);
|
||||
|
||||
let port_handler = port_handler.clone();
|
||||
let change_receiver = change_receiver.clone();
|
||||
|
||||
tokio::task::spawn(async move {
|
||||
if let Err(error) = hyper::server::conn::http1::Builder::new()
|
||||
.serve_connection(
|
||||
io,
|
||||
service_fn(move |req| {
|
||||
debug_server_routes(req, port_handler.clone(), change_receiver.clone())
|
||||
}),
|
||||
)
|
||||
.await
|
||||
{
|
||||
if !error.is_incomplete_message() {
|
||||
error!(%error, "Failed to serve debug server connection");
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
type U16 = zerocopy::U16<LittleEndian>;
|
||||
type U32 = zerocopy::U32<LittleEndian>;
|
||||
|
||||
#[derive(AsBytes)]
|
||||
#[repr(transparent)]
|
||||
#[allow(dead_code)]
|
||||
struct PeerQuery {
|
||||
number: U32,
|
||||
}
|
||||
|
||||
#[derive(FromBytes, FromZeroes, Unaligned, Debug)]
|
||||
#[repr(packed)]
|
||||
#[allow(dead_code)]
|
||||
struct PeerReply {
|
||||
number: U32,
|
||||
name: [u8; 40],
|
||||
flags: U16,
|
||||
kind: u8,
|
||||
hostname: [u8; 40],
|
||||
ipaddress: [u8; 4],
|
||||
port: U16,
|
||||
extension: u8,
|
||||
pin: U16,
|
||||
timestamp: U32,
|
||||
}
|
||||
|
||||
#[instrument]
|
||||
pub async fn peer_query(server: &SocketAddr, number: u32) -> eyre::Result<Option<String>> {
|
||||
debug!(%number, "looking up");
|
||||
|
||||
let mut packet = Packet {
|
||||
header: Header {
|
||||
kind: 3, // Peer Query
|
||||
length: 4,
|
||||
},
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
packet.data.clear();
|
||||
packet.data.resize(packet.header.length as usize, 0);
|
||||
|
||||
PeerQuery {
|
||||
number: number.into(),
|
||||
}
|
||||
.write_to(packet.data.as_mut_slice())
|
||||
.unwrap();
|
||||
|
||||
let mut socket = tokio::net::TcpStream::connect(server).await?;
|
||||
|
||||
let (mut reader, mut writer) = socket.split();
|
||||
|
||||
packet.send(&mut writer).await?;
|
||||
packet.recv_into(&mut reader).await?;
|
||||
|
||||
Ok(if packet.kind().raw() == 5 {
|
||||
// PeerReply
|
||||
PeerReply::read_from(packet.data.as_slice()).and_then(|x| {
|
||||
let i = x
|
||||
.name
|
||||
.iter()
|
||||
.enumerate()
|
||||
.find_map(|(i, c)| (*c == 0).then_some(i))
|
||||
.unwrap_or(x.name.len());
|
||||
|
||||
Some(std::str::from_utf8(&x.name[..i]).ok()?.to_owned())
|
||||
})
|
||||
} else {
|
||||
None
|
||||
})
|
||||
}
|
76
src/main.rs
76
src/main.rs
@ -1,4 +1,5 @@
|
||||
#![warn(clippy::pedantic)]
|
||||
// #![allow(clippy::missing_errors_doc)]
|
||||
|
||||
use std::{
|
||||
fmt::Debug,
|
||||
@ -10,19 +11,18 @@ use std::{
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use debug_server::debug_server;
|
||||
use futures::Future;
|
||||
use http::debug_server;
|
||||
use packets::{Header, Packet};
|
||||
use serde::{Deserialize, Deserializer};
|
||||
use smallvec::SmallVec;
|
||||
use time::format_description::OwnedFormatItem;
|
||||
use tokio::{
|
||||
io::AsyncWriteExt,
|
||||
net::{TcpListener, TcpStream},
|
||||
sync::Mutex,
|
||||
time::sleep,
|
||||
time::{sleep, Instant},
|
||||
};
|
||||
use tracing::{debug, error, info, instrument, warn, Level};
|
||||
use tracing::{error, info, instrument, warn, Level};
|
||||
use tracing_subscriber::fmt::time::FormatTime;
|
||||
|
||||
use crate::packets::PacketKind;
|
||||
@ -32,7 +32,7 @@ pub mod auth;
|
||||
pub mod client;
|
||||
pub mod constants;
|
||||
#[cfg(feature = "debug_server")]
|
||||
pub mod http;
|
||||
pub mod debug_server;
|
||||
pub mod packets;
|
||||
pub mod ports;
|
||||
|
||||
@ -185,7 +185,7 @@ fn setup_tracing(config: &Config) {
|
||||
Level::ERROR => write!(writer, " {:>5} ", level.red())?,
|
||||
}
|
||||
|
||||
write!(writer, "{:18}{}", meta.target().dimmed(), ":".bold())?;
|
||||
write!(writer, "{:17}{}", meta.target().dimmed(), ":".bold())?;
|
||||
|
||||
/*
|
||||
if let Some(filename) = meta.file() {
|
||||
@ -248,18 +248,8 @@ async fn connection_handler(
|
||||
|
||||
let error = match res {
|
||||
Err(_) => Some("internal server error".to_owned()),
|
||||
Ok(Err(err)) => match err.downcast_ref::<std::io::Error>() {
|
||||
Some(io_error) if io_error.kind() == std::io::ErrorKind::UnexpectedEof => {
|
||||
// don't print an error on dropped connections
|
||||
debug!(%addr, "Client dropped their connection");
|
||||
None
|
||||
}
|
||||
_ => Some(err.to_string()),
|
||||
},
|
||||
Ok(Ok(())) => {
|
||||
debug!(%addr, "finished handling client");
|
||||
None
|
||||
}
|
||||
Ok(Err(err)) => Some(err.to_string()),
|
||||
Ok(Ok(())) => None,
|
||||
};
|
||||
|
||||
if let Some(error) = error {
|
||||
@ -275,27 +265,18 @@ async fn connection_handler(
|
||||
length: packet.data.len().try_into().unwrap(), // this will never fail, as we just truncated the vector
|
||||
};
|
||||
|
||||
// Attempt to notify the client of the failure
|
||||
let (_, mut writer) = stream.split();
|
||||
_ = packet.send(&mut writer).await;
|
||||
let _ = packet.send(&mut writer).await;
|
||||
}
|
||||
|
||||
if let Some(port) = handler_metadata.port {
|
||||
let mut port_handler = port_handler.lock().await;
|
||||
|
||||
if let Some(port_state) = port_handler.port_state.get_mut(&port) {
|
||||
// the client is known. Mark is as disconnected
|
||||
port_state.new_state(PortStatus::Disconnected);
|
||||
} else {
|
||||
// the client is not known. Free its port for realloction
|
||||
assert!(
|
||||
port_handler.free_ports.insert(port),
|
||||
"tried to free up a port that was not allocted"
|
||||
);
|
||||
port_handler.register_update();
|
||||
}
|
||||
|
||||
port_handler.register_update();
|
||||
|
||||
if let Some(listener) = handler_metadata.listener.take() {
|
||||
port_handler.start_rejector(
|
||||
port,
|
||||
@ -305,14 +286,14 @@ async fn connection_handler(
|
||||
kind: PacketKind::Reject.raw(),
|
||||
length: 3,
|
||||
},
|
||||
data: SmallVec::from_slice(b"nc\0"),
|
||||
data: b"nc\0".to_vec(),
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
sleep(Duration::from_secs(2)).await;
|
||||
_ = stream.shutdown().await;
|
||||
sleep(Duration::from_secs(3)).await;
|
||||
let _ = stream.shutdown().await;
|
||||
}
|
||||
|
||||
fn main() -> eyre::Result<()> {
|
||||
@ -342,16 +323,16 @@ async fn tokio_main(config: Arc<Config>) -> eyre::Result<()> {
|
||||
|
||||
let cache_path = PathBuf::from("cache.json");
|
||||
|
||||
let (change_sender, change_receiver) = tokio::sync::watch::channel(std::time::Instant::now());
|
||||
let (change_sender, change_receiver) = tokio::sync::watch::channel(Instant::now());
|
||||
|
||||
let mut port_handler = PortHandler::load_or_default(&cache_path, change_sender);
|
||||
port_handler.update_allowed_ports(config.allowed_ports.clone());
|
||||
port_handler.update_allowed_ports(&config.allowed_ports);
|
||||
|
||||
let port_handler = Arc::new(Mutex::new(port_handler));
|
||||
|
||||
spawn(
|
||||
"cache daemon",
|
||||
cache_daemon(port_handler.clone(), cache_path, change_receiver.clone()),
|
||||
cache_daemon(port_handler.clone(), cache_path, change_receiver),
|
||||
);
|
||||
|
||||
#[cfg(feature = "debug_server")]
|
||||
@ -359,7 +340,7 @@ async fn tokio_main(config: Arc<Config>) -> eyre::Result<()> {
|
||||
warn!(%listen_addr, "debug server listening");
|
||||
spawn(
|
||||
"debug server",
|
||||
debug_server(listen_addr, port_handler.clone(), change_receiver),
|
||||
debug_server(listen_addr, port_handler.clone()),
|
||||
);
|
||||
}
|
||||
|
||||
@ -369,23 +350,16 @@ async fn tokio_main(config: Arc<Config>) -> eyre::Result<()> {
|
||||
"centralex server listening"
|
||||
);
|
||||
|
||||
loop {
|
||||
let connection = listener.accept().await;
|
||||
while let Ok((stream, addr)) = listener.accept().await {
|
||||
info!(%addr, "new connection");
|
||||
|
||||
match connection {
|
||||
Ok((stream, addr)) => {
|
||||
info!(%addr, "new connection");
|
||||
|
||||
spawn(
|
||||
&format!("connection to {addr}"),
|
||||
connection_handler(stream, addr, config.clone(), port_handler.clone()),
|
||||
);
|
||||
}
|
||||
Err(err) => {
|
||||
error!(%err, "failed to accept connection");
|
||||
}
|
||||
}
|
||||
spawn(
|
||||
&format!("connection to {addr}"),
|
||||
connection_handler(stream, addr, config.clone(), port_handler.clone()),
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
|
133
src/packets.rs
133
src/packets.rs
@ -1,8 +1,7 @@
|
||||
use std::fmt::{Debug, Display};
|
||||
use std::fmt::Debug;
|
||||
|
||||
use bytemuck::{Pod, Zeroable};
|
||||
use serde::Serialize;
|
||||
use smallvec::SmallVec;
|
||||
use eyre::eyre;
|
||||
use tokio::{
|
||||
io::{AsyncReadExt, AsyncWriteExt},
|
||||
net::tcp::{ReadHalf, WriteHalf},
|
||||
@ -10,48 +9,6 @@ use tokio::{
|
||||
|
||||
pub const REJECT_OOP: &[u8; 6] = b"\x04\x04oop\x00";
|
||||
pub const REJECT_TIMEOUT: &[u8; 10] = b"\x04\x08timeout\x00";
|
||||
pub const REJECT_UNKNOWN_CLIENT: &[u8; 17] = b"\x04\x0funknown client\x00";
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Error {
|
||||
TooLittleData {
|
||||
expected: u8,
|
||||
got: u8,
|
||||
},
|
||||
Unexpected {
|
||||
expected: PacketKind,
|
||||
got: PacketKind,
|
||||
},
|
||||
Io(std::io::Error),
|
||||
Client(Option<String>),
|
||||
}
|
||||
|
||||
impl std::error::Error for Error {}
|
||||
impl Display for Error {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Error::TooLittleData { expected, got } => write!(
|
||||
f,
|
||||
"received too little data: expected {expected} bytes but received only {got}"
|
||||
),
|
||||
Error::Unexpected { expected, got } => write!(
|
||||
f,
|
||||
"received an unexpected packet: expected {expected:?} but received {got:?}",
|
||||
),
|
||||
Error::Io(inner) => Display::fmt(inner, f),
|
||||
Error::Client(Some(inner)) => write!(f, "client reported an error: {inner}"),
|
||||
Error::Client(None) => write!(f, "client reported a malformed error",),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<std::io::Error> for Error {
|
||||
fn from(value: std::io::Error) -> Self {
|
||||
Self::Io(value)
|
||||
}
|
||||
}
|
||||
|
||||
type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
#[repr(u8)]
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
@ -111,56 +68,35 @@ impl PacketKind {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Default, Clone, Copy, Pod, Zeroable)]
|
||||
#[derive(Default, Clone, Copy, Pod, Zeroable)]
|
||||
#[repr(C)]
|
||||
pub struct Header {
|
||||
pub kind: u8,
|
||||
pub length: u8,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Default, Clone)]
|
||||
#[derive(Default, Clone)]
|
||||
pub struct Packet {
|
||||
pub header: Header,
|
||||
pub data: SmallVec<[u8; 8]>,
|
||||
}
|
||||
|
||||
impl Packet {
|
||||
#[must_use]
|
||||
pub fn data(&self) -> &[u8] {
|
||||
&self.data[..self.header.length as usize]
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn as_string(&self) -> Option<&str> {
|
||||
let data = self.data();
|
||||
let nul = data.iter().enumerate().find(|(_i, c)| **c == 0);
|
||||
|
||||
let data = if let Some((i, _)) = nul {
|
||||
&data[..i]
|
||||
} else {
|
||||
data
|
||||
};
|
||||
|
||||
std::str::from_utf8(data).ok()
|
||||
}
|
||||
pub data: Vec<u8>,
|
||||
}
|
||||
|
||||
impl Debug for Packet {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
let mut debugger = f.debug_struct("Packet");
|
||||
let data = &self.data;
|
||||
|
||||
debugger.field("kind", &PacketKind::from_u8(self.header.kind));
|
||||
let str_data = std::str::from_utf8(&data[..data.len().saturating_sub(1)]).ok();
|
||||
|
||||
match self.as_string() {
|
||||
Some(string) if string.chars().all(|c| !c.is_control()) => {
|
||||
debugger.field("data", &string);
|
||||
}
|
||||
_ => {
|
||||
debugger.field("data", &self.data());
|
||||
}
|
||||
}
|
||||
let data = if let Some(str_data) = str_data.as_ref() {
|
||||
str_data as &dyn Debug
|
||||
} else {
|
||||
&data as &dyn Debug
|
||||
};
|
||||
|
||||
debugger.finish()
|
||||
f.debug_struct("Packet")
|
||||
.field("kind", &PacketKind::from_u8(self.header.kind))
|
||||
.field("data", &data)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
@ -192,7 +128,10 @@ impl Packet {
|
||||
}
|
||||
|
||||
#[allow(clippy::missing_errors_doc)]
|
||||
pub async fn recv_into_cancelation_safe(&mut self, stream: &mut ReadHalf<'_>) -> Result<()> {
|
||||
pub async fn recv_into_cancelation_safe(
|
||||
&mut self,
|
||||
stream: &mut ReadHalf<'_>,
|
||||
) -> std::io::Result<()> {
|
||||
// Makes sure all data is available before reading
|
||||
let header_bytes = bytemuck::bytes_of_mut(&mut self.header);
|
||||
stream.peek(header_bytes).await?;
|
||||
@ -204,7 +143,7 @@ impl Packet {
|
||||
}
|
||||
|
||||
#[allow(clippy::missing_errors_doc)]
|
||||
pub async fn recv_into(&mut self, stream: &mut ReadHalf<'_>) -> Result<()> {
|
||||
pub async fn recv_into(&mut self, stream: &mut ReadHalf<'_>) -> std::io::Result<()> {
|
||||
let header_bytes = bytemuck::bytes_of_mut(&mut self.header);
|
||||
|
||||
stream.read_exact(header_bytes).await?;
|
||||
@ -213,17 +152,13 @@ impl Packet {
|
||||
|
||||
stream.read_exact(&mut self.data).await?;
|
||||
|
||||
if self.header.kind == PacketKind::Error.raw() {
|
||||
return Err(Error::Client(self.as_string().map(ToOwned::to_owned)));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(clippy::missing_errors_doc)]
|
||||
pub async fn send(&self, stream: &mut WriteHalf<'_>) -> std::io::Result<()> {
|
||||
stream.write_all(bytemuck::bytes_of(&self.header)).await?;
|
||||
stream.write_all(self.data()).await?;
|
||||
stream.write_all(&self.data).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -234,26 +169,24 @@ impl Packet {
|
||||
|
||||
/// # Errors
|
||||
/// the packet must be a `RemConnect` packet and must contain at least 6 bytes of data
|
||||
#[allow(clippy::missing_panics_doc)]
|
||||
pub fn as_rem_connect(&self) -> Result<RemConnect> {
|
||||
pub fn as_rem_connect(&self) -> eyre::Result<RemConnect> {
|
||||
if self.kind() != PacketKind::RemConnect {
|
||||
return Err(Error::Unexpected {
|
||||
expected: PacketKind::RemConnect,
|
||||
got: self.kind(),
|
||||
});
|
||||
return Err(eyre!(
|
||||
"Unexpected Packet: {:?} expected RemConnect",
|
||||
self.kind()
|
||||
));
|
||||
}
|
||||
|
||||
if self.data.len() < 6 {
|
||||
#[allow(clippy::cast_possible_truncation)]
|
||||
return Err(Error::TooLittleData {
|
||||
expected: 6,
|
||||
got: self.data.len() as u8,
|
||||
});
|
||||
return Err(eyre!(
|
||||
"Too little data for RemConnect. Need at least 6 Bytes got {}",
|
||||
self.data.len()
|
||||
));
|
||||
}
|
||||
|
||||
Ok(RemConnect {
|
||||
number: u32::from_le_bytes(self.data[..4].try_into().unwrap()),
|
||||
pin: u16::from_le_bytes(self.data[4..6].try_into().unwrap()),
|
||||
number: u32::from_le_bytes(self.data[..4].try_into()?),
|
||||
pin: u16::from_le_bytes(self.data[4..6].try_into()?),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
314
src/ports.rs
314
src/ports.rs
@ -1,6 +1,7 @@
|
||||
use std::{
|
||||
borrow::Cow,
|
||||
collections::{BTreeSet, HashMap, HashSet},
|
||||
fmt::Debug,
|
||||
fmt::{Debug, Display},
|
||||
fs::File,
|
||||
io::{BufReader, BufWriter},
|
||||
ops::RangeInclusive,
|
||||
@ -10,71 +11,47 @@ use std::{
|
||||
};
|
||||
|
||||
use eyre::eyre;
|
||||
use serde::{Deserialize, Serialize, Serializer};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::{
|
||||
io::AsyncWriteExt,
|
||||
net::TcpListener,
|
||||
sync::{watch::Receiver, Mutex},
|
||||
task::JoinHandle,
|
||||
time::{sleep, Instant},
|
||||
time::Instant,
|
||||
};
|
||||
use tracing::{debug, error, info, instrument, warn};
|
||||
|
||||
use crate::{
|
||||
constants::{CACHE_STORE_INTERVAL, PORT_OWNERSHIP_TIMEOUT, PORT_RETRY_TIME},
|
||||
packets::Packet,
|
||||
spawn, Config, Number, Port, UnixTimestamp,
|
||||
spawn, Config, Number, Port, UnixTimestamp, TIME_FORMAT, TIME_ZONE_OFFSET,
|
||||
};
|
||||
|
||||
#[derive(Default, Serialize, Deserialize)]
|
||||
pub struct PortHandler {
|
||||
#[serde(skip_deserializing)]
|
||||
#[serde(serialize_with = "serialize_last_update")]
|
||||
pub last_update: Option<std::time::Instant>,
|
||||
#[serde(skip)]
|
||||
pub last_update: Option<Instant>,
|
||||
|
||||
#[serde(skip)]
|
||||
pub change_sender: Option<tokio::sync::watch::Sender<std::time::Instant>>,
|
||||
pub change_sender: Option<tokio::sync::watch::Sender<Instant>>,
|
||||
|
||||
#[serde(skip_deserializing)]
|
||||
rejectors: HashMap<Port, Rejector>,
|
||||
#[serde(skip)]
|
||||
port_guards: HashMap<Port, Rejector>,
|
||||
|
||||
allowed_ports: AllowedList,
|
||||
|
||||
#[serde(skip)]
|
||||
pub free_ports: HashSet<Port>,
|
||||
|
||||
free_ports: HashSet<Port>,
|
||||
errored_ports: BTreeSet<(UnixTimestamp, Port)>,
|
||||
allocated_ports: HashMap<Number, Port>,
|
||||
|
||||
pub port_state: HashMap<Port, PortState>,
|
||||
|
||||
#[cfg(feature = "debug_server")]
|
||||
#[serde(default)]
|
||||
pub names: HashMap<Number, String>,
|
||||
}
|
||||
|
||||
#[allow(clippy::missing_errors_doc)]
|
||||
pub fn serialize_last_update<S: Serializer>(
|
||||
last_update: &Option<std::time::Instant>,
|
||||
serializer: S,
|
||||
) -> Result<S::Ok, S::Error> {
|
||||
last_update
|
||||
.and_then(|instant| {
|
||||
Some(
|
||||
(SystemTime::now() + instant.elapsed())
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.ok()?
|
||||
.as_secs(),
|
||||
)
|
||||
})
|
||||
.serialize(serializer)
|
||||
}
|
||||
|
||||
#[instrument(skip(port_handler, change_receiver))]
|
||||
pub async fn cache_daemon(
|
||||
port_handler: Arc<Mutex<PortHandler>>,
|
||||
cache_path: PathBuf,
|
||||
mut change_receiver: Receiver<std::time::Instant>,
|
||||
mut change_receiver: Receiver<Instant>,
|
||||
) {
|
||||
let mut last_store = Instant::now() - 2 * CACHE_STORE_INTERVAL;
|
||||
let mut change_timeout = None;
|
||||
@ -103,41 +80,163 @@ pub async fn cache_daemon(
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Hash, PartialEq, Eq)]
|
||||
struct DisplayAsDebug<T: Display>(T);
|
||||
impl<T: Display> Debug for DisplayAsDebug<T> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", self.0)
|
||||
}
|
||||
}
|
||||
|
||||
fn duration_in_hours(duration: Duration) -> String {
|
||||
let seconds_elapsed = duration.as_secs();
|
||||
|
||||
let hours = seconds_elapsed / (60 * 60);
|
||||
let minutes = (seconds_elapsed / 60) % 60;
|
||||
let seconds = seconds_elapsed % 60;
|
||||
|
||||
match (hours > 0, minutes > 0) {
|
||||
(true, _) => format!("{hours}h {minutes}min {seconds}s"),
|
||||
(false, true) => format!("{minutes}min {seconds}s"),
|
||||
_ => format!("{duration:.0?}"),
|
||||
}
|
||||
}
|
||||
|
||||
fn format_instant(instant: Instant) -> String {
|
||||
let when = duration_in_hours(instant.elapsed()) + " ago";
|
||||
|
||||
(|| -> eyre::Result<_> {
|
||||
let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)? - instant.elapsed();
|
||||
let date = time::OffsetDateTime::from_unix_timestamp(
|
||||
timestamp.as_secs().try_into().expect("timestamp overflow"),
|
||||
)?
|
||||
.to_offset(*TIME_ZONE_OFFSET.get().unwrap())
|
||||
.format(TIME_FORMAT.get().unwrap())?;
|
||||
|
||||
Ok(format!("{date} ({when})"))
|
||||
})()
|
||||
.unwrap_or(when)
|
||||
}
|
||||
|
||||
fn instant_from_timestamp(timestamp: UnixTimestamp) -> Instant {
|
||||
Instant::now() - UNIX_EPOCH.elapsed().unwrap() + Duration::from_secs(timestamp)
|
||||
}
|
||||
|
||||
impl Debug for PortHandler {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
const SHOW_N_FREE_PORTS: usize = 10;
|
||||
|
||||
let last_update = self
|
||||
.last_update
|
||||
.map(|last_update| Cow::from(format_instant(last_update)))
|
||||
.unwrap_or(Cow::from("?"));
|
||||
|
||||
let mut free_ports = self.free_ports.iter().copied().collect::<Vec<u16>>();
|
||||
|
||||
free_ports.sort_unstable();
|
||||
|
||||
let mut free_ports = free_ports
|
||||
.into_iter()
|
||||
.take(SHOW_N_FREE_PORTS)
|
||||
.map(|x| DisplayAsDebug(x.to_string()))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
if let Some(n_not_shown) = self.free_ports.len().checked_sub(SHOW_N_FREE_PORTS) {
|
||||
if n_not_shown > 0 {
|
||||
free_ports.push(DisplayAsDebug(format!("[{n_not_shown} more]")));
|
||||
}
|
||||
}
|
||||
|
||||
let errored_ports = self
|
||||
.errored_ports
|
||||
.iter()
|
||||
.rev()
|
||||
.map(|&(since, port)| {
|
||||
DisplayAsDebug(format!(
|
||||
"{port:5}: {}",
|
||||
format_instant(instant_from_timestamp(since))
|
||||
))
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let mut allocated_ports = self
|
||||
.allocated_ports
|
||||
.iter()
|
||||
.map(|(&number, &port)| {
|
||||
#[derive(Debug)]
|
||||
#[allow(dead_code)]
|
||||
struct State {
|
||||
state: PortStatus,
|
||||
number: u32,
|
||||
port: u16,
|
||||
last_change: DisplayAsDebug<String>,
|
||||
}
|
||||
|
||||
let state = &self.port_state[&port];
|
||||
|
||||
State {
|
||||
state: state.status,
|
||||
number,
|
||||
port,
|
||||
last_change: DisplayAsDebug(format_instant(instant_from_timestamp(
|
||||
state.last_change,
|
||||
))),
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
allocated_ports.sort_unstable_by(|a, b| {
|
||||
a.state.cmp(&b.state).then(
|
||||
self.port_state[&a.port]
|
||||
.last_change
|
||||
.cmp(&self.port_state[&b.port].last_change)
|
||||
.reverse(),
|
||||
)
|
||||
});
|
||||
|
||||
writeln!(f, "last update: {last_update}")?;
|
||||
writeln!(f, "rejectors: {:#?}", self.port_guards)?;
|
||||
writeln!(f, "allowed ports: {:?}", self.allowed_ports.0)?;
|
||||
writeln!(f, "free ports: {free_ports:?}")?;
|
||||
|
||||
writeln!(f, "errored ports: {errored_ports:#?}")?;
|
||||
writeln!(f, "allocated ports: {allocated_ports:#?}")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default, Serialize, Deserialize)]
|
||||
pub struct PortState {
|
||||
#[serde(deserialize_with = "deserialize_last_change")]
|
||||
last_change: UnixTimestamp,
|
||||
#[serde(skip_deserializing)]
|
||||
#[serde(skip)]
|
||||
status: PortStatus,
|
||||
}
|
||||
|
||||
fn deserialize_last_change<'de, D>(deserializer: D) -> Result<UnixTimestamp, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
Ok(match Option::<UnixTimestamp>::deserialize(deserializer)? {
|
||||
Some(timestamp) => timestamp,
|
||||
None => now(),
|
||||
})
|
||||
}
|
||||
|
||||
fn now() -> UnixTimestamp {
|
||||
SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.expect("timestamp overflow")
|
||||
.as_secs()
|
||||
impl Debug for PortState {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("PortState")
|
||||
.field(
|
||||
"last_change",
|
||||
&DisplayAsDebug(format_instant(instant_from_timestamp(self.last_change))),
|
||||
)
|
||||
.field("status", &self.status)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl PortState {
|
||||
pub fn new_state(&mut self, status: PortStatus) {
|
||||
self.last_change = now();
|
||||
self.last_change = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.expect("timestamp overflow")
|
||||
.as_secs();
|
||||
|
||||
self.status = status;
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, PartialOrd, Ord)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum PortStatus {
|
||||
InCall,
|
||||
Idle,
|
||||
@ -165,14 +264,17 @@ impl AllowedList {
|
||||
}
|
||||
|
||||
impl PortHandler {
|
||||
/// # Panics
|
||||
/// If the the `change_sender` could not be notified
|
||||
#[must_use]
|
||||
pub fn status_string(&self) -> String {
|
||||
format!("{self:#?}\n")
|
||||
}
|
||||
|
||||
pub fn register_update(&mut self) {
|
||||
let now = std::time::Instant::now();
|
||||
let now = Instant::now();
|
||||
self.last_update = Some(now);
|
||||
self.change_sender
|
||||
.as_ref()
|
||||
.expect("PortHandler is missing its change_sender")
|
||||
.expect("PortHandler is missing it's change_sender")
|
||||
.send(now)
|
||||
.expect("failed to notify cache writer");
|
||||
}
|
||||
@ -181,65 +283,42 @@ impl PortHandler {
|
||||
#[instrument(skip(self))]
|
||||
pub fn store(&self, cache: &Path) -> std::io::Result<()> {
|
||||
debug!("storing cache");
|
||||
let temp_file = cache.with_extension("temp");
|
||||
let temp_file = cache.with_extension(".temp");
|
||||
|
||||
let mut value = serde_json::to_value(self)?;
|
||||
|
||||
let value_object = value.as_object_mut().unwrap();
|
||||
|
||||
value_object.remove("rejectors").unwrap();
|
||||
value_object.remove("last_update").unwrap();
|
||||
|
||||
value_object
|
||||
.get_mut("port_state")
|
||||
.unwrap()
|
||||
.as_object_mut()
|
||||
.unwrap()
|
||||
.values_mut()
|
||||
.for_each(|value| {
|
||||
let value_object = value.as_object_mut().unwrap();
|
||||
|
||||
// it does not make sense to store when the did anything else other than disconnect
|
||||
// because when we restart the server it will no longer be connected
|
||||
if value_object.get("status").unwrap().as_str().unwrap() != "disconnected" {
|
||||
*value_object.get_mut("last_change").unwrap() = serde_json::Value::Null;
|
||||
}
|
||||
|
||||
value_object.remove("status").unwrap();
|
||||
});
|
||||
|
||||
serde_json::to_writer(BufWriter::new(File::create(&temp_file)?), &value)?;
|
||||
serde_json::to_writer(BufWriter::new(File::create(&temp_file)?), self)?;
|
||||
std::fs::rename(temp_file, cache)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(clippy::missing_errors_doc)]
|
||||
pub fn load(cache: &Path) -> std::io::Result<Self> {
|
||||
#[instrument(skip(change_sender))]
|
||||
pub fn load(
|
||||
cache: &Path,
|
||||
change_sender: tokio::sync::watch::Sender<Instant>,
|
||||
) -> std::io::Result<Self> {
|
||||
info!("loading cache");
|
||||
Ok(serde_json::from_reader(BufReader::new(File::open(cache)?))?)
|
||||
let mut cache: Self = serde_json::from_reader(BufReader::new(File::open(cache)?))?;
|
||||
cache.change_sender = Some(change_sender);
|
||||
Ok(cache)
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
#[instrument(skip(change_sender))]
|
||||
pub fn load_or_default(
|
||||
path: &Path,
|
||||
change_sender: tokio::sync::watch::Sender<std::time::Instant>,
|
||||
change_sender: tokio::sync::watch::Sender<Instant>,
|
||||
) -> Self {
|
||||
let mut this = Self::load(path).unwrap_or_else(|error| {
|
||||
Self::load(path, change_sender).unwrap_or_else(|error| {
|
||||
error!(?path, %error, "failed to parse cache file");
|
||||
Self::default()
|
||||
});
|
||||
|
||||
this.change_sender = Some(change_sender);
|
||||
|
||||
this
|
||||
})
|
||||
}
|
||||
|
||||
pub fn update_allowed_ports(&mut self, allowed_ports: AllowedList) {
|
||||
pub fn update_allowed_ports(&mut self, allowed_ports: &AllowedList) {
|
||||
self.register_update();
|
||||
|
||||
self.allowed_ports = allowed_ports;
|
||||
self.allowed_ports = allowed_ports.clone();
|
||||
|
||||
self.free_ports.clear(); // remove all ports
|
||||
self.free_ports
|
||||
@ -277,7 +356,7 @@ impl PortHandler {
|
||||
|
||||
let port_guard = Rejector::start(listener, packet);
|
||||
|
||||
if self.rejectors.insert(port, port_guard).is_some() {
|
||||
if self.port_guards.insert(port, port_guard).is_some() {
|
||||
unreachable!("Tried to start rejector that is already running. This should have been impossible since it requires two listeners on the same port.");
|
||||
}
|
||||
}
|
||||
@ -286,7 +365,7 @@ impl PortHandler {
|
||||
pub async fn stop_rejector(&mut self, port: Port) -> Option<(TcpListener, Packet)> {
|
||||
info!(port, "stopping rejector");
|
||||
|
||||
Some(self.rejectors.remove(&port)?.stop().await)
|
||||
Some(self.port_guards.remove(&port)?.stop().await)
|
||||
}
|
||||
|
||||
/// # Errors
|
||||
@ -314,25 +393,11 @@ struct Rejector {
|
||||
handle: JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl Serialize for Rejector {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
let packet = &self.state.1;
|
||||
|
||||
match packet.as_string() {
|
||||
Some(string) if string.chars().all(|c| !c.is_control()) => string.serialize(serializer),
|
||||
_ => packet.data().serialize(serializer),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Debug for Rejector {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("Rejector")
|
||||
.field("message", &self.state.1)
|
||||
.finish_non_exhaustive()
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
@ -348,22 +413,12 @@ impl Rejector {
|
||||
spawn(&format!("rejector for port {port}",), async move {
|
||||
let (listener, packet) = state.as_ref();
|
||||
|
||||
let packet = Arc::new(packet.clone());
|
||||
|
||||
let listener = listener.lock().await;
|
||||
|
||||
loop {
|
||||
if let Ok((mut socket, _)) = listener.accept().await {
|
||||
let packet = packet.clone();
|
||||
tokio::task::spawn(async move {
|
||||
let (_, mut writer) = socket.split();
|
||||
_ = packet.send(&mut writer).await;
|
||||
|
||||
// wait two seconds before closing the connection
|
||||
// to accomodate bad itelex tpc stacks
|
||||
sleep(Duration::from_secs(2)).await;
|
||||
_ = socket.shutdown().await;
|
||||
});
|
||||
let (_, mut writer) = socket.split();
|
||||
let _ = packet.send(&mut writer).await;
|
||||
}
|
||||
}
|
||||
})
|
||||
@ -374,7 +429,7 @@ impl Rejector {
|
||||
#[instrument(skip(self))]
|
||||
async fn stop(self) -> (TcpListener, Packet) {
|
||||
self.handle.abort();
|
||||
_ = self.handle.await;
|
||||
let _ = self.handle.await;
|
||||
let (listener, packet) = Arc::try_unwrap(self.state).unwrap();
|
||||
(listener.into_inner(), packet)
|
||||
}
|
||||
@ -410,7 +465,7 @@ impl PortHandler {
|
||||
};
|
||||
|
||||
if let Some(port) = port {
|
||||
info!(port, "allocated");
|
||||
info!(port, "allocated port");
|
||||
}
|
||||
|
||||
port
|
||||
@ -471,13 +526,10 @@ impl PortHandler {
|
||||
self.register_update();
|
||||
info!(port, old_number, "reused port");
|
||||
assert!(self.allocated_ports.remove(&old_number).is_some());
|
||||
#[cfg(feature = "debug_server")]
|
||||
self.names.remove(&old_number);
|
||||
|
||||
return Some(port);
|
||||
}
|
||||
|
||||
None // TODO: are there more ways?
|
||||
None // TODO
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
|
@ -1 +0,0 @@
|
||||
<svg version="1.1" viewBox="0 0 60.59 60.734" xmlns="http://www.w3.org/2000/svg"><g transform="translate(-38.21,-51.32)"><path d="m98.8 55.1-6.52 6.52-3.78-3.78 6.52-6.52zm-7.49 20.5-8.5 8.5-16.8-16.8 8.5-8.5zm-16.9 0.0741a11.9 11.9 45 0 1 0.0256-16.8 11.9 11.9 45 0 1 16.8-0.0256 11.9 11.9 45 0 1-0.0256 16.8 11.9 11.9 45 0 1-16.8 0.0256zm-36.2 32.6 6.52-6.52 3.78 3.78-6.52 6.52zm19.7-25.3 8.39-8.39 1.88 1.88-8.39 8.39zm7.46 7.5 8.39-8.39 1.88 1.88-8.39 8.39zm-19.7-2.72 8.5-8.5 16.8 16.8-8.5 8.5zm16.9-0.0741a11.9 11.9 45 0 1-0.0256 16.8 11.9 11.9 45 0 1-16.8 0.0256 11.9 11.9 45 0 1 0.0256-16.8 11.9 11.9 45 0 1 16.8-0.0256z"/></g></svg>
|
Before Width: | Height: | Size: 643 B |
@ -1,33 +0,0 @@
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
|
||||
<head>
|
||||
<link rel="icon" href="data:,">
|
||||
<meta charset='utf-8'>
|
||||
<meta name='viewport' content='width=device-width, initial-scale=1'>
|
||||
<title>Centralex State</title>
|
||||
<!--INSERT HEAD CONTENT HERE-->
|
||||
</head>
|
||||
|
||||
<body>
|
||||
<div class="content">
|
||||
|
||||
<div id="header">
|
||||
<p id="free_ports" />
|
||||
<div id="connected" class="hidden"><!--INSERT SVG HERE--></div>
|
||||
<p id="last_ping" title="v#INSERT VERSION HERE#" />
|
||||
</div>
|
||||
|
||||
<table id="table">
|
||||
<tr>
|
||||
<th onclick="sort(this,'name')">Name</th>
|
||||
<th onclick="sort(this,'number')">Nummer</th>
|
||||
<th onclick="sort(this,'port')">Port</th>
|
||||
<th onclick="sort(this,'status')">Zustand</th>
|
||||
<th onclick="sort(this,'last_change')">Seit</th>
|
||||
</tr>
|
||||
</table>
|
||||
</div>
|
||||
</body>
|
||||
|
||||
</html>
|
90
web/main.css
90
web/main.css
@ -1,90 +0,0 @@
|
||||
body {
|
||||
background-color: #eee;
|
||||
}
|
||||
|
||||
.last_change {
|
||||
text-align: right;
|
||||
}
|
||||
|
||||
.last_change .unit {
|
||||
padding-left: 1ch;
|
||||
min-width: 8ch;
|
||||
display: inline-block;
|
||||
text-align: left;
|
||||
}
|
||||
|
||||
#last_ping,
|
||||
#connected,
|
||||
#free_ports {
|
||||
margin-top: 0;
|
||||
margin-bottom: 0.25em;
|
||||
}
|
||||
|
||||
#connected svg {
|
||||
height: 1em;
|
||||
}
|
||||
|
||||
#header {
|
||||
display: flex;
|
||||
flex-direction: row;
|
||||
grid-template-columns: auto 1fr;
|
||||
justify-content: space-between;
|
||||
}
|
||||
|
||||
td,
|
||||
th {
|
||||
border: 1px solid black;
|
||||
padding: 0.5em;
|
||||
font-family: monospace;
|
||||
}
|
||||
|
||||
table {
|
||||
border-spacing: 0;
|
||||
max-width: 100%;
|
||||
margin: 0 auto;
|
||||
}
|
||||
|
||||
th {
|
||||
cursor: pointer;
|
||||
user-select: none;
|
||||
font-size: inherit;
|
||||
}
|
||||
|
||||
th::after {
|
||||
font-family: monospace;
|
||||
padding-left: 3px;
|
||||
}
|
||||
|
||||
th:not(.sort)::after {
|
||||
content: "\a0";
|
||||
}
|
||||
|
||||
.sort-up::after {
|
||||
content: "\25b2";
|
||||
}
|
||||
|
||||
.sort-down::after {
|
||||
content: "\25bc";
|
||||
}
|
||||
|
||||
td {
|
||||
text-align: left;
|
||||
}
|
||||
|
||||
.port {
|
||||
text-align: left;
|
||||
}
|
||||
|
||||
.visible {
|
||||
opacity: 0;
|
||||
/*transition: opacity 500ms linear;*/
|
||||
}
|
||||
|
||||
.hidden {
|
||||
opacity: 1;
|
||||
transition: opacity 5000ms ease-in;
|
||||
}
|
||||
|
||||
#connected {
|
||||
fill: rgb(224, 32, 6)
|
||||
}
|
256
web/main.js
256
web/main.js
@ -1,256 +0,0 @@
|
||||
window.onload = () => {
|
||||
const table_elem = document.getElementById("table");
|
||||
const last_change = document.getElementById("last_change");
|
||||
const last_ping = document.getElementById("last_ping");
|
||||
const connected = document.getElementById("connected");
|
||||
const free_ports = document.getElementById("free_ports");
|
||||
|
||||
const timeout_duration = 10 * 1000;
|
||||
const retry_timeout = 5 * 1000;
|
||||
|
||||
let reconnect_timeout;
|
||||
let ping_timeout;
|
||||
let evtSource;
|
||||
let table = [];
|
||||
|
||||
let last_update = new Date();
|
||||
|
||||
let direction = -1;
|
||||
let oldkey = "last_change";
|
||||
|
||||
table_elem.firstChild.firstChild.lastChild.className = "sort sort-down";
|
||||
|
||||
let time_ago = (ms) => {
|
||||
let value = ms / 1000;
|
||||
// let prev = 0;
|
||||
let unit = 0;
|
||||
|
||||
let factors = [
|
||||
[1, "Sekunde", "n"],
|
||||
[60, "Minute", "n"],
|
||||
[60, "Stunde", "n"],
|
||||
[24, "Tag", "en"],
|
||||
[7, "Woche", "n"],
|
||||
[4.348214, "Monat", "en"],
|
||||
[12, "Jahr", "en"],
|
||||
];
|
||||
|
||||
for (let i in factors) {
|
||||
let factor = factors[i][0];
|
||||
let new_value = Math.floor(value / factor);
|
||||
if (new_value == 0) break;
|
||||
|
||||
// prev = Math.floor(value % factor);
|
||||
value = new_value;
|
||||
unit = i;
|
||||
}
|
||||
|
||||
let factor = factors[unit];
|
||||
|
||||
return [value, factors[unit][1] + (value == 1 ? "" : factor[2])];
|
||||
};
|
||||
|
||||
sort = (element, key) => {
|
||||
if (key == oldkey) {
|
||||
direction *= -1;
|
||||
} else {
|
||||
oldkey = key;
|
||||
direction = -1;
|
||||
}
|
||||
|
||||
for (let child of table_elem.firstChild.firstChild.children) {
|
||||
child.className = "";
|
||||
}
|
||||
|
||||
element.className = `sort ${direction > 0 ? "sort-up" : "sort-down"}`;
|
||||
|
||||
do_sort(oldkey, direction);
|
||||
};
|
||||
|
||||
let do_sort = (key, direction) => {
|
||||
let is_number = !!~(["port", "number", "last_change"].indexOf(key));
|
||||
|
||||
table = table.sort((a, b) => (direction * (
|
||||
is_number ? a[key] - b[key] : ("" + a[key]).localeCompare(b[key], "de-DE")
|
||||
)));
|
||||
|
||||
populate_table();
|
||||
};
|
||||
|
||||
let fmt = Intl.DateTimeFormat("de-DE", {
|
||||
dateStyle: "medium",
|
||||
timeStyle: "medium",
|
||||
});
|
||||
|
||||
let format_date = (date) => fmt.format(date).replace(", ", " ");
|
||||
let format_time = (date) => fmt.format(date).split(", ", 2)[1];
|
||||
|
||||
let populate_table = () => {
|
||||
// clear everything except for the header row
|
||||
while (table_elem.rows.length > 1) {
|
||||
table_elem.deleteRow(-1);
|
||||
}
|
||||
|
||||
for (let row of table) {
|
||||
let tr = table_elem.insertRow(-1);
|
||||
|
||||
let values = [
|
||||
row.name === null ? "?" : row.name,
|
||||
row.number,
|
||||
row.port,
|
||||
row.status,
|
||||
time_ago(last_update - row.last_change),
|
||||
];
|
||||
|
||||
let names = [
|
||||
"name",
|
||||
"number",
|
||||
"port",
|
||||
"status",
|
||||
"last_change",
|
||||
];
|
||||
|
||||
for (let i in values) {
|
||||
let value = values[i];
|
||||
let name = names[i];
|
||||
|
||||
let td = tr.insertCell(-1);
|
||||
td.className = name;
|
||||
|
||||
if (name == "last_change") {
|
||||
let [number, unit] = value;
|
||||
|
||||
let span = document.createElement("span");
|
||||
// span.className = "value";
|
||||
span.innerText = number;
|
||||
td.appendChild(span);
|
||||
|
||||
span = document.createElement("span");
|
||||
span.className = "unit";
|
||||
span.innerText = unit;
|
||||
td.appendChild(span);
|
||||
} else {
|
||||
td.innerText = value;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let update_table = (data) => {
|
||||
const allowed_ports = data.allowed_ports.map((x) => x.end - x.start + 1)
|
||||
.reduce((a, b) => a + b, 0);
|
||||
free_ports.innerText = `Freie Ports: ${
|
||||
allowed_ports - Object.keys(data.allocated_ports).length -
|
||||
data.errored_ports.length
|
||||
}`;
|
||||
|
||||
// last_change.innerHTML = `Letzte Änderung: ${format_date(new Date(+data.last_update * 1000))}`;
|
||||
|
||||
table = [];
|
||||
|
||||
for (let number in data.allocated_ports) {
|
||||
try {
|
||||
let port = data.allocated_ports[number];
|
||||
number = +number;
|
||||
|
||||
// allocated port has no state. This means that it is unknown and should not be displayed
|
||||
if (data.port_state[port] == undefined) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let { status, last_change } = data.port_state[port];
|
||||
|
||||
let rejector = data.rejectors[port] || null;
|
||||
|
||||
if (rejector && rejector instanceof Array) {
|
||||
rejector = rejector.map((x) => "0x" + x.toString(16).padStart(2, 0))
|
||||
.join(" ");
|
||||
}
|
||||
|
||||
last_change = new Date(last_change * 1000);
|
||||
|
||||
let name = data.names[number] || null;
|
||||
|
||||
switch (status) {
|
||||
case "disconnected":
|
||||
status = rejector ? `getrennt: ${rejector}` : "getrennt";
|
||||
break;
|
||||
case "idle":
|
||||
status = "bereit";
|
||||
break;
|
||||
case "in_call":
|
||||
status = "anruf";
|
||||
break;
|
||||
}
|
||||
|
||||
table.push({ port, number, status, last_change, rejector, name });
|
||||
} catch (error) {
|
||||
console.error(error);
|
||||
}
|
||||
}
|
||||
|
||||
for (let [timestamp, port] of data.errored_ports) {
|
||||
table.push({
|
||||
port,
|
||||
number: null,
|
||||
status: "Fehler",
|
||||
last_change: new Date(timestamp * 1000),
|
||||
rejector: null,
|
||||
name: "",
|
||||
});
|
||||
}
|
||||
|
||||
do_sort(oldkey, direction);
|
||||
|
||||
populate_table();
|
||||
};
|
||||
|
||||
let format_event = (event, method) =>
|
||||
(method || format_date)(new Date(+event.data * 1000));
|
||||
|
||||
let display_disconnected;
|
||||
|
||||
let connect_event_source = () => {
|
||||
clearTimeout(reconnect_timeout);
|
||||
clearTimeout(ping_timeout);
|
||||
ping_timeout = setTimeout(connect_event_source, timeout_duration);
|
||||
|
||||
evtSource && evtSource.close && evtSource.close();
|
||||
|
||||
evtSource = new EventSource("/events");
|
||||
evtSource.addEventListener("change", (event) => {
|
||||
last_ping.innerText = `Stand: ${format_event(event, format_time)}`;
|
||||
|
||||
last_update = new Date(+event.data * 1000);
|
||||
|
||||
fetch("/data")
|
||||
.then((res) => res.json())
|
||||
.then(update_table);
|
||||
});
|
||||
|
||||
evtSource.addEventListener("ping", (event) => {
|
||||
clearTimeout(ping_timeout);
|
||||
ping_timeout = setTimeout(connect_event_source, timeout_duration);
|
||||
|
||||
last_update = new Date(+event.data * 1000);
|
||||
|
||||
last_ping.innerText = `Stand: ${format_event(event, format_time)}`;
|
||||
connected.className = "visible";
|
||||
|
||||
populate_table();
|
||||
|
||||
clearTimeout(display_disconnected);
|
||||
display_disconnected = setTimeout(
|
||||
() => connected.className = "hidden",
|
||||
5000,
|
||||
);
|
||||
});
|
||||
|
||||
evtSource.onerror = () => {
|
||||
clearTimeout(reconnect_timeout);
|
||||
reconnect_timeout = setTimeout(connect_event_source, retry_timeout);
|
||||
};
|
||||
};
|
||||
|
||||
connect_event_source();
|
||||
};
|
Loading…
Reference in New Issue
Block a user