Compare commits

..

15 Commits

Author SHA1 Message Date
2aedf86736 basic web view 2023-06-11 06:30:31 +02:00
e609cb0f44 disable favicon 2023-06-11 05:35:10 +02:00
75c12677d9 add connected symbol 2023-06-11 05:29:59 +02:00
04deb1d89c compress html 2023-06-11 04:18:07 +02:00
ed3195afeb add user status to debug server /data 2023-06-11 02:58:16 +02:00
af5c090600 refactors 2023-06-11 02:40:39 +02:00
50fa67409c refactors 2023-06-11 02:39:01 +02:00
ccb0ce87e1 new debug server outline 2023-06-11 01:32:21 +02:00
b8afaba4ef don't error on dropped connections 2023-06-11 01:32:13 +02:00
4cca315f61 new debug server outline 2023-06-11 01:22:28 +02:00
1ae573dd76 start on new debug server 2023-06-11 00:12:02 +02:00
15672536f6 fix c string formatting 2023-06-10 20:18:52 +02:00
b5d2a63909 fix start without exisiting cache 2023-06-09 20:23:33 +02:00
904091c455 remove unnecessary .ok() 2023-06-09 15:43:53 +02:00
02f44cfab6 continue if accept fails 2023-06-09 15:43:01 +02:00
16 changed files with 851 additions and 382 deletions

View File

@@ -1,38 +0,0 @@
name: "Code Checks"
on:
pull_request:
jobs:
# Check formatting with rustfmt
formatting:
name: cargo fmt
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
# Ensure rustfmt is installed and setup problem matcher
- uses: actions-rust-lang/setup-rust-toolchain@v1
with:
components: rustfmt
- name: Rustfmt Check
uses: actions-rust-lang/rustfmt@v1
# Check code with clippy
clippy:
name: cargo clippy
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
# Ensure clippy is installed and setup problem matcher
- uses: actions-rust-lang/setup-rust-toolchain@v1
with:
components: clippy
- run: cargo clippy -- -D warnings
# Run tests
test:
name: cargo test
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions-rust-lang/setup-rust-toolchain@v1
- run: cargo test

185
Cargo.lock generated
View File

@@ -17,6 +17,26 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "ahash"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c99f64d1e06488f620f932677e24bc6e2897582980441ae90a671415bd7ec2f"
dependencies = [
"cfg-if",
"once_cell",
"version_check",
]
[[package]]
name = "aho-corasick"
version = "0.7.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc936419f96fa211c1b9166887b38e5e40b19958e5b895be7c1f93adec7071ac"
dependencies = [
"memchr",
]
[[package]] [[package]]
name = "anyhow" name = "anyhow"
version = "1.0.70" version = "1.0.70"
@@ -117,7 +137,7 @@ dependencies = [
"cc", "cc",
"cfg-if", "cfg-if",
"libc", "libc",
"miniz_oxide", "miniz_oxide 0.6.2",
"object", "object",
"rustc-demangle", "rustc-demangle",
] ]
@@ -134,6 +154,12 @@ version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bumpalo"
version = "3.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3e2c3daef883ecc1b5d58c15adae93470a91d425f3532ba1695849656af3fc1"
[[package]] [[package]]
name = "bytemuck" name = "bytemuck"
version = "1.13.1" version = "1.13.1"
@@ -177,16 +203,22 @@ name = "centralex"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"bytemuck", "bytemuck",
"bytes",
"color-eyre", "color-eyre",
"console-subscriber", "console-subscriber",
"css-minify",
"eyre", "eyre",
"flate2",
"futures", "futures",
"hyper", "hyper",
"minify-html",
"minify-js 0.5.6",
"once_cell", "once_cell",
"serde", "serde",
"serde_json", "serde_json",
"time", "time",
"tokio", "tokio",
"tokio-stream",
"tracing", "tracing",
"tracing-error", "tracing-error",
"tracing-subscriber", "tracing-subscriber",
@@ -262,6 +294,12 @@ dependencies = [
"tracing-subscriber", "tracing-subscriber",
] ]
[[package]]
name = "convert_case"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e"
[[package]] [[package]]
name = "crc32fast" name = "crc32fast"
version = "1.3.2" version = "1.3.2"
@@ -290,6 +328,30 @@ dependencies = [
"cfg-if", "cfg-if",
] ]
[[package]]
name = "css-minify"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "874c6e2d19f8d4a285083b11a3241bfbe01ac3ed85f26e1e6b34888d960552bd"
dependencies = [
"derive_more",
"indexmap",
"nom",
]
[[package]]
name = "derive_more"
version = "0.99.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fb810d30a7c1953f91334de7244731fc3f3c10d7fe163338a35b9f640960321"
dependencies = [
"convert_case",
"proc-macro2",
"quote",
"rustc_version",
"syn 1.0.109",
]
[[package]] [[package]]
name = "either" name = "either"
version = "1.8.1" version = "1.8.1"
@@ -308,12 +370,12 @@ dependencies = [
[[package]] [[package]]
name = "flate2" name = "flate2"
version = "1.0.25" version = "1.0.26"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8a2db397cb1c8772f31494cb8917e48cd1e64f0fa7efac59fbd741a0a8ce841" checksum = "3b9429470923de8e8cbd4d2dc513535400b4b3fef0319fb5c4e1f520a7bef743"
dependencies = [ dependencies = [
"crc32fast", "crc32fast",
"miniz_oxide", "miniz_oxide 0.7.1",
] ]
[[package]] [[package]]
@@ -429,6 +491,16 @@ version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
[[package]]
name = "hashbrown"
version = "0.13.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e"
dependencies = [
"ahash",
"bumpalo",
]
[[package]] [[package]]
name = "hdrhistogram" name = "hdrhistogram"
version = "7.5.2" version = "7.5.2"
@@ -540,7 +612,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1885e79c1fc4b10f0e172c475f458b7f7b93061064d98c3293e98c5ba0c8b399" checksum = "1885e79c1fc4b10f0e172c475f458b7f7b93061064d98c3293e98c5ba0c8b399"
dependencies = [ dependencies = [
"autocfg", "autocfg",
"hashbrown", "hashbrown 0.12.3",
] ]
[[package]] [[package]]
@@ -606,6 +678,40 @@ version = "0.3.16"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d"
[[package]]
name = "minify-html"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc4d9147754a49e80557df835eb59e743eab1bf75410a134f55dc4b9dbb692ad"
dependencies = [
"aho-corasick",
"css-minify",
"lazy_static",
"memchr",
"minify-js 0.4.3",
"rustc-hash",
]
[[package]]
name = "minify-js"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c300f90ba1138b5c5daf5d9441dc9bdc67b808aac22cf638362a2647bc213be4"
dependencies = [
"lazy_static",
"parse-js 0.10.3",
]
[[package]]
name = "minify-js"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22d6c512a82abddbbc13b70609cb2beff01be2c7afff534d6e5e1c85e438fc8b"
dependencies = [
"lazy_static",
"parse-js 0.17.0",
]
[[package]] [[package]]
name = "minimal-lexical" name = "minimal-lexical"
version = "0.2.1" version = "0.2.1"
@@ -621,6 +727,15 @@ dependencies = [
"adler", "adler",
] ]
[[package]]
name = "miniz_oxide"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7810e0be55b428ada41041c41f32c9f1a42817901b4ccf45fa3d4b6561e74c7"
dependencies = [
"adler",
]
[[package]] [[package]]
name = "mio" name = "mio"
version = "0.8.6" version = "0.8.6"
@@ -708,6 +823,30 @@ version = "3.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1b04fb49957986fdce4d6ee7a65027d55d4b6d2265e5848bbb507b58ccfdb6f" checksum = "c1b04fb49957986fdce4d6ee7a65027d55d4b6d2265e5848bbb507b58ccfdb6f"
[[package]]
name = "parse-js"
version = "0.10.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30534759e6ad87aa144c396544747e1c25b1020bd133356fd758c8facec764e5"
dependencies = [
"aho-corasick",
"lazy_static",
"memchr",
]
[[package]]
name = "parse-js"
version = "0.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ec3b11d443640ec35165ee8f6f0559f1c6f41878d70330fe9187012b5935f02"
dependencies = [
"aho-corasick",
"bumpalo",
"hashbrown 0.13.2",
"lazy_static",
"memchr",
]
[[package]] [[package]]
name = "percent-encoding" name = "percent-encoding"
version = "2.2.0" version = "2.2.0"
@@ -862,6 +1001,21 @@ version = "0.1.21"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342" checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342"
[[package]]
name = "rustc-hash"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
[[package]]
name = "rustc_version"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366"
dependencies = [
"semver",
]
[[package]] [[package]]
name = "rustversion" name = "rustversion"
version = "1.0.12" version = "1.0.12"
@@ -874,6 +1028,12 @@ version = "1.0.13"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f91339c0467de62360649f8d3e185ca8de4224ff281f66000de5eb2a77a79041" checksum = "f91339c0467de62360649f8d3e185ca8de4224ff281f66000de5eb2a77a79041"
[[package]]
name = "semver"
version = "1.0.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bebd363326d05ec3e2f532ab7660680f3b02130d780c299bca73469d521bc0ed"
[[package]] [[package]]
name = "serde" name = "serde"
version = "1.0.157" version = "1.0.157"
@@ -1048,20 +1208,21 @@ dependencies = [
[[package]] [[package]]
name = "tokio-stream" name = "tokio-stream"
version = "0.1.12" version = "0.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fb52b74f05dbf495a8fba459fdc331812b96aa086d9eb78101fa0d4569c3313" checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"pin-project-lite", "pin-project-lite",
"tokio", "tokio",
"tokio-util",
] ]
[[package]] [[package]]
name = "tokio-util" name = "tokio-util"
version = "0.7.7" version = "0.7.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5427d89453009325de0d8f342c9490009f76e999cb7672d77e46267448f7e6b2" checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d"
dependencies = [ dependencies = [
"bytes", "bytes",
"futures-core", "futures-core",
@@ -1236,6 +1397,12 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]]
name = "version_check"
version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
[[package]] [[package]]
name = "want" name = "want"
version = "0.3.0" version = "0.3.0"

View File

@@ -6,9 +6,6 @@ edition = "2021"
[profile.release] [profile.release]
debug = true debug = true
[profile.dev.package.backtrace]
opt-level = 3
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@@ -18,7 +15,7 @@ time = { version = "0.3.20", features = ["local-offset", "macros"] }
bytemuck = { version = "1.13.0", features = ["derive"] } bytemuck = { version = "1.13.0", features = ["derive"] }
serde = { version = "1.0.152", features = ["derive"] } serde = { version = "1.0.152", features = ["derive"] }
serde_json = "1.0.91" serde_json = "1.0.91"
hyper = { version = "0.14.24", optional = true, features = ["server", "http1", "tcp"] } hyper = { version = "0.14.24", optional = true, features = ["server", "http1", "tcp", "stream"] }
futures = { version = "0.3.27", default-features = false, features = ["std"] } futures = { version = "0.3.27", default-features = false, features = ["std"] }
tracing = "0.1.37" tracing = "0.1.37"
tracing-subscriber = { version = "0.3.16", features = ["time"] } tracing-subscriber = { version = "0.3.16", features = ["time"] }
@@ -28,8 +25,17 @@ eyre = "0.6.8"
color-eyre = "0.6.2" color-eyre = "0.6.2"
tracing-error = "0.2.0" tracing-error = "0.2.0"
zerocopy = "0.6.1" zerocopy = "0.6.1"
tokio-stream = { version = "0.1.14", features = ["sync"] }
flate2 = { version = "1.0.26", optional = true }
bytes = "1.4.0"
[build-dependencies]
minify-js = { version = "0.5.6", optional = true }
minify-html = { version = "0.11.1", optional = true }
css-minify = { version = "0.3.1", optional = true }
flate2 = { version = "1.0.26", optional = true }
[features] [features]
default = ["debug_server"] default = ["debug_server"]
debug_server = ["dep:hyper"] debug_server = ["dep:hyper", "minify-html", "dep:minify-js", "dep:css-minify", "dep:flate2"]
tokio_console = ["dep:console-subscriber"] tokio_console = ["dep:console-subscriber"]

58
build.rs Normal file
View File

@@ -0,0 +1,58 @@
fn main() {
#[cfg(feature = "debug_server")]
pack_debug_page().unwrap();
println!("cargo:rerun-if-changed=web/main.js");
println!("cargo:rerun-if-changed=web/index.html");
println!("cargo:rerun-if-changed=web/main.css");
println!("cargo:rerun-if-changed=web/connected.svg");
}
#[cfg(feature = "debug_server")]
fn pack_debug_page() -> Result<(), Box<dyn std::error::Error>> {
use flate2::{write::GzEncoder, Compression};
use std::io::Write;
use css_minify::optimizations::{Level, Minifier};
let js = std::fs::read_to_string("web/main.js").unwrap();
let html = std::fs::read_to_string("web/index.html").unwrap();
let css = std::fs::read_to_string("web/main.css").unwrap();
let svg = std::fs::read_to_string("web/connected.svg").unwrap();
let mut out = Vec::new();
minify_js::minify(
&minify_js::Session::new(),
minify_js::TopLevelMode::Global,
js.as_bytes(),
&mut out,
)
.unwrap();
let js = std::str::from_utf8(&out)?;
let css = Minifier::default().minify(&css, Level::Three).unwrap();
let (start, end) = html
.split_once("<!--INSERT SVG HERE-->")
.expect("did not find svg split point in html");
let html = format!("{start}{svg}{end}");
let (head, body) = html
.split_once("<!--INSERT HEAD CONTENT HERE-->")
.expect("did not find head split point in html");
let html = minify_html::minify(
format!("{head}<style>{css}</style><script>{js}</script>{body}").as_bytes(),
&minify_html::Cfg::spec_compliant(),
);
let mut encoder = GzEncoder::new(
std::fs::File::create(std::env::var("OUT_DIR").unwrap() + "/minified.html.gz")?,
Compression::best(),
);
encoder.write_all(&html)?;
Ok(())
}

View File

@@ -64,19 +64,13 @@ pub async fn dyn_ip_update(
})? })?
.into()), .into()),
PacketKind::Error => { PacketKind::Error => {
let first_zero = packet
.data
.iter()
.enumerate()
.find_map(|(i, x)| (*x == 0).then_some(i));
return Err(eyre!( return Err(eyre!(
"{}", "{}",
std::str::from_utf8(first_zero.map_or(&packet.data, |i| &packet.data[..i]),)? packet.as_string().unwrap_or("unknown dyn auth error")
)); ));
} }
_ => return Err(eyre!("server returned unexpected packet")), _ => return Err(eyre!("auth server returned unexpected packet")),
}; };
debug!(?result, "finished dyn ip update"); debug!(?result, "finished dyn ip update");

View File

@@ -15,7 +15,7 @@ use tracing::{info, instrument, trace};
use crate::{ use crate::{
auth::dyn_ip_update, auth::dyn_ip_update,
constants::{AUTH_TIMEOUT, CALL_ACK_TIMEOUT, CALL_TIMEOUT, PING_TIMEOUT, SEND_PING_INTERVAL}, constants::{AUTH_TIMEOUT, CALL_ACK_TIMEOUT, CALL_TIMEOUT, PING_TIMEOUT, SEND_PING_INTERVAL},
debug_server::peer_query, http::peer_query,
packets::{Header, Packet, PacketKind, RemConnect, REJECT_OOP, REJECT_TIMEOUT}, packets::{Header, Packet, PacketKind, RemConnect, REJECT_OOP, REJECT_TIMEOUT},
ports::{PortHandler, PortStatus}, ports::{PortHandler, PortStatus},
Config, HandlerMetadata, Config, HandlerMetadata,

View File

@@ -9,3 +9,5 @@ pub const PING_TIMEOUT: Duration = Duration::from_secs(30);
pub const SEND_PING_INTERVAL: Duration = Duration::from_secs(20); pub const SEND_PING_INTERVAL: Duration = Duration::from_secs(20);
pub const CACHE_STORE_INTERVAL: Duration = Duration::from_secs(5); pub const CACHE_STORE_INTERVAL: Duration = Duration::from_secs(5);
pub const DEBUG_SERVER_PING_INTERVAL: Duration = Duration::from_secs(5);

View File

@@ -1,119 +0,0 @@
use futures::Future;
use hyper::rt::Executor;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Response, Server};
use std::convert::Infallible;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::error;
use zerocopy::{AsBytes, FromBytes, LittleEndian, Unaligned};
use tracing::{debug, instrument};
use crate::packets::{Header, Packet};
use crate::ports::PortHandler;
use crate::spawn;
#[derive(Clone)]
struct NamedExecutor;
impl<T: Send + 'static, Fut: Future<Output = T> + Send + 'static> Executor<Fut> for NamedExecutor {
fn execute(&self, fut: Fut) {
spawn("http worker", fut);
}
}
pub async fn debug_server(addr: SocketAddr, port_handler: Arc<Mutex<PortHandler>>) {
let server = Server::bind(&addr)
.executor(NamedExecutor)
.serve(make_service_fn(move |_conn| {
let port_handler = port_handler.clone();
async move {
Ok::<_, Infallible>(service_fn(move |_req| {
let port_handler = port_handler.clone();
async move {
Ok::<_, Infallible>(Response::new(Body::from(
port_handler.lock().await.status_string(),
)))
}
}))
}
}));
// Run this server for... forever!
if let Err(error) = server.await {
error!(%error, "debug server error");
}
}
type U16 = zerocopy::U16<LittleEndian>;
type U32 = zerocopy::U32<LittleEndian>;
#[derive(AsBytes)]
#[repr(transparent)]
#[allow(dead_code)]
struct PeerQuery {
number: U32,
}
#[derive(FromBytes, Unaligned, Debug)]
#[repr(packed)]
#[allow(dead_code)]
struct PeerReply {
number: U32,
name: [u8; 40],
flags: U16,
kind: u8,
hostname: [u8; 40],
ipaddress: [u8; 4],
port: U16,
extension: u8,
pin: U16,
timestamp: U32,
}
#[instrument]
pub async fn peer_query(server: &SocketAddr, number: u32) -> eyre::Result<Option<String>> {
debug!(%number, "looking up");
let mut packet = Packet {
header: Header {
kind: 3, // Peer Query
length: 4,
},
data: Vec::new(),
};
packet.data.clear();
packet.data.resize(packet.header.length as usize, 0);
PeerQuery {
number: number.into(),
}
.write_to(packet.data.as_mut_slice())
.unwrap();
let mut socket = tokio::net::TcpStream::connect(server).await?;
let (mut reader, mut writer) = socket.split();
packet.send(&mut writer).await?;
packet.recv_into(&mut reader).await?;
Ok(if packet.kind().raw() == 5 {
// PeerReply
PeerReply::read_from(packet.data.as_slice()).and_then(|x| {
let i = x
.name
.iter()
.enumerate()
.find_map(|(i, c)| (*c == 0).then_some(i))
.unwrap_or(x.name.len());
Some(std::str::from_utf8(&x.name[..i]).ok()?.to_owned())
})
} else {
None
})
}

249
src/http.rs Normal file
View File

@@ -0,0 +1,249 @@
use bytes::BytesMut;
use futures::Future;
use hyper::header::{ACCEPT_ENCODING, CACHE_CONTROL, CONTENT_ENCODING, CONTENT_TYPE};
use hyper::rt::Executor;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
use std::convert::Infallible;
use std::io::Read;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::Mutex;
use tokio_stream::wrappers::{IntervalStream, WatchStream};
use tokio_stream::StreamExt;
use tracing::error;
use zerocopy::{AsBytes, FromBytes, LittleEndian, Unaligned};
use tracing::{debug, instrument};
use crate::constants::DEBUG_SERVER_PING_INTERVAL;
use crate::packets::{Header, Packet};
use crate::ports::PortHandler;
use crate::spawn;
#[derive(Clone)]
struct NamedExecutor;
impl<T: Send + 'static, Fut: Future<Output = T> + Send + 'static> Executor<Fut> for NamedExecutor {
fn execute(&self, fut: Fut) {
spawn("http worker", fut);
}
}
const COMPRESSED_HTML: &[u8] = include_bytes!(concat!(env!("OUT_DIR"), "/minified.html.gz"));
async fn index(req: &Request<Body>) -> Result<Response<Body>, hyper::http::Error> {
let response = Response::builder();
let accepts_gzip = req
.headers()
.get(ACCEPT_ENCODING)
.map_or(false, |accept_encoding| {
accept_encoding
.as_bytes()
.split(|x| *x == b',')
.filter_map(|x| x.split(|x| *x == b';').next())
.filter_map(|x| std::str::from_utf8(x).ok())
.any(|x| x.trim() == "gzip")
});
if accepts_gzip {
response
.header(CONTENT_ENCODING, "gzip")
.body(Body::from(COMPRESSED_HTML))
} else {
let (mut sender, body) = Body::channel();
spawn("gunzip task", async move {
let mut decoder =
flate2::bufread::GzDecoder::new(std::io::Cursor::new(COMPRESSED_HTML));
let mut done = false;
while !done {
let mut chunk = BytesMut::zeroed(256);
let mut i = 0;
loop {
let dst = &mut chunk.as_bytes_mut()[i..];
if dst.is_empty() {
break; // we are done
}
match decoder.read(dst) {
Ok(n) => {
if n == 0 {
done = true;
break;
}
i += n;
}
Err(err) => unreachable!("failed to read from gzip decode: {err}"),
}
}
chunk.truncate(i);
if sender.send_data(chunk.freeze()).await.is_err() {
break;
}
}
});
response.body(body)
}
}
async fn data(
_req: &Request<Body>,
port_handler: Arc<Mutex<PortHandler>>,
) -> Result<Response<Body>, hyper::http::Error> {
let res = Response::builder().header(CACHE_CONTROL, "no-store");
match serde_json::to_string(&*port_handler.lock().await) {
Ok(data) => res
.header(CONTENT_TYPE, "application/json")
.body(Body::from(data)),
Err(err) => {
error!(%err, "failed to serialize data for debug server");
res.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::from(""))
}
}
}
fn events(
_req: &Request<Body>,
change_receiver: tokio::sync::watch::Receiver<std::time::Instant>,
) -> Result<Response<Body>, hyper::http::Error> {
Response::builder()
.status(StatusCode::OK)
.header(CACHE_CONTROL, "no-store")
.header(CONTENT_TYPE, "text/event-stream")
.body(Body::wrap_stream({
WatchStream::new(change_receiver)
.map(|x| ("change", x))
.merge(
IntervalStream::new(tokio::time::interval(DEBUG_SERVER_PING_INTERVAL))
.map(|x| ("ping", x.into_std())),
)
.filter_map(|(kind, time)| {
let timestamp = (SystemTime::now() + time.elapsed())
.duration_since(UNIX_EPOCH)
.ok()?
.as_secs();
Some(Ok::<_, Infallible>(format!(
"event:{kind}\ndata: {timestamp}\n\n"
)))
})
}))
}
pub async fn debug_server(
addr: SocketAddr,
port_handler: Arc<Mutex<PortHandler>>,
change_receiver: tokio::sync::watch::Receiver<std::time::Instant>,
) {
let server = Server::bind(&addr)
.executor(NamedExecutor)
.serve(make_service_fn(move |_conn| {
let port_handler = port_handler.clone();
let change_receiver = change_receiver.clone();
async move {
Ok::<_, Infallible>(service_fn(move |req| {
let port_handler = port_handler.clone();
let change_receiver = change_receiver.clone();
async move {
match (req.method(), req.uri().path()) {
(&Method::GET, "/") => index(&req).await,
(&Method::GET, "/data") => data(&req, port_handler).await,
(&Method::GET, "/events") => events(&req, change_receiver),
_ => Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::empty()),
}
}
}))
}
}));
if let Err(error) = server.await {
error!(%error, "debug server error");
}
}
type U16 = zerocopy::U16<LittleEndian>;
type U32 = zerocopy::U32<LittleEndian>;
#[derive(AsBytes)]
#[repr(transparent)]
#[allow(dead_code)]
struct PeerQuery {
number: U32,
}
#[derive(FromBytes, Unaligned, Debug)]
#[repr(packed)]
#[allow(dead_code)]
struct PeerReply {
number: U32,
name: [u8; 40],
flags: U16,
kind: u8,
hostname: [u8; 40],
ipaddress: [u8; 4],
port: U16,
extension: u8,
pin: U16,
timestamp: U32,
}
#[instrument]
pub async fn peer_query(server: &SocketAddr, number: u32) -> eyre::Result<Option<String>> {
debug!(%number, "looking up");
let mut packet = Packet {
header: Header {
kind: 3, // Peer Query
length: 4,
},
data: Vec::new(),
};
packet.data.clear();
packet.data.resize(packet.header.length as usize, 0);
PeerQuery {
number: number.into(),
}
.write_to(packet.data.as_mut_slice())
.unwrap();
let mut socket = tokio::net::TcpStream::connect(server).await?;
let (mut reader, mut writer) = socket.split();
packet.send(&mut writer).await?;
packet.recv_into(&mut reader).await?;
Ok(if packet.kind().raw() == 5 {
// PeerReply
PeerReply::read_from(packet.data.as_slice()).and_then(|x| {
let i = x
.name
.iter()
.enumerate()
.find_map(|(i, c)| (*c == 0).then_some(i))
.unwrap_or(x.name.len());
Some(std::str::from_utf8(&x.name[..i]).ok()?.to_owned())
})
} else {
None
})
}

View File

@@ -1,5 +1,4 @@
#![warn(clippy::pedantic)] #![warn(clippy::pedantic)]
#![allow(clippy::let_underscore_untyped)] // false positive in stable
use std::{ use std::{
fmt::Debug, fmt::Debug,
@@ -11,8 +10,8 @@ use std::{
time::Duration, time::Duration,
}; };
use debug_server::debug_server;
use futures::Future; use futures::Future;
use http::debug_server;
use packets::{Header, Packet}; use packets::{Header, Packet};
use serde::{Deserialize, Deserializer}; use serde::{Deserialize, Deserializer};
use time::format_description::OwnedFormatItem; use time::format_description::OwnedFormatItem;
@@ -20,9 +19,9 @@ use tokio::{
io::AsyncWriteExt, io::AsyncWriteExt,
net::{TcpListener, TcpStream}, net::{TcpListener, TcpStream},
sync::Mutex, sync::Mutex,
time::{sleep, Instant}, time::sleep,
}; };
use tracing::{error, info, instrument, warn, Level}; use tracing::{debug, error, info, instrument, warn, Level};
use tracing_subscriber::fmt::time::FormatTime; use tracing_subscriber::fmt::time::FormatTime;
use crate::packets::PacketKind; use crate::packets::PacketKind;
@@ -32,7 +31,7 @@ pub mod auth;
pub mod client; pub mod client;
pub mod constants; pub mod constants;
#[cfg(feature = "debug_server")] #[cfg(feature = "debug_server")]
pub mod debug_server; pub mod http;
pub mod packets; pub mod packets;
pub mod ports; pub mod ports;
@@ -185,7 +184,7 @@ fn setup_tracing(config: &Config) {
Level::ERROR => write!(writer, " {:>5} ", level.red())?, Level::ERROR => write!(writer, " {:>5} ", level.red())?,
} }
write!(writer, "{:23}{}", meta.target().dimmed(), ":".bold())?; write!(writer, "{:18}{}", meta.target().dimmed(), ":".bold())?;
/* /*
if let Some(filename) = meta.file() { if let Some(filename) = meta.file() {
@@ -248,7 +247,14 @@ async fn connection_handler(
let error = match res { let error = match res {
Err(_) => Some("internal server error".to_owned()), Err(_) => Some("internal server error".to_owned()),
Ok(Err(err)) => Some(err.to_string()), Ok(Err(err)) => match err.downcast_ref::<std::io::Error>() {
Some(io_error) if io_error.kind() == std::io::ErrorKind::UnexpectedEof => {
// don't print an error on dropped connections
debug!(%addr, "Client dropped their connection");
None
}
_ => Some(err.to_string()),
},
Ok(Ok(())) => None, Ok(Ok(())) => None,
}; };
@@ -265,6 +271,7 @@ async fn connection_handler(
length: packet.data.len().try_into().unwrap(), // this will never fail, as we just truncated the vector length: packet.data.len().try_into().unwrap(), // this will never fail, as we just truncated the vector
}; };
// Attempt to notify the client of the failure
let (_, mut writer) = stream.split(); let (_, mut writer) = stream.split();
_ = packet.send(&mut writer).await; _ = packet.send(&mut writer).await;
} }
@@ -323,7 +330,7 @@ async fn tokio_main(config: Arc<Config>) -> eyre::Result<()> {
let cache_path = PathBuf::from("cache.json"); let cache_path = PathBuf::from("cache.json");
let (change_sender, change_receiver) = tokio::sync::watch::channel(Instant::now()); let (change_sender, change_receiver) = tokio::sync::watch::channel(std::time::Instant::now());
let mut port_handler = PortHandler::load_or_default(&cache_path, change_sender); let mut port_handler = PortHandler::load_or_default(&cache_path, change_sender);
port_handler.update_allowed_ports(&config.allowed_ports); port_handler.update_allowed_ports(&config.allowed_ports);
@@ -332,7 +339,7 @@ async fn tokio_main(config: Arc<Config>) -> eyre::Result<()> {
spawn( spawn(
"cache daemon", "cache daemon",
cache_daemon(port_handler.clone(), cache_path, change_receiver), cache_daemon(port_handler.clone(), cache_path, change_receiver.clone()),
); );
#[cfg(feature = "debug_server")] #[cfg(feature = "debug_server")]
@@ -340,7 +347,7 @@ async fn tokio_main(config: Arc<Config>) -> eyre::Result<()> {
warn!(%listen_addr, "debug server listening"); warn!(%listen_addr, "debug server listening");
spawn( spawn(
"debug server", "debug server",
debug_server(listen_addr, port_handler.clone()), debug_server(listen_addr, port_handler.clone(), change_receiver),
); );
} }
@@ -350,7 +357,11 @@ async fn tokio_main(config: Arc<Config>) -> eyre::Result<()> {
"centralex server listening" "centralex server listening"
); );
while let Ok((stream, addr)) = listener.accept().await { loop {
let connection = listener.accept().await;
match connection {
Ok((stream, addr)) => {
info!(%addr, "new connection"); info!(%addr, "new connection");
spawn( spawn(
@@ -358,8 +369,11 @@ async fn tokio_main(config: Arc<Config>) -> eyre::Result<()> {
connection_handler(stream, addr, config.clone(), port_handler.clone()), connection_handler(stream, addr, config.clone(), port_handler.clone()),
); );
} }
Err(err) => {
Ok(()) error!(%err, "failed to accept connection");
}
}
}
} }
#[derive(Debug, Default)] #[derive(Debug, Default)]

View File

@@ -2,6 +2,7 @@ use std::fmt::Debug;
use bytemuck::{Pod, Zeroable}; use bytemuck::{Pod, Zeroable};
use eyre::eyre; use eyre::eyre;
use serde::Serialize;
use tokio::{ use tokio::{
io::{AsyncReadExt, AsyncWriteExt}, io::{AsyncReadExt, AsyncWriteExt},
net::tcp::{ReadHalf, WriteHalf}, net::tcp::{ReadHalf, WriteHalf},
@@ -68,35 +69,56 @@ impl PacketKind {
} }
} }
#[derive(Default, Clone, Copy, Pod, Zeroable)] #[derive(Serialize, Default, Clone, Copy, Pod, Zeroable)]
#[repr(C)] #[repr(C)]
pub struct Header { pub struct Header {
pub kind: u8, pub kind: u8,
pub length: u8, pub length: u8,
} }
#[derive(Default, Clone)] #[derive(Serialize, Default, Clone)]
pub struct Packet { pub struct Packet {
pub header: Header, pub header: Header,
pub data: Vec<u8>, pub data: Vec<u8>,
} }
impl Debug for Packet { impl Packet {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { #[must_use]
let data = &self.data; pub fn data(&self) -> &[u8] {
&self.data[..self.header.length as usize]
}
let str_data = std::str::from_utf8(&data[..data.len().saturating_sub(1)]).ok(); #[must_use]
pub fn as_string(&self) -> Option<&str> {
let data = self.data();
let nul = data.iter().enumerate().find(|(_i, c)| **c == 0);
let data = if let Some(str_data) = str_data.as_ref() { let data = if let Some((i, _)) = nul {
str_data as &dyn Debug &data[..i]
} else { } else {
&data as &dyn Debug data
}; };
f.debug_struct("Packet") std::str::from_utf8(data).ok()
.field("kind", &PacketKind::from_u8(self.header.kind)) }
.field("data", &data) }
.finish()
impl Debug for Packet {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut debugger = f.debug_struct("Packet");
debugger.field("kind", &PacketKind::from_u8(self.header.kind));
match self.as_string() {
Some(string) if string.chars().all(|c| !c.is_control()) => {
debugger.field("data", &string);
}
_ => {
debugger.field("data", &self.data());
}
}
debugger.finish()
} }
} }
@@ -131,7 +153,7 @@ impl Packet {
pub async fn recv_into_cancelation_safe( pub async fn recv_into_cancelation_safe(
&mut self, &mut self,
stream: &mut ReadHalf<'_>, stream: &mut ReadHalf<'_>,
) -> std::io::Result<()> { ) -> eyre::Result<()> {
// Makes sure all data is available before reading // Makes sure all data is available before reading
let header_bytes = bytemuck::bytes_of_mut(&mut self.header); let header_bytes = bytemuck::bytes_of_mut(&mut self.header);
stream.peek(header_bytes).await?; stream.peek(header_bytes).await?;
@@ -143,7 +165,7 @@ impl Packet {
} }
#[allow(clippy::missing_errors_doc)] #[allow(clippy::missing_errors_doc)]
pub async fn recv_into(&mut self, stream: &mut ReadHalf<'_>) -> std::io::Result<()> { pub async fn recv_into(&mut self, stream: &mut ReadHalf<'_>) -> eyre::Result<()> {
let header_bytes = bytemuck::bytes_of_mut(&mut self.header); let header_bytes = bytemuck::bytes_of_mut(&mut self.header);
stream.read_exact(header_bytes).await?; stream.read_exact(header_bytes).await?;
@@ -152,13 +174,20 @@ impl Packet {
stream.read_exact(&mut self.data).await?; stream.read_exact(&mut self.data).await?;
if self.header.kind == PacketKind::Error.raw() {
return Err(eyre!(
"client reported error: {:?}",
self.as_string().unwrap_or("unknown dyn auth error")
));
}
Ok(()) Ok(())
} }
#[allow(clippy::missing_errors_doc)] #[allow(clippy::missing_errors_doc)]
pub async fn send(&self, stream: &mut WriteHalf<'_>) -> std::io::Result<()> { pub async fn send(&self, stream: &mut WriteHalf<'_>) -> std::io::Result<()> {
stream.write_all(bytemuck::bytes_of(&self.header)).await?; stream.write_all(bytemuck::bytes_of(&self.header)).await?;
stream.write_all(&self.data).await?; stream.write_all(self.data()).await?;
Ok(()) Ok(())
} }

View File

@@ -1,5 +1,4 @@
use std::{ use std::{
borrow::Cow,
collections::{BTreeSet, HashMap, HashSet}, collections::{BTreeSet, HashMap, HashSet},
fmt::{Debug, Display}, fmt::{Debug, Display},
fs::File, fs::File,
@@ -11,7 +10,7 @@ use std::{
}; };
use eyre::eyre; use eyre::eyre;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize, Serializer};
use tokio::{ use tokio::{
net::TcpListener, net::TcpListener,
sync::{watch::Receiver, Mutex}, sync::{watch::Receiver, Mutex},
@@ -23,24 +22,26 @@ use tracing::{debug, error, info, instrument, warn};
use crate::{ use crate::{
constants::{CACHE_STORE_INTERVAL, PORT_OWNERSHIP_TIMEOUT, PORT_RETRY_TIME}, constants::{CACHE_STORE_INTERVAL, PORT_OWNERSHIP_TIMEOUT, PORT_RETRY_TIME},
packets::Packet, packets::Packet,
spawn, Config, Number, Port, UnixTimestamp, TIME_FORMAT, TIME_ZONE_OFFSET, spawn, Config, Number, Port, UnixTimestamp,
}; };
#[derive(Default, Serialize, Deserialize)] #[derive(Default, Serialize, Deserialize)]
pub struct PortHandler { pub struct PortHandler {
#[serde(skip)] #[serde(skip_deserializing)]
pub last_update: Option<Instant>, #[serde(serialize_with = "serialize_last_update")]
pub last_update: Option<std::time::Instant>,
#[serde(skip)] #[serde(skip)]
pub change_sender: Option<tokio::sync::watch::Sender<Instant>>, pub change_sender: Option<tokio::sync::watch::Sender<std::time::Instant>>,
#[serde(skip)] #[serde(skip_deserializing)]
port_guards: HashMap<Port, Rejector>, rejectors: HashMap<Port, Rejector>,
allowed_ports: AllowedList, allowed_ports: AllowedList,
#[serde(skip)] #[serde(skip)]
free_ports: HashSet<Port>, free_ports: HashSet<Port>,
errored_ports: BTreeSet<(UnixTimestamp, Port)>, errored_ports: BTreeSet<(UnixTimestamp, Port)>,
allocated_ports: HashMap<Number, Port>, allocated_ports: HashMap<Number, Port>,
@@ -51,11 +52,28 @@ pub struct PortHandler {
pub names: HashMap<Number, String>, pub names: HashMap<Number, String>,
} }
#[allow(clippy::missing_errors_doc)]
pub fn serialize_last_update<S: Serializer>(
last_update: &Option<std::time::Instant>,
serializer: S,
) -> Result<S::Ok, S::Error> {
last_update
.and_then(|instant| {
Some(
(SystemTime::now() + instant.elapsed())
.duration_since(UNIX_EPOCH)
.ok()?
.as_secs(),
)
})
.serialize(serializer)
}
#[instrument(skip(port_handler, change_receiver))] #[instrument(skip(port_handler, change_receiver))]
pub async fn cache_daemon( pub async fn cache_daemon(
port_handler: Arc<Mutex<PortHandler>>, port_handler: Arc<Mutex<PortHandler>>,
cache_path: PathBuf, cache_path: PathBuf,
mut change_receiver: Receiver<Instant>, mut change_receiver: Receiver<std::time::Instant>,
) { ) {
let mut last_store = Instant::now() - 2 * CACHE_STORE_INTERVAL; let mut last_store = Instant::now() - 2 * CACHE_STORE_INTERVAL;
let mut change_timeout = None; let mut change_timeout = None;
@@ -92,148 +110,13 @@ impl<T: Display> Debug for DisplayAsDebug<T> {
} }
} }
fn duration_string(duration: Duration) -> String {
let seconds_elapsed = duration.as_secs();
let days = seconds_elapsed / (60 * 60 * 24);
let hours = seconds_elapsed / (60 * 60) % 24;
let minutes = (seconds_elapsed / 60) % 60;
let seconds = seconds_elapsed % 60;
match (days > 0, hours > 0, minutes > 0) {
(true, _, _) => format!("{days}d {hours}h {minutes}min {seconds}s"),
(false, true, _) => format!("{hours}h {minutes}min {seconds}s"),
(false, false, true) => format!("{minutes}min {seconds}s"),
_ => format!("{duration:.0?}"),
}
}
fn format_instant(instant: Instant) -> String {
let when = duration_string(instant.elapsed()) + " ago";
(|| -> eyre::Result<_> {
let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)? - instant.elapsed();
let date = time::OffsetDateTime::from_unix_timestamp(
timestamp.as_secs().try_into().expect("timestamp overflow"),
)?
.to_offset(*TIME_ZONE_OFFSET.get().unwrap())
.format(TIME_FORMAT.get().unwrap())?;
Ok(format!("{date} ({when})"))
})()
.unwrap_or(when)
}
fn instant_from_timestamp(timestamp: UnixTimestamp) -> Instant {
Instant::now() - UNIX_EPOCH.elapsed().unwrap() + Duration::from_secs(timestamp)
}
#[cfg(feature = "debug_server")]
impl Debug for PortHandler {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
const SHOW_N_FREE_PORTS: usize = 10;
let last_update = self
.last_update
.map(|last_update| Cow::from(format_instant(last_update)))
.unwrap_or(Cow::from("?"));
let mut free_ports = self.free_ports.iter().copied().collect::<Vec<u16>>();
free_ports.sort_unstable();
let mut free_ports = free_ports
.into_iter()
.take(SHOW_N_FREE_PORTS)
.map(|x| DisplayAsDebug(x.to_string()))
.collect::<Vec<_>>();
if let Some(n_not_shown) = self.free_ports.len().checked_sub(SHOW_N_FREE_PORTS) {
if n_not_shown > 0 {
free_ports.push(DisplayAsDebug(format!("[{n_not_shown} more]")));
}
}
let errored_ports = self
.errored_ports
.iter()
.rev()
.map(|&(since, port)| {
DisplayAsDebug(format!(
"{port:5}: {}",
format_instant(instant_from_timestamp(since))
))
})
.collect::<Vec<_>>();
let mut allocated_ports = self
.allocated_ports
.iter()
.map(|(&number, &port)| {
#[derive(Debug)]
#[allow(dead_code)]
struct State<'n> {
state: PortStatus,
name: &'n str,
number: u32,
port: u16,
last_change: DisplayAsDebug<String>,
}
let state = &self.port_state[&port];
State {
state: state.status,
number,
port,
name: self.names.get(&number).map_or("?", |x| x.as_str()),
last_change: DisplayAsDebug(format_instant(instant_from_timestamp(
state.last_change,
))),
}
})
.collect::<Vec<_>>();
allocated_ports.sort_unstable_by(|a, b| {
a.state.cmp(&b.state).then(
self.port_state[&a.port]
.last_change
.cmp(&self.port_state[&b.port].last_change)
.reverse(),
)
});
writeln!(f, "last update: {last_update}")?;
writeln!(f, "rejectors: {:#?}", self.port_guards)?;
writeln!(f, "allowed ports: {:?}", self.allowed_ports.0)?;
writeln!(f, "free ports: {free_ports:?}")?;
writeln!(f, "errored ports: {errored_ports:#?}")?;
writeln!(f, "allocated ports: {allocated_ports:#?}")?;
Ok(())
}
}
#[derive(Default, Serialize, Deserialize)] #[derive(Default, Serialize, Deserialize)]
pub struct PortState { pub struct PortState {
last_change: UnixTimestamp, last_change: UnixTimestamp,
#[serde(skip)] #[serde(skip_deserializing)]
status: PortStatus, status: PortStatus,
} }
impl Debug for PortState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PortState")
.field(
"last_change",
&DisplayAsDebug(format_instant(instant_from_timestamp(self.last_change))),
)
.field("status", &self.status)
.finish()
}
}
impl PortState { impl PortState {
pub fn new_state(&mut self, status: PortStatus) { pub fn new_state(&mut self, status: PortStatus) {
self.last_change = SystemTime::now() self.last_change = SystemTime::now()
@@ -246,6 +129,7 @@ impl PortState {
} }
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, PartialOrd, Ord)] #[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, PartialOrd, Ord)]
#[serde(rename_all = "snake_case")]
pub enum PortStatus { pub enum PortStatus {
InCall, InCall,
Idle, Idle,
@@ -273,17 +157,12 @@ impl AllowedList {
} }
impl PortHandler { impl PortHandler {
#[must_use]
pub fn status_string(&self) -> String {
format!("{self:#?}\n")
}
pub fn register_update(&mut self) { pub fn register_update(&mut self) {
let now = Instant::now(); let now = std::time::Instant::now();
self.last_update = Some(now); self.last_update = Some(now);
self.change_sender self.change_sender
.as_ref() .as_ref()
.expect("PortHandler is missing it's change_sender") .expect("PortHandler is missing its change_sender")
.send(now) .send(now)
.expect("failed to notify cache writer"); .expect("failed to notify cache writer");
} }
@@ -292,36 +171,51 @@ impl PortHandler {
#[instrument(skip(self))] #[instrument(skip(self))]
pub fn store(&self, cache: &Path) -> std::io::Result<()> { pub fn store(&self, cache: &Path) -> std::io::Result<()> {
debug!("storing cache"); debug!("storing cache");
let temp_file = cache.with_extension(".temp"); let temp_file = cache.with_extension("temp");
serde_json::to_writer(BufWriter::new(File::create(&temp_file)?), self)?; let mut value = serde_json::to_value(self)?;
let value_object = value.as_object_mut().unwrap();
value_object.remove("rejectors").unwrap();
value_object.remove("last_update").unwrap();
value_object
.get_mut("port_state")
.unwrap()
.as_object_mut()
.unwrap()
.values_mut()
.for_each(|value| {
value.as_object_mut().unwrap().remove("status").unwrap();
});
serde_json::to_writer(BufWriter::new(File::create(&temp_file)?), &value)?;
std::fs::rename(temp_file, cache)?; std::fs::rename(temp_file, cache)?;
Ok(()) Ok(())
} }
#[allow(clippy::missing_errors_doc)] #[allow(clippy::missing_errors_doc)]
#[instrument(skip(change_sender))] pub fn load(cache: &Path) -> std::io::Result<Self> {
pub fn load(
cache: &Path,
change_sender: tokio::sync::watch::Sender<Instant>,
) -> std::io::Result<Self> {
info!("loading cache"); info!("loading cache");
let mut cache: Self = serde_json::from_reader(BufReader::new(File::open(cache)?))?; Ok(serde_json::from_reader(BufReader::new(File::open(cache)?))?)
cache.change_sender = Some(change_sender);
Ok(cache)
} }
#[must_use] #[must_use]
#[instrument(skip(change_sender))] #[instrument(skip(change_sender))]
pub fn load_or_default( pub fn load_or_default(
path: &Path, path: &Path,
change_sender: tokio::sync::watch::Sender<Instant>, change_sender: tokio::sync::watch::Sender<std::time::Instant>,
) -> Self { ) -> Self {
Self::load(path, change_sender).unwrap_or_else(|error| { let mut this = Self::load(path).unwrap_or_else(|error| {
error!(?path, %error, "failed to parse cache file"); error!(?path, %error, "failed to parse cache file");
Self::default() Self::default()
}) });
this.change_sender = Some(change_sender);
this
} }
pub fn update_allowed_ports(&mut self, allowed_ports: &AllowedList) { pub fn update_allowed_ports(&mut self, allowed_ports: &AllowedList) {
@@ -365,7 +259,7 @@ impl PortHandler {
let port_guard = Rejector::start(listener, packet); let port_guard = Rejector::start(listener, packet);
if self.port_guards.insert(port, port_guard).is_some() { if self.rejectors.insert(port, port_guard).is_some() {
unreachable!("Tried to start rejector that is already running. This should have been impossible since it requires two listeners on the same port."); unreachable!("Tried to start rejector that is already running. This should have been impossible since it requires two listeners on the same port.");
} }
} }
@@ -374,7 +268,7 @@ impl PortHandler {
pub async fn stop_rejector(&mut self, port: Port) -> Option<(TcpListener, Packet)> { pub async fn stop_rejector(&mut self, port: Port) -> Option<(TcpListener, Packet)> {
info!(port, "stopping rejector"); info!(port, "stopping rejector");
Some(self.port_guards.remove(&port)?.stop().await) Some(self.rejectors.remove(&port)?.stop().await)
} }
/// # Errors /// # Errors
@@ -402,6 +296,20 @@ struct Rejector {
handle: JoinHandle<()>, handle: JoinHandle<()>,
} }
impl Serialize for Rejector {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let packet = &self.state.1;
match packet.as_string() {
Some(string) if string.chars().all(|c| !c.is_control()) => string.serialize(serializer),
_ => packet.data().serialize(serializer),
}
}
}
impl Debug for Rejector { impl Debug for Rejector {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Rejector") f.debug_struct("Rejector")
@@ -541,7 +449,7 @@ impl PortHandler {
return Some(port); return Some(port);
} }
None // TODO None // TODO: are there more ways?
} }
#[instrument(skip(self))] #[instrument(skip(self))]

1
web/connected.svg Normal file
View File

@@ -0,0 +1 @@
<svg><g transform="translate(-38.3 -51.4)"><path d="m98.8 55.1-6.52 6.52-3.78-3.78 6.52-6.52zm-7.49 20.5-8.5 8.5-16.8-16.8 8.5-8.5zm-16.9 0.0741a11.9 11.9 45 0 1 0.0256-16.8 11.9 11.9 45 0 1 16.8-0.0256 11.9 11.9 45 0 1-0.0256 16.8 11.9 11.9 45 0 1-16.8 0.0256zm-36.2 32.6 6.52-6.52 3.78 3.78-6.52 6.52zm19.7-25.3 8.39-8.39 1.88 1.88-8.39 8.39zm7.46 7.5 8.39-8.39 1.88 1.88-8.39 8.39zm-19.7-2.72 8.5-8.5 16.8 16.8-8.5 8.5zm16.9-0.0741a11.9 11.9 45 0 1-0.0256 16.8 11.9 11.9 45 0 1-16.8 0.0256 11.9 11.9 45 0 1 0.0256-16.8 11.9 11.9 45 0 1 16.8-0.0256z"/></g></svg>

After

Width:  |  Height:  |  Size: 564 B

31
web/index.html Normal file
View File

@@ -0,0 +1,31 @@
<!DOCTYPE html>
<html>
<head>
<link rel="icon" href="data:,">
<meta charset='utf-8'>
<meta name='viewport' content='width=device-width, initial-scale=1'>
<title>Centralex State</title>
<!--INSERT HEAD CONTENT HERE-->
</head>
<body>
<p id="free_ports" />
<div id="connected_box">
<div id="connected" class="hidden"><!--INSERT SVG HERE--></div>
<p id="last_update" />
</div>
<table id="table">
<tr>
<th>Nummer</th>
<th>Port</th>
<th>Zustand</th>
<th>Name</th>
<th>Meldung</th>
<th>Letzte Änderung</th>
</tr>
</table>
</body>
</html>

40
web/main.css Normal file
View File

@@ -0,0 +1,40 @@
body {
background-color: #eee;
}
#connected_box {
position: absolute;
top: 5%;
right: 5%;
width: 5%;
height: 5%;
}
td,
th {
border: 1px solid black;
padding: 0.5em;
}
table {
border-spacing: 0;
}
.number {
text-align: right;
}
.text {
text-align: left;
}
.visible {
opacity: 1;
transition: opacity 500ms linear;
}
.hidden {
opacity: 0.2;
transition: opacity 6000ms linear;
}

127
web/main.js Normal file
View File

@@ -0,0 +1,127 @@
window.onload = () => {
const table_elem = document.getElementById("table");
const last_update = document.getElementById("last_update");
const connected = document.getElementById("connected");
const free_ports = document.getElementById("free_ports");
const timeout_duration = 10*1000;
const retry_timeout = 5*1000;
let reconnect_timeout;
let ping_timeout;
let evtSource;
let table = [];
let format_date = date => date.toLocaleDateString() + ' ' + date.toLocaleTimeString();
let print_table = () => {
while(table_elem.children.length > 1) {
table_elem.removeChild(table_elem.lastChild);
}
for (let row of table) {
let tr = document.createElement("tr");
let values = [
row.number,
row.port,
row.status,
row.name || "?",
row.rejector || "",
format_date(row.last_change)
];
for(let value of values) {
let td = document.createElement("td");
td.innerText = value;
td.className = Number.isInteger(value) ? "number" : "text";
tr.appendChild(td);
}
table_elem.appendChild(tr)
}
};
let update_table = data => {
console.log(data);
const allowed_ports = data.allowed_ports.map(x => x.end - x.start + 1).reduce((a,b) => a + b, 0);
free_ports.innerText = `Freie Ports: ${allowed_ports - Object.keys(data.allocated_ports).length - data.errored_ports.length}`;
table = [];
for(let number in data.allocated_ports) {
let port = data.allocated_ports[number];
number = +number;
let {status, last_change} = data.port_state[port];
let rejector = data.rejectors[port] || null;
if (rejector && rejector instanceof Array) {
rejector = rejector.map(x => "0x"+x.toString(16).padStart(2, 0)).join(" ")
}
last_change = new Date(last_change * 1000);
let name = data.names[number] || null;
switch(status) {
case "disconnected":
status = "getrennt";
break;
case "idle":
status = "bereit";
break;
case "in_call":
status = "anruf";
break;
}
table.push({port, number, status, last_change, rejector, name})
}
console.log(table);
print_table();
};
let connect_event_source = () => {
clearTimeout(reconnect_timeout);
clearTimeout(ping_timeout);
ping_timeout = setTimeout(connect_event_source, timeout_duration);
evtSource && evtSource.close && evtSource.close();
evtSource = new EventSource("/events");
evtSource.addEventListener("change", event => {
last_update.innerText = `Letzte Änderung: ${format_date(new Date(+event.data * 1000))}`;
fetch("/data")
.then(res => res.json())
.then(update_table);
});
evtSource.addEventListener("ping", event => {
clearTimeout(ping_timeout);
ping_timeout = setTimeout(connect_event_source, timeout_duration);
last_update.innerText = `Letzte Änderung: ${format_date(new Date(+event.data * 1000))}`;
connected.className = "visible";
setTimeout(() => connected.className = "hidden", 1000);
});
evtSource.onerror = () => {
clearTimeout(reconnect_timeout);
reconnect_timeout = setTimeout(connect_event_source, retry_timeout);
};
}
connect_event_source();
};