Compare commits

...

27 Commits

Author SHA1 Message Date
dd7b0ca839 add version to debug page 2024-05-31 20:40:57 +02:00
19c334a974 free up allocated ports if they fail to authenticate 2024-05-31 18:27:22 +02:00
0770c00737 don't update dependencies indiscriminately 2024-02-08 15:30:40 +01:00
300914eb8d update dependencies 2024-02-07 15:40:59 +01:00
68d4071bb9 cleanup 2023-11-21 14:59:18 +01:00
eb94d310e8 send content type for index 2023-11-21 13:12:02 +01:00
5484140688 cleanup 2023-11-21 13:10:18 +01:00
db138f5123 add space in sse data 2023-09-03 19:52:44 +02:00
c2c31a1410 remove false comment 2023-09-03 13:31:09 +02:00
4ad5504004 add wait in rejector to accomodate bad TCP stacks in itelex devices 2023-09-03 13:28:58 +02:00
7ba68cedbd only allow clients that are already registered 2023-07-09 00:43:52 +02:00
76502f5e00 clean up packet errors 2023-06-24 16:14:15 +02:00
eb5e5fd0fa don't store when a client did anything but disconnect 2023-06-20 18:49:36 +02:00
1ca44e300e fix typo 2023-06-20 18:35:57 +02:00
2e8496ae37 warn instead of erroring when a client fails to send an authentication request 2023-06-20 18:33:50 +02:00
7b9582d518 fix plural for durations 2023-06-20 18:26:15 +02:00
a94a6dbb55 update dependencies 2023-06-20 13:59:00 +02:00
e77fa730dd switch packet buffer to smallvec 2023-06-20 13:58:23 +02:00
813e1af396 optimize release builds more 2023-06-11 21:19:14 +02:00
2f41d7a2ea redesign UI 2023-06-11 19:20:36 +02:00
9b39bac8d7 reformat 2023-06-11 08:20:38 +02:00
8991675f50 reformat 2023-06-11 08:06:05 +02:00
c874ecdfa3 add sorting 2023-06-11 08:00:59 +02:00
fcde6281e3 force locale 2023-06-11 07:03:30 +02:00
2adfee9825 show errored ports 2023-06-11 06:59:55 +02:00
93e2f1e0b2 fix last change 2023-06-11 06:48:42 +02:00
5fe2177a14 minor ui changes 2023-06-11 06:44:14 +02:00
13 changed files with 1134 additions and 575 deletions

652
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,33 +1,45 @@
[package] [package]
name = "centralex" name = "centralex"
version = "0.1.0" version = "1.0.1"
edition = "2021" edition = "2021"
[profile.release] [profile.release]
debug = true strip = true
lto = true
codegen-units = 1
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
tokio = { version = "1.24.2", features = ["macros", "rt-multi-thread", "net", "io-util", "sync", "time"] } tokio = { version = "1.24.2", features = [
"macros",
"rt-multi-thread",
"net",
"io-util",
"sync",
"time",
] }
time = { version = "0.3.20", features = ["local-offset", "macros"] } time = { version = "0.3.20", features = ["local-offset", "macros"] }
bytemuck = { version = "1.13.0", features = ["derive"] } bytemuck = { version = "1.13.0", features = ["derive"] }
serde = { version = "1.0.152", features = ["derive"] } serde = { version = "1.0.152", features = ["derive"] }
serde_json = "1.0.91" serde_json = "1.0.91"
hyper = { version = "0.14.24", optional = true, features = ["server", "http1", "tcp", "stream"] } hyper = { version = "1.0.1", optional = true, features = ["server", "http1"] }
futures = { version = "0.3.27", default-features = false, features = ["std"] } futures = { version = "0.3.27", default-features = false, features = ["std"] }
tracing = "0.1.37" tracing = "0.1.37"
tracing-subscriber = { version = "0.3.16", features = ["time"] } tracing-subscriber = { version = "0.3.16", features = ["time"] }
console-subscriber = { version = "0.1.8", optional = true } console-subscriber = { version = "0.2.0", optional = true }
once_cell = "1.17.1" once_cell = "1.17.1"
eyre = "0.6.8" eyre = "0.6.8"
color-eyre = "0.6.2" color-eyre = "0.6.2"
tracing-error = "0.2.0" tracing-error = "0.2.0"
zerocopy = "0.6.1" zerocopy = { version = "0.7.26", features = ["derive"] }
tokio-stream = { version = "0.1.14", features = ["sync"] } tokio-stream = { version = "0.1.14", features = ["sync"] }
flate2 = { version = "1.0.26", optional = true } flate2 = { version = "1.0.26", optional = true }
bytes = "1.4.0" bytes = "1.4.0"
smallvec = { version = "1.10.0", features = ["serde", "const_generics"] }
http-body-util = "0.1.0"
pin-project = "1.1.3"
async-stream = "0.3.5"
[build-dependencies] [build-dependencies]
minify-js = { version = "0.5.6", optional = true } minify-js = { version = "0.5.6", optional = true }
@@ -37,5 +49,11 @@ flate2 = { version = "1.0.26", optional = true }
[features] [features]
default = ["debug_server"] default = ["debug_server"]
debug_server = ["dep:hyper", "minify-html", "dep:minify-js", "dep:css-minify", "dep:flate2"] debug_server = [
"dep:hyper",
"dep:minify-html",
"dep:minify-js",
"dep:css-minify",
"dep:flate2",
]
tokio_console = ["dep:console-subscriber"] tokio_console = ["dep:console-subscriber"]

View File

@@ -30,7 +30,7 @@ fn pack_debug_page() -> Result<(), Box<dyn std::error::Error>> {
.unwrap(); .unwrap();
let js = std::str::from_utf8(&out)?; let js = std::str::from_utf8(&out)?;
let css = Minifier::default().minify(&css, Level::Three).unwrap(); let css = Minifier::default().minify(&css, Level::One).unwrap();
let (start, end) = html let (start, end) = html
.split_once("<!--INSERT SVG HERE-->") .split_once("<!--INSERT SVG HERE-->")
@@ -42,8 +42,15 @@ fn pack_debug_page() -> Result<(), Box<dyn std::error::Error>> {
.split_once("<!--INSERT HEAD CONTENT HERE-->") .split_once("<!--INSERT HEAD CONTENT HERE-->")
.expect("did not find head split point in html"); .expect("did not find head split point in html");
let (body_a, body_b) = body
.split_once("#INSERT VERSION HERE#")
.expect("did not find version split point in html");
let version = env!("CARGO_PKG_VERSION");
let html = minify_html::minify( let html = minify_html::minify(
format!("{head}<style>{css}</style><script>{js}</script>{body}").as_bytes(), format!("{head}<style>{css}</style><script>{js}</script>{body_a}{version}{body_b}")
.as_bytes(),
&minify_html::Cfg::spec_compliant(), &minify_html::Cfg::spec_compliant(),
); );

View File

@@ -34,7 +34,7 @@ pub async fn dyn_ip_update(
kind: PacketKind::DynIpUpdate.raw(), kind: PacketKind::DynIpUpdate.raw(),
length: 8, length: 8,
}, },
data: Vec::new(), ..Default::default()
}; };
packet.data.resize(packet.header.length as usize, 0); packet.data.resize(packet.header.length as usize, 0);
@@ -55,11 +55,11 @@ pub async fn dyn_ip_update(
packet.recv_into(&mut reader).await?; packet.recv_into(&mut reader).await?;
let result = match packet.kind() { let result = match packet.kind() {
PacketKind::DynIpUpdateResponse => Ok(<[u8; 4]>::try_from(packet.data) PacketKind::DynIpUpdateResponse => Ok(<[u8; 4]>::try_from(packet.data.as_slice())
.map_err(|err| { .map_err(|_| {
eyre!( eyre!(
"too little data for ip address. Need 4 bytes got {}", "too little data for ip address. Need 4 bytes got {}",
err.len() packet.data.len()
) )
})? })?
.into()), .into()),

View File

@@ -10,17 +10,25 @@ use tokio::{
sync::Mutex, sync::Mutex,
time::{sleep, timeout}, time::{sleep, timeout},
}; };
use tracing::{info, instrument, trace}; use tracing::{info, instrument, trace, warn};
use crate::{ use crate::{
auth::dyn_ip_update, auth::dyn_ip_update,
constants::{AUTH_TIMEOUT, CALL_ACK_TIMEOUT, CALL_TIMEOUT, PING_TIMEOUT, SEND_PING_INTERVAL}, constants::{AUTH_TIMEOUT, CALL_ACK_TIMEOUT, CALL_TIMEOUT, PING_TIMEOUT, SEND_PING_INTERVAL},
http::peer_query, http::peer_query,
packets::{Header, Packet, PacketKind, RemConnect, REJECT_OOP, REJECT_TIMEOUT}, packets::{
Header, Packet, PacketKind, RemConnect, REJECT_OOP, REJECT_TIMEOUT, REJECT_UNKNOWN_CLIENT,
},
ports::{PortHandler, PortStatus}, ports::{PortHandler, PortStatus},
Config, HandlerMetadata, Config, HandlerMetadata,
}; };
pub enum AuthResult {
OutOfPorts,
UnknownClient,
Success { port: u16 },
}
/// # Errors /// # Errors
/// - the client authentication fails /// - the client authentication fails
#[instrument(skip(config, port_handler, handler_metadata))] #[instrument(skip(config, port_handler, handler_metadata))]
@@ -30,7 +38,16 @@ async fn authenticate(
handler_metadata: &mut HandlerMetadata, handler_metadata: &mut HandlerMetadata,
number: u32, number: u32,
pin: u16, pin: u16,
) -> eyre::Result<Option<u16>> { ) -> eyre::Result<AuthResult> {
// TODO: do we want to cache this? If so, for how long?
let name = peer_query(&config.dyn_ip_server, number).await?;
let Some(name) = name else {
return Ok(AuthResult::UnknownClient);
};
info!(%name, "found client");
let mut authenticated = false; let mut authenticated = false;
loop { loop {
let mut updated_server = false; let mut updated_server = false;
@@ -41,9 +58,11 @@ async fn authenticate(
.allocate_port_for_number(config, number); .allocate_port_for_number(config, number);
let Some(port) = port else { let Some(port) = port else {
return Ok(None); return Ok(AuthResult::OutOfPorts);
}; };
handler_metadata.port = Some(port);
// make sure the client is authenticated before opening any ports // make sure the client is authenticated before opening any ports
if !authenticated { if !authenticated {
let _ip = dyn_ip_update(&config.dyn_ip_server, number, pin, port).await?; let _ip = dyn_ip_update(&config.dyn_ip_server, number, pin, port).await?;
@@ -71,27 +90,17 @@ async fn authenticate(
let _ip = dyn_ip_update(&config.dyn_ip_server, number, pin, port).await?; let _ip = dyn_ip_update(&config.dyn_ip_server, number, pin, port).await?;
} }
#[cfg(feature = "debug_server")]
let name = peer_query(&config.dyn_ip_server, number).await?;
let mut port_handler = port_handler.lock().await; let mut port_handler = port_handler.lock().await;
#[cfg(feature = "debug_server")]
if let Some(name) = name {
info!(%name, "found client name");
port_handler.names.insert(number, name);
}
port_handler.register_update(); port_handler.register_update();
port_handler.names.insert(number, name);
port_handler port_handler
.port_state .port_state
.entry(port) .entry(port)
.or_default() .or_default()
.new_state(PortStatus::Idle); .new_state(PortStatus::Idle);
handler_metadata.port = Some(port); break Ok(AuthResult::Success { port });
break Ok(Some(port));
} }
port_handler.lock().await.mark_port_error(number, port); port_handler.lock().await.mark_port_error(number, port);
@@ -152,12 +161,12 @@ async fn idle(
break Ok(Some(IdleResult::Disconnect { packet })) break Ok(Some(IdleResult::Disconnect { packet }))
} }
}, },
_ = sleep(send_next_ping_in) => { () = sleep(send_next_ping_in) => {
trace!("sending ping"); trace!("sending ping");
writer.write_all(bytemuck::bytes_of(& Header { kind: PacketKind::Ping.raw(), length: 0 })).await?; writer.write_all(bytemuck::bytes_of(& Header { kind: PacketKind::Ping.raw(), length: 0 })).await?;
last_ping_sent_at = Instant::now(); last_ping_sent_at = Instant::now();
} }
_ = sleep(next_ping_expected_in) => { () = sleep(next_ping_expected_in) => {
writer.write_all(REJECT_TIMEOUT).await?; writer.write_all(REJECT_TIMEOUT).await?;
break Ok(None); break Ok(None);
@@ -323,19 +332,33 @@ pub async fn handler(
let mut packet = Packet::default(); let mut packet = Packet::default();
let Ok(res) = timeout(AUTH_TIMEOUT, packet.recv_into_cancelation_safe(&mut reader)).await else { let Ok(res) = timeout(AUTH_TIMEOUT, packet.recv_into_cancelation_safe(&mut reader)).await
else {
writer.write_all(REJECT_TIMEOUT).await?; writer.write_all(REJECT_TIMEOUT).await?;
return Ok(()); return Ok(());
}; };
res?; res?;
if packet.kind() != PacketKind::RemConnect {
let kind = packet.kind();
warn!(%addr, ?kind, "client sent unexpected packet instead of RemConnect");
return Ok(());
}
let RemConnect { number, pin } = packet.as_rem_connect()?; let RemConnect { number, pin } = packet.as_rem_connect()?;
handler_metadata.number = Some(number); handler_metadata.number = Some(number);
let Some(port) = authenticate(config, port_handler, handler_metadata, number, pin).await? else { let port = match authenticate(config, port_handler, handler_metadata, number, pin).await? {
writer.write_all(REJECT_OOP).await?; AuthResult::OutOfPorts => {
return Ok(()); writer.write_all(REJECT_OOP).await?;
return Ok(());
}
AuthResult::UnknownClient => {
writer.write_all(REJECT_UNKNOWN_CLIENT).await?;
return Ok(());
}
AuthResult::Success { port } => port,
}; };
info!(%addr, number, port, "authenticated"); info!(%addr, number, port, "authenticated");
@@ -351,18 +374,21 @@ pub async fn handler(
packet.data.clear(); packet.data.clear();
packet.send(&mut writer).await?; packet.send(&mut writer).await?;
let Some(idle_result) = idle( let Some(idle_result) = idle(listener, packet, &mut reader, &mut writer).await? else {
listener,
packet,
&mut reader,
&mut writer,
).await? else {
return Ok(()); return Ok(());
}; };
let Some((mut caller, mut packet)) = notify_or_disconnect(idle_result, handler_metadata, port_handler, port, &mut writer).await? else { let Some((mut caller, mut packet)) = notify_or_disconnect(
idle_result,
handler_metadata,
port_handler,
port,
&mut writer,
)
.await?
else {
return Ok(()); return Ok(());
}; };
let notify_at = Instant::now(); let notify_at = Instant::now();
@@ -373,9 +399,9 @@ pub async fn handler(
); );
let Ok(res) = recv.await else { let Ok(res) = recv.await else {
writer.write_all(REJECT_TIMEOUT).await?; writer.write_all(REJECT_TIMEOUT).await?;
return Ok(()); return Ok(());
}; };
res?; res?;
match packet.kind() { match packet.kind() {

View File

@@ -1,29 +1,114 @@
use bytes::BytesMut; use bytes::{Bytes, BytesMut};
use futures::Future; use futures::Future;
use http_body_util::combinators::UnsyncBoxBody;
use http_body_util::BodyExt;
use http_body_util::StreamBody;
use hyper::body::{Body, Frame};
use hyper::header::{ACCEPT_ENCODING, CACHE_CONTROL, CONTENT_ENCODING, CONTENT_TYPE}; use hyper::header::{ACCEPT_ENCODING, CACHE_CONTROL, CONTENT_ENCODING, CONTENT_TYPE};
use hyper::rt::Executor; use hyper::rt::Executor;
use hyper::service::{make_service_fn, service_fn}; use hyper::service::service_fn;
use hyper::{Body, Method, Request, Response, Server, StatusCode}; use hyper::{Method, Request, Response, StatusCode};
use std::convert::Infallible;
use std::io::Read; use std::io::Read;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
use tokio::net::TcpListener;
use tokio::sync::Mutex; use tokio::sync::Mutex;
use tokio_stream::wrappers::{IntervalStream, WatchStream}; use tokio_stream::wrappers::{IntervalStream, WatchStream};
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
use tracing::error; use tracing::error;
use zerocopy::{AsBytes, FromBytes, LittleEndian, Unaligned};
use tracing::{debug, instrument}; use tracing::{debug, instrument};
use zerocopy::{AsBytes, FromBytes, FromZeroes, LittleEndian, Unaligned};
use crate::constants::DEBUG_SERVER_PING_INTERVAL; use crate::constants::DEBUG_SERVER_PING_INTERVAL;
use crate::packets::{Header, Packet}; use crate::packets::{Header, Packet};
use crate::ports::PortHandler; use crate::ports::PortHandler;
use crate::spawn; use crate::spawn;
mod tokio_io {
use std::{
pin::Pin,
task::{Context, Poll},
};
#[derive(Debug)]
#[pin_project::pin_project]
pub struct TokioIo<T> {
#[pin]
inner: T,
}
impl<T> TokioIo<T> {
pub fn new(inner: T) -> Self {
Self { inner }
}
}
impl<T> hyper::rt::Read for TokioIo<T>
where
T: tokio::io::AsyncRead,
{
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
mut buf: hyper::rt::ReadBufCursor<'_>,
) -> Poll<Result<(), std::io::Error>> {
let n = unsafe {
let mut temp_buf = tokio::io::ReadBuf::uninit(buf.as_mut());
match tokio::io::AsyncRead::poll_read(self.project().inner, cx, &mut temp_buf) {
Poll::Ready(Ok(())) => temp_buf.filled().len(),
other => return other,
}
};
unsafe {
buf.advance(n);
}
Poll::Ready(Ok(()))
}
}
impl<T> hyper::rt::Write for TokioIo<T>
where
T: tokio::io::AsyncWrite,
{
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, std::io::Error>> {
tokio::io::AsyncWrite::poll_write(self.project().inner, cx, buf)
}
fn poll_flush(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
tokio::io::AsyncWrite::poll_flush(self.project().inner, cx)
}
fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
tokio::io::AsyncWrite::poll_shutdown(self.project().inner, cx)
}
fn is_write_vectored(&self) -> bool {
tokio::io::AsyncWrite::is_write_vectored(&self.inner)
}
fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[std::io::IoSlice<'_>],
) -> Poll<Result<usize, std::io::Error>> {
tokio::io::AsyncWrite::poll_write_vectored(self.project().inner, cx, bufs)
}
}
}
use self::tokio_io::TokioIo;
#[derive(Clone)] #[derive(Clone)]
struct NamedExecutor; struct NamedExecutor;
impl<T: Send + 'static, Fut: Future<Output = T> + Send + 'static> Executor<Fut> for NamedExecutor { impl<T: Send + 'static, Fut: Future<Output = T> + Send + 'static> Executor<Fut> for NamedExecutor {
@@ -34,7 +119,26 @@ impl<T: Send + 'static, Fut: Future<Output = T> + Send + 'static> Executor<Fut>
const COMPRESSED_HTML: &[u8] = include_bytes!(concat!(env!("OUT_DIR"), "/minified.html.gz")); const COMPRESSED_HTML: &[u8] = include_bytes!(concat!(env!("OUT_DIR"), "/minified.html.gz"));
async fn index(req: &Request<Body>) -> Result<Response<Body>, hyper::http::Error> { type ResponseBody = UnsyncBoxBody<Bytes, hyper::Error>;
type RouteResponse = Result<Response<ResponseBody>, hyper::http::Error>;
fn full<D, B>(buf: B) -> ResponseBody
where
http_body_util::Full<D>: From<B> + BodyExt + Body<Data = Bytes>,
D: Send + 'static,
{
http_body_util::Full::from(buf)
.map_err(|_| unreachable!())
.boxed_unsync()
}
fn empty() -> ResponseBody {
http_body_util::Empty::new()
.map_err(|_| unreachable!())
.boxed_unsync()
}
fn index(req: &Request<hyper::body::Incoming>) -> RouteResponse {
let response = Response::builder(); let response = Response::builder();
let accepts_gzip = req let accepts_gzip = req
@@ -52,9 +156,10 @@ async fn index(req: &Request<Body>) -> Result<Response<Body>, hyper::http::Error
if accepts_gzip { if accepts_gzip {
response response
.header(CONTENT_ENCODING, "gzip") .header(CONTENT_ENCODING, "gzip")
.body(Body::from(COMPRESSED_HTML)) .header(CONTENT_TYPE, "text/html")
.body(full(COMPRESSED_HTML))
} else { } else {
let (mut sender, body) = Body::channel(); let (sender, mut receiver) = tokio::sync::mpsc::channel(1);
spawn("gunzip task", async move { spawn("gunzip task", async move {
let mut decoder = let mut decoder =
@@ -88,59 +193,80 @@ async fn index(req: &Request<Body>) -> Result<Response<Body>, hyper::http::Error
chunk.truncate(i); chunk.truncate(i);
if sender.send_data(chunk.freeze()).await.is_err() { if sender.send(chunk.freeze()).await.is_err() {
break; break;
} }
} }
}); });
response.body(body) response.body(
StreamBody::new(async_stream::stream! {
while let Some(item) = receiver.recv().await {
yield Ok(Frame::data(item));
}
})
.boxed_unsync(),
)
} }
} }
async fn data( async fn data(
_req: &Request<Body>, _req: &Request<hyper::body::Incoming>,
port_handler: Arc<Mutex<PortHandler>>, port_handler: &Mutex<PortHandler>,
) -> Result<Response<Body>, hyper::http::Error> { ) -> RouteResponse {
let res = Response::builder().header(CACHE_CONTROL, "no-store"); let res = Response::builder().header(CACHE_CONTROL, "no-store");
match serde_json::to_string(&*port_handler.lock().await) { match serde_json::to_string(&*port_handler.lock().await) {
Ok(data) => res Ok(data) => res
.header(CONTENT_TYPE, "application/json") .header(CONTENT_TYPE, "application/json")
.body(Body::from(data)), .body(full(data)),
Err(err) => { Err(err) => {
error!(%err, "failed to serialize data for debug server"); error!(%err, "failed to serialize data for debug server");
res.status(StatusCode::INTERNAL_SERVER_ERROR) res.status(StatusCode::INTERNAL_SERVER_ERROR).body(empty())
.body(Body::from(""))
} }
} }
} }
fn events( fn events(
_req: &Request<Body>, _req: &Request<hyper::body::Incoming>,
change_receiver: tokio::sync::watch::Receiver<std::time::Instant>, change_receiver: tokio::sync::watch::Receiver<std::time::Instant>,
) -> Result<Response<Body>, hyper::http::Error> { ) -> RouteResponse {
let stream = WatchStream::new(change_receiver)
.map(|x| ("change", x))
.merge(
IntervalStream::new(tokio::time::interval(DEBUG_SERVER_PING_INTERVAL))
.map(|x| ("ping", x.into_std())),
)
.filter_map(|(kind, time)| {
let timestamp = (SystemTime::now() + time.elapsed())
.duration_since(UNIX_EPOCH)
.ok()?
.as_secs();
Some(Ok::<Frame<Bytes>, _>(Frame::data(Bytes::from(format!(
"event: {kind}\ndata: {timestamp}\n\n"
)))))
});
Response::builder() Response::builder()
.status(StatusCode::OK) .status(StatusCode::OK)
.header(CACHE_CONTROL, "no-store") .header(CACHE_CONTROL, "no-store")
.header(CONTENT_TYPE, "text/event-stream") .header(CONTENT_TYPE, "text/event-stream")
.body(Body::wrap_stream({ .body(StreamBody::new(stream).boxed_unsync())
WatchStream::new(change_receiver) }
.map(|x| ("change", x))
.merge(
IntervalStream::new(tokio::time::interval(DEBUG_SERVER_PING_INTERVAL))
.map(|x| ("ping", x.into_std())),
)
.filter_map(|(kind, time)| {
let timestamp = (SystemTime::now() + time.elapsed())
.duration_since(UNIX_EPOCH)
.ok()?
.as_secs();
Some(Ok::<_, Infallible>(format!( async fn debug_server_routes(
"event:{kind}\ndata: {timestamp}\n\n" req: Request<hyper::body::Incoming>,
))) port_handler: Arc<Mutex<PortHandler>>,
}) change_receiver: tokio::sync::watch::Receiver<std::time::Instant>,
})) ) -> RouteResponse {
match (req.method(), req.uri().path()) {
(&Method::GET, "/") => index(&req),
(&Method::GET, "/data") => Ok(data(&req, &port_handler).await?),
(&Method::GET, "/events") => events(&req, change_receiver),
_ => Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
.body(empty())?),
}
} }
pub async fn debug_server( pub async fn debug_server(
@@ -148,32 +274,47 @@ pub async fn debug_server(
port_handler: Arc<Mutex<PortHandler>>, port_handler: Arc<Mutex<PortHandler>>,
change_receiver: tokio::sync::watch::Receiver<std::time::Instant>, change_receiver: tokio::sync::watch::Receiver<std::time::Instant>,
) { ) {
let server = Server::bind(&addr) let listener = TcpListener::bind(addr).await;
.executor(NamedExecutor)
.serve(make_service_fn(move |_conn| {
let port_handler = port_handler.clone();
let change_receiver = change_receiver.clone();
async move {
Ok::<_, Infallible>(service_fn(move |req| {
let port_handler = port_handler.clone();
let change_receiver = change_receiver.clone();
async move { let listener = match listener {
match (req.method(), req.uri().path()) { Ok(listener) => listener,
(&Method::GET, "/") => index(&req).await, Err(error) => {
(&Method::GET, "/data") => data(&req, port_handler).await, error!(%error, %addr, "failed to bind debug server");
(&Method::GET, "/events") => events(&req, change_receiver), return;
_ => Response::builder() }
.status(StatusCode::NOT_FOUND) };
.body(Body::empty()),
} loop {
} let conn = listener.accept().await;
}))
let stream = match conn {
Ok((stream, _)) => stream,
Err(error) => {
error!(%error, "failed accept debug server connection");
continue;
} }
})); };
if let Err(error) = server.await { let io = TokioIo::new(stream);
error!(%error, "debug server error");
let port_handler = port_handler.clone();
let change_receiver = change_receiver.clone();
tokio::task::spawn(async move {
if let Err(error) = hyper::server::conn::http1::Builder::new()
.serve_connection(
io,
service_fn(move |req| {
debug_server_routes(req, port_handler.clone(), change_receiver.clone())
}),
)
.await
{
if !error.is_incomplete_message() {
error!(%error, "Failed to serve debug server connection");
}
}
});
} }
} }
@@ -187,7 +328,7 @@ struct PeerQuery {
number: U32, number: U32,
} }
#[derive(FromBytes, Unaligned, Debug)] #[derive(FromBytes, FromZeroes, Unaligned, Debug)]
#[repr(packed)] #[repr(packed)]
#[allow(dead_code)] #[allow(dead_code)]
struct PeerReply { struct PeerReply {
@@ -212,7 +353,7 @@ pub async fn peer_query(server: &SocketAddr, number: u32) -> eyre::Result<Option
kind: 3, // Peer Query kind: 3, // Peer Query
length: 4, length: 4,
}, },
data: Vec::new(), ..Default::default()
}; };
packet.data.clear(); packet.data.clear();

View File

@@ -14,6 +14,7 @@ use futures::Future;
use http::debug_server; use http::debug_server;
use packets::{Header, Packet}; use packets::{Header, Packet};
use serde::{Deserialize, Deserializer}; use serde::{Deserialize, Deserializer};
use smallvec::SmallVec;
use time::format_description::OwnedFormatItem; use time::format_description::OwnedFormatItem;
use tokio::{ use tokio::{
io::AsyncWriteExt, io::AsyncWriteExt,
@@ -255,7 +256,10 @@ async fn connection_handler(
} }
_ => Some(err.to_string()), _ => Some(err.to_string()),
}, },
Ok(Ok(())) => None, Ok(Ok(())) => {
debug!(%addr, "finished handling client");
None
}
}; };
if let Some(error) = error { if let Some(error) = error {
@@ -280,10 +284,18 @@ async fn connection_handler(
let mut port_handler = port_handler.lock().await; let mut port_handler = port_handler.lock().await;
if let Some(port_state) = port_handler.port_state.get_mut(&port) { if let Some(port_state) = port_handler.port_state.get_mut(&port) {
// the client is known. Mark is as disconnected
port_state.new_state(PortStatus::Disconnected); port_state.new_state(PortStatus::Disconnected);
port_handler.register_update(); } else {
// the client is not known. Free its port for realloction
assert!(
port_handler.free_ports.insert(port),
"tried to free up a port that was not allocted"
);
} }
port_handler.register_update();
if let Some(listener) = handler_metadata.listener.take() { if let Some(listener) = handler_metadata.listener.take() {
port_handler.start_rejector( port_handler.start_rejector(
port, port,
@@ -293,13 +305,13 @@ async fn connection_handler(
kind: PacketKind::Reject.raw(), kind: PacketKind::Reject.raw(),
length: 3, length: 3,
}, },
data: b"nc\0".to_vec(), data: SmallVec::from_slice(b"nc\0"),
}, },
); );
} }
} }
sleep(Duration::from_secs(3)).await; sleep(Duration::from_secs(2)).await;
_ = stream.shutdown().await; _ = stream.shutdown().await;
} }
@@ -333,7 +345,7 @@ async fn tokio_main(config: Arc<Config>) -> eyre::Result<()> {
let (change_sender, change_receiver) = tokio::sync::watch::channel(std::time::Instant::now()); let (change_sender, change_receiver) = tokio::sync::watch::channel(std::time::Instant::now());
let mut port_handler = PortHandler::load_or_default(&cache_path, change_sender); let mut port_handler = PortHandler::load_or_default(&cache_path, change_sender);
port_handler.update_allowed_ports(&config.allowed_ports); port_handler.update_allowed_ports(config.allowed_ports.clone());
let port_handler = Arc::new(Mutex::new(port_handler)); let port_handler = Arc::new(Mutex::new(port_handler));

View File

@@ -1,8 +1,8 @@
use std::fmt::Debug; use std::fmt::{Debug, Display};
use bytemuck::{Pod, Zeroable}; use bytemuck::{Pod, Zeroable};
use eyre::eyre;
use serde::Serialize; use serde::Serialize;
use smallvec::SmallVec;
use tokio::{ use tokio::{
io::{AsyncReadExt, AsyncWriteExt}, io::{AsyncReadExt, AsyncWriteExt},
net::tcp::{ReadHalf, WriteHalf}, net::tcp::{ReadHalf, WriteHalf},
@@ -10,6 +10,48 @@ use tokio::{
pub const REJECT_OOP: &[u8; 6] = b"\x04\x04oop\x00"; pub const REJECT_OOP: &[u8; 6] = b"\x04\x04oop\x00";
pub const REJECT_TIMEOUT: &[u8; 10] = b"\x04\x08timeout\x00"; pub const REJECT_TIMEOUT: &[u8; 10] = b"\x04\x08timeout\x00";
pub const REJECT_UNKNOWN_CLIENT: &[u8; 17] = b"\x04\x0funknown client\x00";
#[derive(Debug)]
pub enum Error {
TooLittleData {
expected: u8,
got: u8,
},
Unexpected {
expected: PacketKind,
got: PacketKind,
},
Io(std::io::Error),
Client(Option<String>),
}
impl std::error::Error for Error {}
impl Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Error::TooLittleData { expected, got } => write!(
f,
"received too little data: expected {expected} bytes but received only {got}"
),
Error::Unexpected { expected, got } => write!(
f,
"received an unexpected packet: expected {expected:?} but received {got:?}",
),
Error::Io(inner) => Display::fmt(inner, f),
Error::Client(Some(inner)) => write!(f, "client reported an error: {inner}"),
Error::Client(None) => write!(f, "client reported a malformed error",),
}
}
}
impl From<std::io::Error> for Error {
fn from(value: std::io::Error) -> Self {
Self::Io(value)
}
}
type Result<T> = std::result::Result<T, Error>;
#[repr(u8)] #[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -79,7 +121,7 @@ pub struct Header {
#[derive(Serialize, Default, Clone)] #[derive(Serialize, Default, Clone)]
pub struct Packet { pub struct Packet {
pub header: Header, pub header: Header,
pub data: Vec<u8>, pub data: SmallVec<[u8; 8]>,
} }
impl Packet { impl Packet {
@@ -150,10 +192,7 @@ impl Packet {
} }
#[allow(clippy::missing_errors_doc)] #[allow(clippy::missing_errors_doc)]
pub async fn recv_into_cancelation_safe( pub async fn recv_into_cancelation_safe(&mut self, stream: &mut ReadHalf<'_>) -> Result<()> {
&mut self,
stream: &mut ReadHalf<'_>,
) -> eyre::Result<()> {
// Makes sure all data is available before reading // Makes sure all data is available before reading
let header_bytes = bytemuck::bytes_of_mut(&mut self.header); let header_bytes = bytemuck::bytes_of_mut(&mut self.header);
stream.peek(header_bytes).await?; stream.peek(header_bytes).await?;
@@ -165,7 +204,7 @@ impl Packet {
} }
#[allow(clippy::missing_errors_doc)] #[allow(clippy::missing_errors_doc)]
pub async fn recv_into(&mut self, stream: &mut ReadHalf<'_>) -> eyre::Result<()> { pub async fn recv_into(&mut self, stream: &mut ReadHalf<'_>) -> Result<()> {
let header_bytes = bytemuck::bytes_of_mut(&mut self.header); let header_bytes = bytemuck::bytes_of_mut(&mut self.header);
stream.read_exact(header_bytes).await?; stream.read_exact(header_bytes).await?;
@@ -175,10 +214,7 @@ impl Packet {
stream.read_exact(&mut self.data).await?; stream.read_exact(&mut self.data).await?;
if self.header.kind == PacketKind::Error.raw() { if self.header.kind == PacketKind::Error.raw() {
return Err(eyre!( return Err(Error::Client(self.as_string().map(ToOwned::to_owned)));
"client reported error: {:?}",
self.as_string().unwrap_or("unknown dyn auth error")
));
} }
Ok(()) Ok(())
@@ -198,24 +234,26 @@ impl Packet {
/// # Errors /// # Errors
/// the packet must be a `RemConnect` packet and must contain at least 6 bytes of data /// the packet must be a `RemConnect` packet and must contain at least 6 bytes of data
pub fn as_rem_connect(&self) -> eyre::Result<RemConnect> { #[allow(clippy::missing_panics_doc)]
pub fn as_rem_connect(&self) -> Result<RemConnect> {
if self.kind() != PacketKind::RemConnect { if self.kind() != PacketKind::RemConnect {
return Err(eyre!( return Err(Error::Unexpected {
"Unexpected Packet: {:?} expected RemConnect", expected: PacketKind::RemConnect,
self.kind() got: self.kind(),
)); });
} }
if self.data.len() < 6 { if self.data.len() < 6 {
return Err(eyre!( #[allow(clippy::cast_possible_truncation)]
"Too little data for RemConnect. Need at least 6 Bytes got {}", return Err(Error::TooLittleData {
self.data.len() expected: 6,
)); got: self.data.len() as u8,
});
} }
Ok(RemConnect { Ok(RemConnect {
number: u32::from_le_bytes(self.data[..4].try_into()?), number: u32::from_le_bytes(self.data[..4].try_into().unwrap()),
pin: u16::from_le_bytes(self.data[4..6].try_into()?), pin: u16::from_le_bytes(self.data[4..6].try_into().unwrap()),
}) })
} }
} }

View File

@@ -1,6 +1,6 @@
use std::{ use std::{
collections::{BTreeSet, HashMap, HashSet}, collections::{BTreeSet, HashMap, HashSet},
fmt::{Debug, Display}, fmt::Debug,
fs::File, fs::File,
io::{BufReader, BufWriter}, io::{BufReader, BufWriter},
ops::RangeInclusive, ops::RangeInclusive,
@@ -12,10 +12,11 @@ use std::{
use eyre::eyre; use eyre::eyre;
use serde::{Deserialize, Serialize, Serializer}; use serde::{Deserialize, Serialize, Serializer};
use tokio::{ use tokio::{
io::AsyncWriteExt,
net::TcpListener, net::TcpListener,
sync::{watch::Receiver, Mutex}, sync::{watch::Receiver, Mutex},
task::JoinHandle, task::JoinHandle,
time::Instant, time::{sleep, Instant},
}; };
use tracing::{debug, error, info, instrument, warn}; use tracing::{debug, error, info, instrument, warn};
@@ -40,7 +41,7 @@ pub struct PortHandler {
allowed_ports: AllowedList, allowed_ports: AllowedList,
#[serde(skip)] #[serde(skip)]
free_ports: HashSet<Port>, pub free_ports: HashSet<Port>,
errored_ports: BTreeSet<(UnixTimestamp, Port)>, errored_ports: BTreeSet<(UnixTimestamp, Port)>,
allocated_ports: HashMap<Number, Port>, allocated_ports: HashMap<Number, Port>,
@@ -102,27 +103,34 @@ pub async fn cache_daemon(
} }
} }
#[derive(Hash, PartialEq, Eq)]
struct DisplayAsDebug<T: Display>(T);
impl<T: Display> Debug for DisplayAsDebug<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
#[derive(Default, Serialize, Deserialize)] #[derive(Default, Serialize, Deserialize)]
pub struct PortState { pub struct PortState {
#[serde(deserialize_with = "deserialize_last_change")]
last_change: UnixTimestamp, last_change: UnixTimestamp,
#[serde(skip_deserializing)] #[serde(skip_deserializing)]
status: PortStatus, status: PortStatus,
} }
fn deserialize_last_change<'de, D>(deserializer: D) -> Result<UnixTimestamp, D::Error>
where
D: serde::Deserializer<'de>,
{
Ok(match Option::<UnixTimestamp>::deserialize(deserializer)? {
Some(timestamp) => timestamp,
None => now(),
})
}
fn now() -> UnixTimestamp {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("timestamp overflow")
.as_secs()
}
impl PortState { impl PortState {
pub fn new_state(&mut self, status: PortStatus) { pub fn new_state(&mut self, status: PortStatus) {
self.last_change = SystemTime::now() self.last_change = now();
.duration_since(UNIX_EPOCH)
.expect("timestamp overflow")
.as_secs();
self.status = status; self.status = status;
} }
@@ -157,6 +165,8 @@ impl AllowedList {
} }
impl PortHandler { impl PortHandler {
/// # Panics
/// If the the `change_sender` could not be notified
pub fn register_update(&mut self) { pub fn register_update(&mut self) {
let now = std::time::Instant::now(); let now = std::time::Instant::now();
self.last_update = Some(now); self.last_update = Some(now);
@@ -187,7 +197,15 @@ impl PortHandler {
.unwrap() .unwrap()
.values_mut() .values_mut()
.for_each(|value| { .for_each(|value| {
value.as_object_mut().unwrap().remove("status").unwrap(); let value_object = value.as_object_mut().unwrap();
// it does not make sense to store when the did anything else other than disconnect
// because when we restart the server it will no longer be connected
if value_object.get("status").unwrap().as_str().unwrap() != "disconnected" {
*value_object.get_mut("last_change").unwrap() = serde_json::Value::Null;
}
value_object.remove("status").unwrap();
}); });
serde_json::to_writer(BufWriter::new(File::create(&temp_file)?), &value)?; serde_json::to_writer(BufWriter::new(File::create(&temp_file)?), &value)?;
@@ -218,10 +236,10 @@ impl PortHandler {
this this
} }
pub fn update_allowed_ports(&mut self, allowed_ports: &AllowedList) { pub fn update_allowed_ports(&mut self, allowed_ports: AllowedList) {
self.register_update(); self.register_update();
self.allowed_ports = allowed_ports.clone(); self.allowed_ports = allowed_ports;
self.free_ports.clear(); // remove all ports self.free_ports.clear(); // remove all ports
self.free_ports self.free_ports
@@ -314,7 +332,7 @@ impl Debug for Rejector {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Rejector") f.debug_struct("Rejector")
.field("message", &self.state.1) .field("message", &self.state.1)
.finish() .finish_non_exhaustive()
} }
} }
@@ -330,12 +348,22 @@ impl Rejector {
spawn(&format!("rejector for port {port}",), async move { spawn(&format!("rejector for port {port}",), async move {
let (listener, packet) = state.as_ref(); let (listener, packet) = state.as_ref();
let packet = Arc::new(packet.clone());
let listener = listener.lock().await; let listener = listener.lock().await;
loop { loop {
if let Ok((mut socket, _)) = listener.accept().await { if let Ok((mut socket, _)) = listener.accept().await {
let (_, mut writer) = socket.split(); let packet = packet.clone();
_ = packet.send(&mut writer).await; tokio::task::spawn(async move {
let (_, mut writer) = socket.split();
_ = packet.send(&mut writer).await;
// wait two seconds before closing the connection
// to accomodate bad itelex tpc stacks
sleep(Duration::from_secs(2)).await;
_ = socket.shutdown().await;
});
} }
} }
}) })

View File

@@ -1 +1 @@
<svg><g transform="translate(-38.3 -51.4)"><path d="m98.8 55.1-6.52 6.52-3.78-3.78 6.52-6.52zm-7.49 20.5-8.5 8.5-16.8-16.8 8.5-8.5zm-16.9 0.0741a11.9 11.9 45 0 1 0.0256-16.8 11.9 11.9 45 0 1 16.8-0.0256 11.9 11.9 45 0 1-0.0256 16.8 11.9 11.9 45 0 1-16.8 0.0256zm-36.2 32.6 6.52-6.52 3.78 3.78-6.52 6.52zm19.7-25.3 8.39-8.39 1.88 1.88-8.39 8.39zm7.46 7.5 8.39-8.39 1.88 1.88-8.39 8.39zm-19.7-2.72 8.5-8.5 16.8 16.8-8.5 8.5zm16.9-0.0741a11.9 11.9 45 0 1-0.0256 16.8 11.9 11.9 45 0 1-16.8 0.0256 11.9 11.9 45 0 1 0.0256-16.8 11.9 11.9 45 0 1 16.8-0.0256z"/></g></svg> <svg version="1.1" viewBox="0 0 60.59 60.734" xmlns="http://www.w3.org/2000/svg"><g transform="translate(-38.21,-51.32)"><path d="m98.8 55.1-6.52 6.52-3.78-3.78 6.52-6.52zm-7.49 20.5-8.5 8.5-16.8-16.8 8.5-8.5zm-16.9 0.0741a11.9 11.9 45 0 1 0.0256-16.8 11.9 11.9 45 0 1 16.8-0.0256 11.9 11.9 45 0 1-0.0256 16.8 11.9 11.9 45 0 1-16.8 0.0256zm-36.2 32.6 6.52-6.52 3.78 3.78-6.52 6.52zm19.7-25.3 8.39-8.39 1.88 1.88-8.39 8.39zm7.46 7.5 8.39-8.39 1.88 1.88-8.39 8.39zm-19.7-2.72 8.5-8.5 16.8 16.8-8.5 8.5zm16.9-0.0741a11.9 11.9 45 0 1-0.0256 16.8 11.9 11.9 45 0 1-16.8 0.0256 11.9 11.9 45 0 1 0.0256-16.8 11.9 11.9 45 0 1 16.8-0.0256z"/></g></svg>

Before

Width:  |  Height:  |  Size: 564 B

After

Width:  |  Height:  |  Size: 643 B

View File

@@ -10,22 +10,24 @@
</head> </head>
<body> <body>
<p id="free_ports" /> <div class="content">
<div id="connected_box">
<div id="connected" class="hidden"><!--INSERT SVG HERE--></div>
<p id="last_update" />
</div>
<table id="table"> <div id="header">
<tr> <p id="free_ports" />
<th>Nummer</th> <div id="connected" class="hidden"><!--INSERT SVG HERE--></div>
<th>Port</th> <p id="last_ping" title="v#INSERT VERSION HERE#" />
<th>Zustand</th> </div>
<th>Name</th>
<th>Meldung</th> <table id="table">
<th>Letzte Änderung</th> <tr>
</tr> <th onclick="sort(this,'name')">Name</th>
</table> <th onclick="sort(this,'number')">Nummer</th>
<th onclick="sort(this,'port')">Port</th>
<th onclick="sort(this,'status')">Zustand</th>
<th onclick="sort(this,'last_change')">Seit</th>
</tr>
</table>
</div>
</body> </body>
</html> </html>

View File

@@ -2,39 +2,89 @@ body {
background-color: #eee; background-color: #eee;
} }
#connected_box { .last_change {
position: absolute; text-align: right;
top: 5%;
right: 5%;
width: 5%;
height: 5%;
} }
.last_change .unit {
padding-left: 1ch;
min-width: 8ch;
display: inline-block;
text-align: left;
}
#last_ping,
#connected,
#free_ports {
margin-top: 0;
margin-bottom: 0.25em;
}
#connected svg {
height: 1em;
}
#header {
display: flex;
flex-direction: row;
grid-template-columns: auto 1fr;
justify-content: space-between;
}
td, td,
th { th {
border: 1px solid black; border: 1px solid black;
padding: 0.5em; padding: 0.5em;
font-family: monospace;
} }
table { table {
border-spacing: 0; border-spacing: 0;
max-width: 100%;
margin: 0 auto;
} }
.number { th {
text-align: right; cursor: pointer;
user-select: none;
font-size: inherit;
} }
.text { th::after {
font-family: monospace;
padding-left: 3px;
}
th:not(.sort)::after {
content: "\a0";
}
.sort-up::after {
content: "\25b2";
}
.sort-down::after {
content: "\25bc";
}
td {
text-align: left;
}
.port {
text-align: left; text-align: left;
} }
.visible { .visible {
opacity: 1; opacity: 0;
transition: opacity 500ms linear; /*transition: opacity 500ms linear;*/
} }
.hidden { .hidden {
opacity: 0.2; opacity: 1;
transition: opacity 6000ms linear; transition: opacity 5000ms ease-in;
}
#connected {
fill: rgb(224, 32, 6)
} }

View File

@@ -1,127 +1,256 @@
window.onload = () => { window.onload = () => {
const table_elem = document.getElementById("table"); const table_elem = document.getElementById("table");
const last_update = document.getElementById("last_update"); const last_change = document.getElementById("last_change");
const connected = document.getElementById("connected"); const last_ping = document.getElementById("last_ping");
const free_ports = document.getElementById("free_ports"); const connected = document.getElementById("connected");
const free_ports = document.getElementById("free_ports");
const timeout_duration = 10*1000; const timeout_duration = 10 * 1000;
const retry_timeout = 5*1000; const retry_timeout = 5 * 1000;
let reconnect_timeout; let reconnect_timeout;
let ping_timeout; let ping_timeout;
let evtSource; let evtSource;
let table = []; let table = [];
let format_date = date => date.toLocaleDateString() + ' ' + date.toLocaleTimeString(); let last_update = new Date();
let print_table = () => { let direction = -1;
while(table_elem.children.length > 1) { let oldkey = "last_change";
table_elem.removeChild(table_elem.lastChild);
}
for (let row of table) { table_elem.firstChild.firstChild.lastChild.className = "sort sort-down";
let tr = document.createElement("tr"); let time_ago = (ms) => {
let value = ms / 1000;
let values = [ // let prev = 0;
row.number, let unit = 0;
row.port,
row.status,
row.name || "?",
row.rejector || "",
format_date(row.last_change)
];
for(let value of values) { let factors = [
let td = document.createElement("td"); [1, "Sekunde", "n"],
td.innerText = value; [60, "Minute", "n"],
td.className = Number.isInteger(value) ? "number" : "text"; [60, "Stunde", "n"],
tr.appendChild(td); [24, "Tag", "en"],
} [7, "Woche", "n"],
[4.348214, "Monat", "en"],
[12, "Jahr", "en"],
];
table_elem.appendChild(tr) for (let i in factors) {
} let factor = factors[i][0];
}; let new_value = Math.floor(value / factor);
if (new_value == 0) break;
let update_table = data => { // prev = Math.floor(value % factor);
console.log(data); value = new_value;
unit = i;
const allowed_ports = data.allowed_ports.map(x => x.end - x.start + 1).reduce((a,b) => a + b, 0);
free_ports.innerText = `Freie Ports: ${allowed_ports - Object.keys(data.allocated_ports).length - data.errored_ports.length}`;
table = [];
for(let number in data.allocated_ports) {
let port = data.allocated_ports[number];
number = +number;
let {status, last_change} = data.port_state[port];
let rejector = data.rejectors[port] || null;
if (rejector && rejector instanceof Array) {
rejector = rejector.map(x => "0x"+x.toString(16).padStart(2, 0)).join(" ")
}
last_change = new Date(last_change * 1000);
let name = data.names[number] || null;
switch(status) {
case "disconnected":
status = "getrennt";
break;
case "idle":
status = "bereit";
break;
case "in_call":
status = "anruf";
break;
}
table.push({port, number, status, last_change, rejector, name})
}
console.log(table);
print_table();
};
let connect_event_source = () => {
clearTimeout(reconnect_timeout);
clearTimeout(ping_timeout);
ping_timeout = setTimeout(connect_event_source, timeout_duration);
evtSource && evtSource.close && evtSource.close();
evtSource = new EventSource("/events");
evtSource.addEventListener("change", event => {
last_update.innerText = `Letzte Änderung: ${format_date(new Date(+event.data * 1000))}`;
fetch("/data")
.then(res => res.json())
.then(update_table);
});
evtSource.addEventListener("ping", event => {
clearTimeout(ping_timeout);
ping_timeout = setTimeout(connect_event_source, timeout_duration);
last_update.innerText = `Letzte Änderung: ${format_date(new Date(+event.data * 1000))}`;
connected.className = "visible";
setTimeout(() => connected.className = "hidden", 1000);
});
evtSource.onerror = () => {
clearTimeout(reconnect_timeout);
reconnect_timeout = setTimeout(connect_event_source, retry_timeout);
};
} }
connect_event_source(); let factor = factors[unit];
}; return [value, factors[unit][1] + (value == 1 ? "" : factor[2])];
};
sort = (element, key) => {
if (key == oldkey) {
direction *= -1;
} else {
oldkey = key;
direction = -1;
}
for (let child of table_elem.firstChild.firstChild.children) {
child.className = "";
}
element.className = `sort ${direction > 0 ? "sort-up" : "sort-down"}`;
do_sort(oldkey, direction);
};
let do_sort = (key, direction) => {
let is_number = !!~(["port", "number", "last_change"].indexOf(key));
table = table.sort((a, b) => (direction * (
is_number ? a[key] - b[key] : ("" + a[key]).localeCompare(b[key], "de-DE")
)));
populate_table();
};
let fmt = Intl.DateTimeFormat("de-DE", {
dateStyle: "medium",
timeStyle: "medium",
});
let format_date = (date) => fmt.format(date).replace(", ", " ");
let format_time = (date) => fmt.format(date).split(", ", 2)[1];
let populate_table = () => {
// clear everything except for the header row
while (table_elem.rows.length > 1) {
table_elem.deleteRow(-1);
}
for (let row of table) {
let tr = table_elem.insertRow(-1);
let values = [
row.name === null ? "?" : row.name,
row.number,
row.port,
row.status,
time_ago(last_update - row.last_change),
];
let names = [
"name",
"number",
"port",
"status",
"last_change",
];
for (let i in values) {
let value = values[i];
let name = names[i];
let td = tr.insertCell(-1);
td.className = name;
if (name == "last_change") {
let [number, unit] = value;
let span = document.createElement("span");
// span.className = "value";
span.innerText = number;
td.appendChild(span);
span = document.createElement("span");
span.className = "unit";
span.innerText = unit;
td.appendChild(span);
} else {
td.innerText = value;
}
}
}
};
let update_table = (data) => {
const allowed_ports = data.allowed_ports.map((x) => x.end - x.start + 1)
.reduce((a, b) => a + b, 0);
free_ports.innerText = `Freie Ports: ${
allowed_ports - Object.keys(data.allocated_ports).length -
data.errored_ports.length
}`;
// last_change.innerHTML = `Letzte Änderung: ${format_date(new Date(+data.last_update * 1000))}`;
table = [];
for (let number in data.allocated_ports) {
try {
let port = data.allocated_ports[number];
number = +number;
// allocated port has no state. This means that it is unknown and should not be displayed
if (data.port_state[port] == undefined) {
continue;
}
let { status, last_change } = data.port_state[port];
let rejector = data.rejectors[port] || null;
if (rejector && rejector instanceof Array) {
rejector = rejector.map((x) => "0x" + x.toString(16).padStart(2, 0))
.join(" ");
}
last_change = new Date(last_change * 1000);
let name = data.names[number] || null;
switch (status) {
case "disconnected":
status = rejector ? `getrennt: ${rejector}` : "getrennt";
break;
case "idle":
status = "bereit";
break;
case "in_call":
status = "anruf";
break;
}
table.push({ port, number, status, last_change, rejector, name });
} catch (error) {
console.error(error);
}
}
for (let [timestamp, port] of data.errored_ports) {
table.push({
port,
number: null,
status: "Fehler",
last_change: new Date(timestamp * 1000),
rejector: null,
name: "",
});
}
do_sort(oldkey, direction);
populate_table();
};
let format_event = (event, method) =>
(method || format_date)(new Date(+event.data * 1000));
let display_disconnected;
let connect_event_source = () => {
clearTimeout(reconnect_timeout);
clearTimeout(ping_timeout);
ping_timeout = setTimeout(connect_event_source, timeout_duration);
evtSource && evtSource.close && evtSource.close();
evtSource = new EventSource("/events");
evtSource.addEventListener("change", (event) => {
last_ping.innerText = `Stand: ${format_event(event, format_time)}`;
last_update = new Date(+event.data * 1000);
fetch("/data")
.then((res) => res.json())
.then(update_table);
});
evtSource.addEventListener("ping", (event) => {
clearTimeout(ping_timeout);
ping_timeout = setTimeout(connect_event_source, timeout_duration);
last_update = new Date(+event.data * 1000);
last_ping.innerText = `Stand: ${format_event(event, format_time)}`;
connected.className = "visible";
populate_table();
clearTimeout(display_disconnected);
display_disconnected = setTimeout(
() => connected.className = "hidden",
5000,
);
});
evtSource.onerror = () => {
clearTimeout(reconnect_timeout);
reconnect_timeout = setTimeout(connect_event_source, retry_timeout);
};
};
connect_event_source();
};