Compare commits

..

43 Commits

Author SHA1 Message Date
b887d2475f Update '.gitea/workflows/checks.yaml'
All checks were successful
Code Checks / cargo clippy (pull_request) Successful in 3m10s
Code Checks / cargo test (pull_request) Successful in 3m10s
Code Checks / cargo fmt (pull_request) Successful in 3m4s
2023-05-07 02:32:36 +02:00
e98c996874 don't test all-features
All checks were successful
Code Checks / cargo fmt (pull_request) Successful in 3m4s
Code Checks / cargo clippy (pull_request) Successful in 3m10s
Code Checks / cargo test (pull_request) Successful in 3m10s
2023-05-07 02:21:09 +02:00
c3444b927a fix CI
Some checks failed
Code Checks / cargo fmt (pull_request) Successful in 3m4s
Code Checks / cargo clippy (pull_request) Successful in 3m10s
Code Checks / cargo test (pull_request) Failing after 3m17s
2023-05-07 02:13:31 +02:00
fbd9d268bb Update '.gitea/workflows/checks.yaml'
Some checks failed
Code Checks / cargo fmt (pull_request) Successful in 3m5s
Code Checks / cargo clippy (pull_request) Failing after 3m10s
Code Checks / cargo test (pull_request) Failing after 3m17s
2023-05-07 02:01:36 +02:00
3a93a3bc90 Update '.gitea/workflows/checks.yaml'
Some checks failed
Code Checks / cargo fmt (pull_request) Successful in 3m6s
Code Checks / cargo test (pull_request) Failing after 1s
Code Checks / cargo clippy (pull_request) Has started running
2023-05-07 01:53:43 +02:00
885115289e unify actions
Some checks failed
Code Checks / cargo fmt (pull_request) Failing after 13m16s
Code Checks / cargo clippy (pull_request) Has started running
Code Checks / cargo test (pull_request) Has started running
2023-05-07 01:28:13 +02:00
8de50fbe7e remove github prefix again
Some checks failed
Code Checks / cargo fmt (pull_request) Failing after 14m11s
Test Suite / cargo test (pull_request) Failing after 12m23s
Code Checks / cargo clippy (pull_request) Failing after 14m53s
2023-05-07 01:27:08 +02:00
97230c7b1f add github prefix again
Some checks failed
Test Suite / cargo test (pull_request) Failing after 2s
Code Checks / cargo fmt (pull_request) Failing after 2s
Code Checks / cargo clippy (pull_request) Failing after 16s
2023-05-07 01:24:21 +02:00
9b6658dfe2 add version
Some checks failed
Test Suite / cargo test (push) Failing after 2s
Code Checks / cargo fmt (pull_request) Failing after 2s
Code Checks / cargo clippy (pull_request) Failing after 3s
Test Suite / cargo test (pull_request) Failing after 2s
2023-05-07 01:18:56 +02:00
1dba1ab298 remove github prefix?
Some checks failed
Test Suite / cargo test (push) Failing after 14s
Code Checks / cargo fmt (pull_request) Failing after 11s
Code Checks / cargo clippy (pull_request) Failing after 2s
Test Suite / cargo test (pull_request) Failing after 2s
2023-05-07 01:17:59 +02:00
64f10d978d remove github prefix? 2023-05-07 01:17:49 +02:00
5c2049af94 fix actions?
Some checks failed
Code Checks / cargo fmt (push) Failing after 3s
Code Checks / cargo clippy (push) Failing after 2s
Test Suite / cargo test (push) Failing after 2s
Code Checks / cargo fmt (pull_request) Failing after 2s
Code Checks / cargo clippy (pull_request) Failing after 2s
Test Suite / cargo test (pull_request) Failing after 2s
2023-05-07 01:15:55 +02:00
3cae37852a fully specify actions-rust-lang url
Some checks failed
Code Checks / cargo fmt (push) Failing after 27s
Code Checks / cargo clippy (push) Failing after 4s
Test Suite / cargo test (push) Failing after 24s
Test Suite / cargo test (pull_request) Failing after 24s
Code Checks / cargo clippy (pull_request) Failing after 4s
Code Checks / cargo fmt (pull_request) Failing after 25s
2023-05-07 01:07:57 +02:00
eb4c6854f0 add actions
Some checks failed
Code Checks / cargo fmt (push) Failing after 3s
Code Checks / cargo clippy (push) Failing after 3s
Test Suite / cargo test (push) Failing after 2s
Code Checks / cargo fmt (pull_request) Failing after 3s
Code Checks / cargo clippy (pull_request) Failing after 2s
Test Suite / cargo test (pull_request) Failing after 3s
2023-05-07 01:05:34 +02:00
0eed949a27 Delete '.gitea/workflows/build.yaml' 2023-05-07 00:53:56 +02:00
d58c063414 revert 854f694d22
All checks were successful
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 6s
revert Delete '.gitea/workflows/build.yaml'
2023-05-07 00:53:49 +02:00
207a304231 revert 854f694d22
All checks were successful
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 8s
revert Delete '.gitea/workflows/build.yaml'
2023-05-07 00:53:01 +02:00
854f694d22 Delete '.gitea/workflows/build.yaml' 2023-05-07 00:36:55 +02:00
47ac15422c add actions
All checks were successful
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 50s
2023-05-07 00:34:44 +02:00
4627800120 increase log target padding 2023-04-22 18:49:14 +02:00
fb1a2aa1c0 print duration in days if >24h 2023-04-22 18:45:25 +02:00
c48d369854 fix installer 2023-03-28 17:27:54 +02:00
06567d957c disable tokio-console by default 2023-03-28 15:02:24 +02:00
8f978c06f9 look up name in authentication routine 2023-03-28 14:32:48 +02:00
0aab8b16c7 register name changes as updates 2023-03-28 14:21:47 +02:00
1e7b10bc6d change debug display order 2023-03-28 14:16:24 +02:00
a2a2d89912 serialize client names 2023-03-28 14:12:43 +02:00
946bb37096 change debug display order 2023-03-28 14:07:25 +02:00
f22cafa96e show client names in debug server 2023-03-28 14:05:43 +02:00
82838e46dd log client name 2023-03-28 13:53:34 +02:00
c01e18f5f2 log client name 2023-03-28 13:52:58 +02:00
1340e87c15 switch to zerocopy crate. Initial server query debug implementation 2023-03-28 13:46:38 +02:00
eefb943292 update installer 2023-03-28 12:53:20 +02:00
894080500b log connections 2023-03-20 15:55:07 +01:00
eb2f66a7b6 start tracing later 2023-03-20 15:50:13 +01:00
6f1e9836c0 allow ping before RemAck 2023-03-20 15:46:34 +01:00
a5c73993d3 fix log filter 2023-03-19 22:54:26 +01:00
9d8124bd5c change error library 2023-03-19 22:05:42 +01:00
11d37c5b73 minor resructure 2023-03-19 19:30:25 +01:00
e4500cab78 update dependencies 2023-03-19 17:56:33 +01:00
ac72742c2a document possible errors 2023-03-19 17:38:31 +01:00
d917afe58c document panics better 2023-03-19 17:21:00 +01:00
96033a0796 turn on most of clippy::pedantic 2023-03-19 17:17:31 +01:00
11 changed files with 888 additions and 463 deletions

View File

@@ -1,3 +1,2 @@
[build]
rustflags = [ "--cfg", "tokio_unstable" ]
# rustflags = [ "--cfg", "tokio_unstable" ]

View File

@@ -0,0 +1,38 @@
name: "Code Checks"
on:
pull_request:
jobs:
# Check formatting with rustfmt
formatting:
name: cargo fmt
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
# Ensure rustfmt is installed and setup problem matcher
- uses: actions-rust-lang/setup-rust-toolchain@v1
with:
components: rustfmt
- name: Rustfmt Check
uses: actions-rust-lang/rustfmt@v1
# Check code with clippy
clippy:
name: cargo clippy
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
# Ensure clippy is installed and setup problem matcher
- uses: actions-rust-lang/setup-rust-toolchain@v1
with:
components: clippy
- run: cargo clippy -- -D warnings
# Run tests
test:
name: cargo test
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions-rust-lang/setup-rust-toolchain@v1
- run: cargo test

108
Cargo.lock generated
View File

@@ -19,12 +19,9 @@ checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "anyhow"
version = "1.0.69"
version = "1.0.70"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "224afbd727c3d6e4b90103ece64b8d1b67fbb1973b1046c2281eed3f3803f800"
dependencies = [
"backtrace",
]
checksum = "7de8ce5e0f9f8d88245311066a578d72b7af3e7088f32783804676302df237e4"
[[package]]
name = "async-stream"
@@ -56,7 +53,7 @@ checksum = "86ea188f25f0255d8f92797797c97ebf5631fa88178beb1a46fdf5622c9a00e4"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.0",
"syn 2.0.2",
]
[[package]]
@@ -179,9 +176,10 @@ checksum = "50d30906286121d95be3d479533b458f87493b30a4b5f79a607db8f5d11aa91f"
name = "centralex"
version = "0.1.0"
dependencies = [
"anyhow",
"bytemuck",
"color-eyre",
"console-subscriber",
"eyre",
"futures",
"hyper",
"once_cell",
@@ -190,7 +188,9 @@ dependencies = [
"time",
"tokio",
"tracing",
"tracing-error",
"tracing-subscriber",
"zerocopy",
]
[[package]]
@@ -199,6 +199,33 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "color-eyre"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a667583cca8c4f8436db8de46ea8233c42a7d9ae424a82d338f2e4675229204"
dependencies = [
"backtrace",
"color-spantrace",
"eyre",
"indenter",
"once_cell",
"owo-colors",
"tracing-error",
]
[[package]]
name = "color-spantrace"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ba75b3d9449ecdccb27ecbc479fdc0b87fa2dd43d2f8298f9bf0e59aacc8dce"
dependencies = [
"once_cell",
"owo-colors",
"tracing-core",
"tracing-error",
]
[[package]]
name = "console-api"
version = "0.4.0"
@@ -269,6 +296,16 @@ version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fcaabb2fef8c910e7f4c7ce9f67a1283a1715879a7c230ca9d6d1ae31f16d91"
[[package]]
name = "eyre"
version = "0.6.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c2b6b5a29c02cdc822728b7d7b8ae1bab3e3b05d44522770ddd49722eeac7eb"
dependencies = [
"indenter",
"once_cell",
]
[[package]]
name = "flate2"
version = "1.0.25"
@@ -490,6 +527,12 @@ dependencies = [
"tokio-io-timeout",
]
[[package]]
name = "indenter"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce23b50ad8242c51a442f3ff322d56b02f08852c77e4c0b4d3fd684abc89c683"
[[package]]
name = "indexmap"
version = "1.9.2"
@@ -659,6 +702,12 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "owo-colors"
version = "3.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1b04fb49957986fdce4d6ee7a65027d55d4b6d2265e5848bbb507b58ccfdb6f"
[[package]]
name = "percent-encoding"
version = "2.2.0"
@@ -827,22 +876,22 @@ checksum = "f91339c0467de62360649f8d3e185ca8de4224ff281f66000de5eb2a77a79041"
[[package]]
name = "serde"
version = "1.0.156"
version = "1.0.157"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "314b5b092c0ade17c00142951e50ced110ec27cea304b1037c6969246c2469a4"
checksum = "707de5fcf5df2b5788fca98dd7eab490bc2fd9b7ef1404defc462833b83f25ca"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.156"
version = "1.0.157"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7e29c4601e36bcec74a223228dce795f4cd3616341a4af93520ca1a837c087d"
checksum = "78997f4555c22a7971214540c4a661291970619afd56de19f77e0de86296e1e5"
dependencies = [
"proc-macro2",
"quote",
"syn 1.0.109",
"syn 2.0.2",
]
[[package]]
@@ -903,9 +952,9 @@ dependencies = [
[[package]]
name = "syn"
version = "2.0.0"
version = "2.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4cff13bb1732bccfe3b246f3fdb09edfd51c01d6f5299b7ccd9457c2e4e37774"
checksum = "59d3276aee1fa0c33612917969b5172b5be2db051232a6e4826f1a1a9191b045"
dependencies = [
"proc-macro2",
"quote",
@@ -1119,6 +1168,16 @@ dependencies = [
"valuable",
]
[[package]]
name = "tracing-error"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d686ec1c0f384b1277f097b2f279a2ecc11afe8c133c1aabf036a27cb4cd206e"
dependencies = [
"tracing",
"tracing-subscriber",
]
[[package]]
name = "tracing-futures"
version = "0.2.5"
@@ -1280,3 +1339,24 @@ name = "windows_x86_64_msvc"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0"
[[package]]
name = "zerocopy"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "332f188cc1bcf1fe1064b8c58d150f497e697f49774aa846f2dc949d9a25f236"
dependencies = [
"byteorder",
"zerocopy-derive",
]
[[package]]
name = "zerocopy-derive"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6505e6815af7de1746a08f69c69606bb45695a17149517680f3b2149713b19a3"
dependencies = [
"proc-macro2",
"quote",
"syn 1.0.109",
]

View File

@@ -3,23 +3,33 @@ name = "centralex"
version = "0.1.0"
edition = "2021"
[profile.release]
debug = true
[profile.dev.package.backtrace]
opt-level = 3
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tokio = { version = "1.24.2", features = ["macros", "rt-multi-thread", "net", "io-util", "sync", "time"] }
anyhow = { version = "1.0.68", features = ["backtrace"] }
time = { version = "0.3.20", features = ["local-offset", "macros"] }
bytemuck = { version = "1.13.0", features = ["derive"] }
serde = { version = "1.0.152", features = ["derive"] }
serde_json = "1.0.91"
hyper = { version = "0.14.24", optional = true, features = ["server", "http1", "tcp"] }
futures = { version = "0.3.27", default-features = false, features = ["std"] }
console-subscriber = { version = "0.1.8", optional = true }
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.16", features = ["time"] }
time = { version = "0.3.20", features = ["local-offset", "macros"] }
console-subscriber = { version = "0.1.8", optional = true }
once_cell = "1.17.1"
eyre = "0.6.8"
color-eyre = "0.6.2"
tracing-error = "0.2.0"
zerocopy = "0.6.1"
[features]
default = ["debug_server", "tokio_console"]
default = ["debug_server"]
debug_server = ["dep:hyper"]
tokio_console = ["dep:console-subscriber"]

View File

@@ -24,8 +24,8 @@ then
exit 1
fi
user_name="centralex"
install_dir="/var/lib/centralex"
user_name="itelex"
install_dir="/home/itelex/"
service_file="/etc/systemd/system/centralex.service"
if [ $# -lt 1 ]; then
@@ -52,7 +52,7 @@ case "$step" in
rm rustup.sh
echo "cloning source code..."
git clone https://github.com/soruh/centralex centralex
git clone https://gitea.h.glsys.de/soruh/centralex centralex
echo "creating default config..."
cp centralex/config-template.json centralex/config.json
@@ -63,8 +63,8 @@ case "$step" in
exit 1
fi
echo "creating user $user_name..."
useradd -s /usr/sbin/nologin --create-home --home-dir "$install_dir" "$user_name"
# echo "creating user $user_name..."
# useradd -s /usr/sbin/nologin --create-home --home-dir "$install_dir" "$user_name"
echo "creating service file..."
cat > "$service_file" << EOF
@@ -72,10 +72,10 @@ case "$step" in
Description=Centralex
[Service]
Enviroment=RUST_BACKTRACE=1
Environment=RUST_BACKTRACE=1
ExecStart=$install_dir/.cargo/bin/cargo run --release
Type=simple
User=centralex
User=$user_name
WorkingDirectory=$install_dir/centralex
[Install]

View File

@@ -1,16 +1,32 @@
use std::net::SocketAddr;
use anyhow::bail;
use tracing::debug;
use eyre::eyre;
use tracing::{debug, instrument};
use zerocopy::{AsBytes, LittleEndian};
use crate::packets::{Header, Packet, PacketKind};
type U16 = zerocopy::U16<LittleEndian>;
type U32 = zerocopy::U32<LittleEndian>;
#[derive(AsBytes)]
#[repr(C)]
struct DynIpUpdate {
number: U32,
pin: U16,
port: U16,
}
/// # Errors
/// - the dyn ip server returns a malformed response or is unreachable
/// - the authentication fails
#[instrument]
pub async fn dyn_ip_update(
server: &SocketAddr,
number: u32,
pin: u16,
port: u16,
) -> anyhow::Result<std::net::Ipv4Addr> {
) -> eyre::Result<std::net::Ipv4Addr> {
debug!(%number, %port, "starting dyn ip update");
let mut packet = Packet {
@@ -21,24 +37,27 @@ pub async fn dyn_ip_update(
data: Vec::new(),
};
packet.data.clear();
packet.data.reserve(packet.header.length as usize);
packet.data.extend_from_slice(&number.to_le_bytes());
packet.data.extend_from_slice(&pin.to_le_bytes());
packet.data.extend_from_slice(&port.to_le_bytes());
packet.data.resize(packet.header.length as usize, 0);
DynIpUpdate {
number: number.into(),
pin: pin.into(),
port: port.into(),
}
.write_to(packet.data.as_mut_slice())
.unwrap();
let mut socket = tokio::net::TcpStream::connect(server).await?;
let (mut reader, mut writer) = socket.split();
packet.send(&mut writer).await?;
packet.recv_into(&mut reader).await?;
let result = match packet.kind() {
PacketKind::DynIpUpdateResponse => Ok(<[u8; 4]>::try_from(packet.data)
.map_err(|err| {
anyhow::anyhow!(
eyre!(
"too little data for ip address. Need 4 bytes got {}",
err.len()
)
@@ -51,17 +70,13 @@ pub async fn dyn_ip_update(
.enumerate()
.find_map(|(i, x)| (*x == 0).then_some(i));
bail!(
return Err(eyre!(
"{}",
std::str::from_utf8(
first_zero
.map(|i| &packet.data[..i])
.unwrap_or(&packet.data),
)?
)
std::str::from_utf8(first_zero.map_or(&packet.data, |i| &packet.data[..i]),)?
));
}
_ => bail!("server returned unexpected packet"),
_ => return Err(eyre!("server returned unexpected packet")),
};
debug!(?result, "finished dyn ip update");

View File

@@ -1,48 +1,38 @@
use anyhow::{bail, Context};
use eyre::eyre;
use std::{net::SocketAddr, time::Instant};
use tokio::{
io::AsyncWriteExt,
net::{TcpListener, TcpStream},
net::{
tcp::{ReadHalf, WriteHalf},
TcpListener, TcpStream,
},
select,
sync::Mutex,
time::{sleep, timeout},
};
use tracing::{info, trace};
use tracing::{info, instrument, trace};
use crate::{
auth::dyn_ip_update,
constants::{AUTH_TIMEOUT, CALL_ACK_TIMEOUT, CALL_TIMEOUT, PING_TIMEOUT, SEND_PING_INTERVAL},
debug_server::peer_query,
packets::{Header, Packet, PacketKind, RemConnect, REJECT_OOP, REJECT_TIMEOUT},
ports::{PortHandler, PortStatus},
Config, HandlerMetadata,
};
pub async fn connection_handler(
/// # Errors
/// - the client authentication fails
#[instrument(skip(config, port_handler, handler_metadata))]
async fn authenticate(
config: &Config,
handler_metadata: &mut HandlerMetadata,
port_handler: &Mutex<PortHandler>,
stream: &mut TcpStream,
) -> anyhow::Result<()> {
let addr = stream.peer_addr()?;
let (mut reader, mut writer) = stream.split();
let mut packet = Packet::default();
match timeout(AUTH_TIMEOUT, packet.recv_into_cancelation_safe(&mut reader)).await {
Ok(res) => res?,
Err(_) => {
writer.write_all(REJECT_TIMEOUT).await?;
return Ok(());
}
}
let RemConnect { number, pin } = packet.as_rem_connect()?;
handler_metadata.number = Some(number);
handler_metadata: &mut HandlerMetadata,
number: u32,
pin: u16,
) -> eyre::Result<Option<u16>> {
let mut authenticated = false;
let port = loop {
loop {
let mut updated_server = false;
let port = port_handler
@@ -51,87 +41,86 @@ pub async fn connection_handler(
.allocate_port_for_number(config, number);
let Some(port) = port else {
writer.write_all(REJECT_OOP).await?;
return Ok(());
return Ok(None);
};
// make sure the client is authenticated before opening any ports
if !authenticated {
let _ip = dyn_ip_update(&config.dyn_ip_server, number, pin, port)
.await
.context("dy-ip update")?;
let _ip = dyn_ip_update(&config.dyn_ip_server, number, pin, port).await?;
authenticated = true;
updated_server = true;
}
let mut port_handler = port_handler.lock().await;
let listener = if let Some((listener, _packet)) = port_handler.stop_rejector(port).await {
let listener = if let Some((listener, _packet)) =
port_handler.lock().await.stop_rejector(port).await
{
Ok(listener)
} else {
TcpListener::bind((config.listen_addr.ip(), port)).await
};
match listener {
Ok(listener) => {
// make sure that if we have an error, we still have access
// to the listener in the error handler.
handler_metadata.listener = Some(listener);
if let Ok(listener) = listener {
// make sure that if we have an error, we still have access
// to the listener in the error handler.
handler_metadata.listener = Some(listener);
// if we authenticated a client for a port we then failed to open
// we need to update the server here once a port that can be opened
// has been found
if !updated_server {
let _ip = dyn_ip_update(&config.dyn_ip_server, number, pin, port)
.await
.context("dy-ip update")?;
}
port_handler.register_update();
port_handler
.port_state
.entry(port)
.or_default()
.new_state(PortStatus::Idle);
handler_metadata.port = Some(port);
break port;
// if we authenticated a client for a port we then failed to open
// we need to update the server here once a port that can be opened
// has been found
if !updated_server {
let _ip = dyn_ip_update(&config.dyn_ip_server, number, pin, port).await?;
}
Err(_err) => {
port_handler.mark_port_error(number, port);
continue;
#[cfg(feature = "debug_server")]
let name = peer_query(&config.dyn_ip_server, number).await?;
let mut port_handler = port_handler.lock().await;
#[cfg(feature = "debug_server")]
if let Some(name) = name {
info!(%name, "found client name");
port_handler.names.insert(number, name);
}
};
};
info!(%addr, number, port, "authenticated");
port_handler.register_update();
port_handler
.port_state
.entry(port)
.or_default()
.new_state(PortStatus::Idle);
let listener = handler_metadata.listener.as_mut().unwrap(); // we only break from the loop if this is set
handler_metadata.port = Some(port);
packet.header = Header {
kind: PacketKind::RemConfirm.raw(),
length: 0,
};
packet.data.clear();
packet.send(&mut writer).await?;
break Ok(Some(port));
}
#[derive(Debug)]
enum Result {
Caller {
packet: Packet,
stream: TcpStream,
addr: SocketAddr,
},
Packet {
packet: Packet,
},
port_handler.lock().await.mark_port_error(number, port);
}
}
#[derive(Debug)]
enum IdleResult {
Caller {
packet: Packet,
stream: TcpStream,
addr: SocketAddr,
},
Disconnect {
packet: Packet,
},
}
#[instrument(skip(listener, reader, writer, packet))]
async fn idle(
listener: &mut TcpListener,
mut packet: Packet,
reader: &mut ReadHalf<'_>,
writer: &mut WriteHalf<'_>,
) -> eyre::Result<Option<IdleResult>> {
let mut last_ping_sent_at = Instant::now();
let mut last_ping_received_at = Instant::now();
let result = loop {
loop {
trace!(
seconds = SEND_PING_INTERVAL
.saturating_sub(last_ping_sent_at.elapsed())
@@ -151,16 +140,16 @@ pub async fn connection_handler(
select! {
caller = listener.accept() => {
let (stream, addr) = caller?;
break Result::Caller { packet, stream, addr }
break Ok(Some(IdleResult::Caller { packet, stream, addr }))
},
_ = Packet::peek_packet_kind(&mut reader) => {
packet.recv_into(&mut reader).await?;
_ = Packet::peek_packet_kind(reader) => {
packet.recv_into(reader).await?;
if packet.kind() == PacketKind::Ping {
trace!("received ping");
last_ping_received_at = Instant::now();
} else {
break Result::Packet { packet }
break Ok(Some(IdleResult::Disconnect { packet }))
}
},
_ = sleep(send_next_ping_in) => {
@@ -169,14 +158,24 @@ pub async fn connection_handler(
last_ping_sent_at = Instant::now();
}
_ = sleep(next_ping_expected_in) => {
writer.write_all(REJECT_TIMEOUT).await?;
return Ok(());
break Ok(None);
}
}
};
}
}
let (mut client, mut packet) = match result {
Result::Packet { mut packet } => {
#[instrument(skip(port_handler, handler_metadata, writer))]
async fn notify_or_disconnect(
result: IdleResult,
handler_metadata: &mut HandlerMetadata,
port_handler: &Mutex<PortHandler>,
port: u16,
writer: &mut WriteHalf<'_>,
) -> eyre::Result<Option<(TcpStream, Packet)>> {
match result {
IdleResult::Disconnect { mut packet } => {
if matches!(packet.kind(), PacketKind::End | PacketKind::Reject) {
info!(?packet, "got disconnect packet");
@@ -184,7 +183,7 @@ pub async fn connection_handler(
if packet.data.is_empty() {
packet.data.extend_from_slice(b"nc\0");
packet.header.length = packet.data.len() as u8;
packet.header.length = packet.data.len().try_into().unwrap();
}
port_handler.lock().await.start_rejector(
@@ -194,13 +193,13 @@ pub async fn connection_handler(
.take()
.expect("tried to start rejector twice"),
packet,
)?;
return Ok(());
);
Ok(None)
} else {
bail!("unexpected packet: {:?}", packet.kind())
Err(eyre!("unexpected packet: {:?}", packet.kind()))
}
}
Result::Caller {
IdleResult::Caller {
mut packet,
stream,
addr,
@@ -216,106 +215,199 @@ pub async fn connection_handler(
*/
packet.header = Header {
kind: PacketKind::RemCall.raw(),
length: packet.data.len() as u8,
length: packet.data.len().try_into().unwrap(), // ip addresses are less then 255 bytes long
};
packet.send(&mut writer).await?;
packet.send(writer).await?;
(stream, packet)
}
};
match timeout(
CALL_ACK_TIMEOUT,
packet.recv_into_cancelation_safe(&mut reader),
)
.await
{
Ok(res) => res?,
Err(_) => {
writer.write_all(REJECT_TIMEOUT).await?;
return Ok(());
Ok(Some((stream, packet)))
}
}
}
match packet.kind() {
PacketKind::End | PacketKind::Reject => {
port_handler.lock().await.start_rejector(
port,
handler_metadata
.listener
.take()
.expect("tried to start rejector twice"),
packet,
)?;
fn print_addr(stream: &TcpStream) -> String {
stream
.peer_addr()
.map_or_else(|_| "?".to_owned(), |addr| format!("{addr}"))
}
Ok(())
}
#[instrument(skip(packet, port_handler, handler_metadata, caller, client), fields(client_addr = print_addr(client), caller_addr = print_addr(caller)))]
async fn connect(
mut packet: Packet,
port_handler: &Mutex<PortHandler>,
port: u16,
handler_metadata: &mut HandlerMetadata,
client: &mut TcpStream,
caller: &mut TcpStream,
) -> eyre::Result<()> {
info!(
client_addr = print_addr(client),
caller_addr = print_addr(caller),
"connecting clients"
);
PacketKind::RemAck => {
packet.header = Header {
kind: PacketKind::Reject.raw(),
length: 4,
};
packet.data.clear();
packet.data.extend_from_slice(b"occ");
packet.data.push(0);
packet.header = Header {
kind: PacketKind::Reject.raw(),
length: 4,
};
packet.data.clear();
packet.data.extend_from_slice(b"occ");
packet.data.push(0);
{
let mut port_handler = port_handler.lock().await;
{
let mut port_handler = port_handler.lock().await;
port_handler.register_update();
port_handler
.port_state
.entry(port)
.or_default()
.new_state(PortStatus::InCall);
port_handler.register_update();
port_handler
.port_state
.entry(port)
.or_default()
.new_state(PortStatus::InCall);
port_handler.start_rejector(
port_handler.start_rejector(
port,
handler_metadata
.listener
.take()
.expect("tried to start rejector twice"),
packet,
);
}
client.set_nodelay(true)?;
caller.set_nodelay(true)?;
_ = timeout(CALL_TIMEOUT, tokio::io::copy_bidirectional(client, caller)).await;
{
let mut port_handler = port_handler.lock().await;
port_handler.register_update();
port_handler
.port_state
.entry(port)
.or_default()
.new_state(PortStatus::Disconnected);
port_handler
.change_rejector(port, |packet| {
packet.data.clear();
packet.data.extend_from_slice(b"nc");
packet.data.push(0);
packet.header = Header {
kind: PacketKind::Reject.raw(),
length: packet.data.len().try_into().unwrap(),
};
})
.await?;
}
Ok(())
}
/// # Errors
/// - the connection to the client or the caller is interupted
/// - the clients sends unexpected or malformed packets
/// - accepting a tcp connection fails
/// - settings tcp socket properties fails
/// - the client authentication fails
#[instrument(skip_all)]
pub async fn handler(
client: &mut TcpStream,
addr: SocketAddr,
config: &Config,
handler_metadata: &mut HandlerMetadata,
port_handler: &Mutex<PortHandler>,
) -> eyre::Result<()> {
let (mut reader, mut writer) = client.split();
let mut packet = Packet::default();
let Ok(res) = timeout(AUTH_TIMEOUT, packet.recv_into_cancelation_safe(&mut reader)).await else {
writer.write_all(REJECT_TIMEOUT).await?;
return Ok(());
};
res?;
let RemConnect { number, pin } = packet.as_rem_connect()?;
handler_metadata.number = Some(number);
let Some(port) = authenticate(config, port_handler, handler_metadata, number, pin).await? else {
writer.write_all(REJECT_OOP).await?;
return Ok(());
};
info!(%addr, number, port, "authenticated");
let Some(listener) = handler_metadata.listener.as_mut() else {
unreachable!("client sucessfully authenticated but did not set handler_metadata.listener");
};
packet.header = Header {
kind: PacketKind::RemConfirm.raw(),
length: 0,
};
packet.data.clear();
packet.send(&mut writer).await?;
let Some(idle_result) = idle(
listener,
packet,
&mut reader,
&mut writer,
).await? else {
return Ok(());
};
let Some((mut caller, mut packet)) = notify_or_disconnect(idle_result, handler_metadata, port_handler, port, &mut writer).await? else {
return Ok(());
};
let notify_at = Instant::now();
loop {
let recv = timeout(
CALL_ACK_TIMEOUT.saturating_sub(notify_at.elapsed()),
packet.recv_into_cancelation_safe(&mut reader),
);
let Ok(res) = recv.await else {
writer.write_all(REJECT_TIMEOUT).await?;
return Ok(());
};
res?;
match packet.kind() {
PacketKind::Ping => {}
PacketKind::End | PacketKind::Reject => {
port_handler.lock().await.start_rejector(
port,
handler_metadata
.listener
.take()
.expect("tried to start rejector twice"),
packet,
)?;
);
return Ok(());
}
stream.set_nodelay(true)?;
client.set_nodelay(true)?;
PacketKind::RemAck => {
connect(
packet,
port_handler,
port,
handler_metadata,
client,
&mut caller,
)
.await?;
let _ = timeout(
CALL_TIMEOUT,
tokio::io::copy_bidirectional(stream, &mut client),
)
.await;
{
let mut port_handler = port_handler.lock().await;
port_handler.register_update();
port_handler
.port_state
.entry(port)
.or_default()
.new_state(PortStatus::Disconnected);
port_handler
.change_rejector(port, |packet| {
packet.data.clear();
packet.data.extend_from_slice(b"nc");
packet.data.push(0);
packet.header = Header {
kind: PacketKind::Reject.raw(),
length: packet.data.len() as u8,
};
})
.await?;
return Ok(());
}
Ok(())
kind => return Err(eyre!("unexpected packet: {:?}", kind)),
}
kind => bail!("unexpected packet: {:?}", kind),
}
}

View File

@@ -7,6 +7,11 @@ use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::error;
use zerocopy::{AsBytes, FromBytes, LittleEndian, Unaligned};
use tracing::{debug, instrument};
use crate::packets::{Header, Packet};
use crate::ports::PortHandler;
use crate::spawn;
@@ -41,3 +46,74 @@ pub async fn debug_server(addr: SocketAddr, port_handler: Arc<Mutex<PortHandler>
error!(%error, "debug server error");
}
}
type U16 = zerocopy::U16<LittleEndian>;
type U32 = zerocopy::U32<LittleEndian>;
#[derive(AsBytes)]
#[repr(transparent)]
#[allow(dead_code)]
struct PeerQuery {
number: U32,
}
#[derive(FromBytes, Unaligned, Debug)]
#[repr(packed)]
#[allow(dead_code)]
struct PeerReply {
number: U32,
name: [u8; 40],
flags: U16,
kind: u8,
hostname: [u8; 40],
ipaddress: [u8; 4],
port: U16,
extension: u8,
pin: U16,
timestamp: U32,
}
#[instrument]
pub async fn peer_query(server: &SocketAddr, number: u32) -> eyre::Result<Option<String>> {
debug!(%number, "looking up");
let mut packet = Packet {
header: Header {
kind: 3, // Peer Query
length: 4,
},
data: Vec::new(),
};
packet.data.clear();
packet.data.resize(packet.header.length as usize, 0);
PeerQuery {
number: number.into(),
}
.write_to(packet.data.as_mut_slice())
.unwrap();
let mut socket = tokio::net::TcpStream::connect(server).await?;
let (mut reader, mut writer) = socket.split();
packet.send(&mut writer).await?;
packet.recv_into(&mut reader).await?;
Ok(if packet.kind().raw() == 5 {
// PeerReply
PeerReply::read_from(packet.data.as_slice()).and_then(|x| {
let i = x
.name
.iter()
.enumerate()
.find_map(|(i, c)| (*c == 0).then_some(i))
.unwrap_or(x.name.len());
Some(std::str::from_utf8(&x.name[..i]).ok()?.to_owned())
})
} else {
None
})
}

View File

@@ -1,3 +1,6 @@
#![warn(clippy::pedantic)]
#![allow(clippy::let_underscore_untyped)] // false positive in stable
use std::{
fmt::Debug,
fs::File,
@@ -15,17 +18,15 @@ use serde::{Deserialize, Deserializer};
use time::format_description::OwnedFormatItem;
use tokio::{
io::AsyncWriteExt,
net::TcpListener,
net::{TcpListener, TcpStream},
sync::Mutex,
time::{sleep, Instant},
};
use tracing::{error, info, warn, Level};
use tracing::{error, info, instrument, warn, Level};
use tracing_subscriber::fmt::time::FormatTime;
use crate::{
client::connection_handler,
ports::{AllowedPorts, PortHandler, PortStatus},
};
use crate::{constants::CACHE_STORE_INTERVAL, packets::PacketKind};
use crate::packets::PacketKind;
use crate::ports::{cache_daemon, AllowedList, PortHandler, PortStatus};
pub mod auth;
pub mod client;
@@ -41,11 +42,14 @@ type UnixTimestamp = u64;
#[derive(Debug, Deserialize)]
pub struct Config {
allowed_ports: AllowedPorts,
allowed_ports: AllowedList,
#[serde(deserialize_with = "parse_socket_addr")]
listen_addr: SocketAddr,
#[serde(deserialize_with = "parse_socket_addr")]
dyn_ip_server: SocketAddr,
#[cfg(feature = "debug_server")]
#[serde(deserialize_with = "maybe_parse_socket_addr")]
#[serde(default)]
@@ -109,25 +113,28 @@ impl Config {
}
}
#[cfg(not(feature = "tokio_console"))]
#[track_caller]
fn spawn<T: Send + 'static>(
_name: &str,
future: impl Future<Output = T> + Send + 'static,
) -> tokio::task::JoinHandle<T> {
tokio::spawn(future)
}
#[cfg(feature = "tokio_console")]
#[track_caller]
fn spawn<T: Send + 'static>(
name: &str,
future: impl Future<Output = T> + Send + 'static,
) -> tokio::task::JoinHandle<T> {
tokio::task::Builder::new()
use tracing::Instrument;
let future = future.instrument(tracing::span!(
Level::TRACE,
"spawn",
name = name,
caller = %std::panic::Location::caller().to_string()
));
#[cfg(feature = "tokio_console")]
return tokio::task::Builder::new()
.name(name)
.spawn(future)
.unwrap_or_else(|err| panic!("failed to spawn {name:?}: {err:?}"))
.unwrap_or_else(|err| panic!("failed to spawn {name:?}: {err:?}"));
#[cfg(not(feature = "tokio_console"))]
return tokio::spawn(future);
}
static TIME_ZONE_OFFSET: once_cell::sync::OnceCell<time::UtcOffset> =
@@ -135,13 +142,165 @@ static TIME_ZONE_OFFSET: once_cell::sync::OnceCell<time::UtcOffset> =
static TIME_FORMAT: once_cell::sync::OnceCell<OwnedFormatItem> = once_cell::sync::OnceCell::new();
fn main() -> anyhow::Result<()> {
let config = Arc::new(Config::load("config.json")?);
fn setup_tracing(config: &Config) {
use tracing::Subscriber;
use tracing_error::ErrorLayer;
use tracing_subscriber::prelude::*;
use tracing_subscriber::{
filter,
fmt::{self, FormatEvent, FormatFields},
registry::LookupSpan,
};
if config.allowed_ports.is_empty() {
panic!("no allowed ports");
struct EventFormater;
impl<S, N> FormatEvent<S, N> for EventFormater
where
S: Subscriber + for<'a> LookupSpan<'a>,
N: for<'a> FormatFields<'a> + 'static,
{
fn format_event(
&self,
ctx: &fmt::FmtContext<'_, S, N>,
mut writer: fmt::format::Writer<'_>,
event: &tracing::Event<'_>,
) -> std::fmt::Result {
use color_eyre::owo_colors::OwoColorize;
let meta = event.metadata();
fmt::time::OffsetTime::new(
*TIME_ZONE_OFFSET.get().unwrap(),
TIME_FORMAT.get().unwrap(),
)
.format_time(&mut writer)?;
// TODO: check writer.has_ansi_escapes()
let level = *meta.level();
match level {
Level::TRACE => write!(writer, " {:>5} ", level.purple())?,
Level::DEBUG => write!(writer, " {:>5} ", level.cyan())?,
Level::INFO => write!(writer, " {:>5} ", level.green())?,
Level::WARN => write!(writer, " {:>5} ", level.yellow())?,
Level::ERROR => write!(writer, " {:>5} ", level.red())?,
}
write!(writer, "{:23}{}", meta.target().dimmed(), ":".bold())?;
/*
if let Some(filename) = meta.file() {
write!(writer, " {}{}", filename.bold(), ":".dimmed())?;
}
if let Some(line_number) = meta.line() {
write!(writer, "{}{}", line_number.bold(), ":".dimmed())?;
}
*/
writer.write_char(' ')?;
ctx.format_fields(writer.by_ref(), event)?;
writeln!(writer)
}
}
// build a `Subscriber` by combining layers with a
// `tracing_subscriber::Registry`:
let registry = tracing_subscriber::registry();
#[cfg(feature = "tokio_console")]
let registry = registry.with(console_subscriber::spawn());
registry
.with(ErrorLayer::default())
.with(
fmt::layer()
.with_target(true)
.event_format(EventFormater)
.with_filter(filter::LevelFilter::from_level(config.log_level))
.with_filter(tracing_subscriber::filter::filter_fn(|meta| {
meta.target().starts_with(env!("CARGO_CRATE_NAME"))
})),
)
.init();
}
#[instrument(skip(stream, config, port_handler))]
async fn connection_handler(
mut stream: TcpStream,
addr: SocketAddr,
config: Arc<Config>,
port_handler: Arc<Mutex<PortHandler>>,
) {
use futures::future::FutureExt;
let mut handler_metadata = HandlerMetadata::default();
let res = std::panic::AssertUnwindSafe(client::handler(
&mut stream,
addr,
&config,
&mut handler_metadata,
&port_handler,
))
.catch_unwind()
.await;
let error = match res {
Err(_) => Some("internal server error".to_owned()),
Ok(Err(err)) => Some(err.to_string()),
Ok(Ok(())) => None,
};
if let Some(error) = error {
error!(%addr, %error, "Client had an error");
let mut packet = Packet::default();
packet.data.extend_from_slice(error.as_bytes());
packet.data.truncate((u8::MAX - 1) as usize);
packet.data.push(0);
packet.header = Header {
kind: PacketKind::Error.raw(),
length: packet.data.len().try_into().unwrap(), // this will never fail, as we just truncated the vector
};
let (_, mut writer) = stream.split();
_ = packet.send(&mut writer).await;
}
if let Some(port) = handler_metadata.port {
let mut port_handler = port_handler.lock().await;
if let Some(port_state) = port_handler.port_state.get_mut(&port) {
port_state.new_state(PortStatus::Disconnected);
port_handler.register_update();
}
if let Some(listener) = handler_metadata.listener.take() {
port_handler.start_rejector(
port,
listener,
Packet {
header: Header {
kind: PacketKind::Reject.raw(),
length: 3,
},
data: b"nc\0".to_vec(),
},
);
}
}
sleep(Duration::from_secs(3)).await;
_ = stream.shutdown().await;
}
fn main() -> eyre::Result<()> {
color_eyre::install()?;
let config = Arc::new(Config::load("config.json")?);
TIME_FORMAT.set(config.time_format.clone()).unwrap();
// we need to get this while still single threaded
@@ -151,176 +310,56 @@ fn main() -> anyhow::Result<()> {
.set(time::UtcOffset::current_local_offset()?)
.unwrap();
assert!(!config.allowed_ports.is_empty(), "no allowed ports");
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?
.block_on(async move {
{
use tracing_subscriber::prelude::*;
use tracing_subscriber::*;
.block_on(tokio_main(config))
}
// build a `Subscriber` by combining layers with a
// `tracing_subscriber::Registry`:
let registry = tracing_subscriber::registry();
async fn tokio_main(config: Arc<Config>) -> eyre::Result<()> {
setup_tracing(&config);
#[cfg(feature = "tokio_console")]
let registry = registry.with(console_subscriber::spawn());
let cache_path = PathBuf::from("cache.json");
registry
.with(
fmt::layer()
.with_target(true)
.with_timer(fmt::time::OffsetTime::new(
*TIME_ZONE_OFFSET.get().unwrap(),
TIME_FORMAT.get().unwrap(),
))
.with_filter(filter::LevelFilter::from_level(config.log_level))
.with_filter(tracing_subscriber::filter::filter_fn(|meta| {
meta.target().starts_with("centralex")
})),
)
.init();
}
let (change_sender, change_receiver) = tokio::sync::watch::channel(Instant::now());
let cache_path = PathBuf::from("cache.json");
let mut port_handler = PortHandler::load_or_default(&cache_path, change_sender);
port_handler.update_allowed_ports(&config.allowed_ports);
let (change_sender, mut change_receiver) = tokio::sync::watch::channel(Instant::now());
let port_handler = Arc::new(Mutex::new(port_handler));
let mut port_handler = PortHandler::load_or_default(&cache_path, change_sender);
port_handler.update_allowed_ports(&config.allowed_ports);
spawn(
"cache daemon",
cache_daemon(port_handler.clone(), cache_path, change_receiver),
);
let port_handler = Arc::new(Mutex::new(port_handler));
#[cfg(feature = "debug_server")]
if let Some(listen_addr) = config.debug_server_addr {
warn!(%listen_addr, "debug server listening");
spawn(
"debug server",
debug_server(listen_addr, port_handler.clone()),
);
}
{
let port_handler = port_handler.clone();
spawn("cache daemon", async move {
let mut last_store = Instant::now() - 2 * CACHE_STORE_INTERVAL;
let mut change_timeout = None;
loop {
if let Some(change_timeout) = change_timeout.take() {
tokio::time::timeout(change_timeout, change_receiver.changed())
.await
.unwrap_or(Ok(()))
} else {
change_receiver.changed().await
}
.expect("failed to wait for cache changes");
let listener = TcpListener::bind(config.listen_addr).await?;
warn!(
listen_addr = %config.listen_addr,
"centralex server listening"
);
let time_since_last_store = last_store.elapsed();
while let Ok((stream, addr)) = listener.accept().await {
info!(%addr, "new connection");
if time_since_last_store >= CACHE_STORE_INTERVAL {
let port_handler = port_handler.lock().await;
spawn(
&format!("connection to {addr}"),
connection_handler(stream, addr, config.clone(), port_handler.clone()),
);
}
last_store = Instant::now();
if let Err(err) = port_handler.store(&cache_path) {
error!("failed to store cache: {err:?}");
}
} else {
change_timeout = Some(CACHE_STORE_INTERVAL - time_since_last_store);
}
}
});
}
#[cfg(feature = "debug_server")]
if let Some(listen_addr) = config.debug_server_addr {
warn!(%listen_addr, "debug server listening");
spawn(
"debug server",
debug_server(listen_addr, port_handler.clone()),
);
}
let listener = TcpListener::bind(config.listen_addr).await?;
warn!(
listen_addr = %config.listen_addr,
"centralex server listening"
);
while let Ok((mut stream, addr)) = listener.accept().await {
info!(%addr, "new connection");
let port_handler = port_handler.clone();
let config = config.clone();
let mut handler_metadata = HandlerMetadata::default();
spawn(&format!("connection to {addr}"), async move {
use futures::future::FutureExt;
let res = std::panic::AssertUnwindSafe(connection_handler(
&config,
&mut handler_metadata,
&port_handler,
&mut stream,
))
.catch_unwind()
.await;
let error = match res {
Err(err) => {
let err = err
.downcast::<String>()
.map(|err| *err)
.unwrap_or_else(|_| "?".to_owned());
Some(format!("panic at: {err}"))
}
Ok(Err(err)) => Some(err.to_string()),
Ok(Ok(())) => None,
};
if let Some(error) = error {
error!(%addr, %error, "Client had an error");
let mut packet = Packet::default();
packet.data.extend_from_slice(error.as_bytes());
packet.data.truncate((u8::MAX - 1) as usize);
packet.data.push(0);
packet.header = Header {
kind: PacketKind::Error.raw(),
length: packet.data.len() as u8,
};
let (_, mut writer) = stream.split();
let _ = packet.send(&mut writer).await;
}
if let Some(port) = handler_metadata.port {
let mut port_handler = port_handler.lock().await;
if let Some(port_state) = port_handler.port_state.get_mut(&port) {
port_state.new_state(PortStatus::Disconnected);
port_handler.register_update();
}
if let Some(listener) = handler_metadata.listener.take() {
let res = port_handler.start_rejector(
port,
listener,
Packet {
header: Header {
kind: PacketKind::Reject.raw(),
length: 3,
},
data: b"nc\0".to_vec(),
},
);
if let Err(error) = res {
error!(%port, %error, "failed to start rejector");
}
}
}
sleep(Duration::from_secs(3)).await;
let _ = stream.shutdown().await;
});
}
Ok(())
})
Ok(())
}
#[derive(Debug, Default)]

View File

@@ -1,7 +1,7 @@
use std::fmt::Debug;
use anyhow::bail;
use bytemuck::{Pod, Zeroable};
use eyre::eyre;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::tcp::{ReadHalf, WriteHalf},
@@ -26,7 +26,9 @@ pub enum PacketKind {
Error = 0xff,
}
#[allow(clippy::enum_glob_use)]
impl PacketKind {
#[must_use]
fn from_u8(raw: u8) -> Self {
use PacketKind::*;
@@ -45,6 +47,7 @@ impl PacketKind {
}
}
#[must_use]
pub fn raw(&self) -> u8 {
use PacketKind::*;
@@ -105,12 +108,14 @@ pub struct RemConnect {
}
impl Packet {
#[allow(clippy::missing_errors_doc)]
pub async fn peek_packet_kind(stream: &mut ReadHalf<'_>) -> std::io::Result<PacketKind> {
Self::peek_packet_kind_raw(stream)
.await
.map(PacketKind::from_u8)
}
#[allow(clippy::missing_errors_doc)]
pub async fn peek_packet_kind_raw(stream: &mut ReadHalf<'_>) -> std::io::Result<u8> {
let mut kind = 0;
let n = stream.peek(std::slice::from_mut(&mut kind)).await?;
@@ -122,6 +127,7 @@ impl Packet {
}
}
#[allow(clippy::missing_errors_doc)]
pub async fn recv_into_cancelation_safe(
&mut self,
stream: &mut ReadHalf<'_>,
@@ -136,6 +142,7 @@ impl Packet {
self.recv_into(stream).await
}
#[allow(clippy::missing_errors_doc)]
pub async fn recv_into(&mut self, stream: &mut ReadHalf<'_>) -> std::io::Result<()> {
let header_bytes = bytemuck::bytes_of_mut(&mut self.header);
@@ -148,26 +155,33 @@ impl Packet {
Ok(())
}
#[allow(clippy::missing_errors_doc)]
pub async fn send(&self, stream: &mut WriteHalf<'_>) -> std::io::Result<()> {
stream.write_all(bytemuck::bytes_of(&self.header)).await?;
stream.write_all(&self.data).await?;
Ok(())
}
#[must_use]
pub fn kind(&self) -> PacketKind {
PacketKind::from_u8(self.header.kind)
}
pub fn as_rem_connect(&self) -> anyhow::Result<RemConnect> {
/// # Errors
/// the packet must be a `RemConnect` packet and must contain at least 6 bytes of data
pub fn as_rem_connect(&self) -> eyre::Result<RemConnect> {
if self.kind() != PacketKind::RemConnect {
bail!("Unexpected Packet: {:?} expected RemConnect", self.kind());
return Err(eyre!(
"Unexpected Packet: {:?} expected RemConnect",
self.kind()
));
}
if self.data.len() < 6 {
bail!(
return Err(eyre!(
"Too little data for RemConnect. Need at least 6 Bytes got {}",
self.data.len()
);
));
}
Ok(RemConnect {

View File

@@ -5,18 +5,23 @@ use std::{
fs::File,
io::{BufReader, BufWriter},
ops::RangeInclusive,
path::Path,
path::{Path, PathBuf},
sync::Arc,
time::{Duration, SystemTime, UNIX_EPOCH},
};
use anyhow::anyhow;
use eyre::eyre;
use serde::{Deserialize, Serialize};
use tokio::{net::TcpListener, sync::Mutex, task::JoinHandle, time::Instant};
use tracing::{debug, error, info, warn};
use tokio::{
net::TcpListener,
sync::{watch::Receiver, Mutex},
task::JoinHandle,
time::Instant,
};
use tracing::{debug, error, info, instrument, warn};
use crate::{
constants::{PORT_OWNERSHIP_TIMEOUT, PORT_RETRY_TIME},
constants::{CACHE_STORE_INTERVAL, PORT_OWNERSHIP_TIMEOUT, PORT_RETRY_TIME},
packets::Packet,
spawn, Config, Number, Port, UnixTimestamp, TIME_FORMAT, TIME_ZONE_OFFSET,
};
@@ -32,7 +37,7 @@ pub struct PortHandler {
#[serde(skip)]
port_guards: HashMap<Port, Rejector>,
allowed_ports: AllowedPorts,
allowed_ports: AllowedList,
#[serde(skip)]
free_ports: HashSet<Port>,
@@ -40,6 +45,43 @@ pub struct PortHandler {
allocated_ports: HashMap<Number, Port>,
pub port_state: HashMap<Port, PortState>,
#[cfg(feature = "debug_server")]
#[serde(default)]
pub names: HashMap<Number, String>,
}
#[instrument(skip(port_handler, change_receiver))]
pub async fn cache_daemon(
port_handler: Arc<Mutex<PortHandler>>,
cache_path: PathBuf,
mut change_receiver: Receiver<Instant>,
) {
let mut last_store = Instant::now() - 2 * CACHE_STORE_INTERVAL;
let mut change_timeout = None;
loop {
if let Some(change_timeout) = change_timeout.take() {
tokio::time::timeout(change_timeout, change_receiver.changed())
.await
.unwrap_or(Ok(()))
} else {
change_receiver.changed().await
}
.expect("failed to wait for cache changes");
let time_since_last_store = last_store.elapsed();
if time_since_last_store >= CACHE_STORE_INTERVAL {
let port_handler = port_handler.lock().await;
last_store = Instant::now();
if let Err(err) = port_handler.store(&cache_path) {
error!("failed to store cache: {err:?}");
}
} else {
change_timeout = Some(CACHE_STORE_INTERVAL - time_since_last_store);
}
}
}
#[derive(Hash, PartialEq, Eq)]
@@ -50,28 +92,32 @@ impl<T: Display> Debug for DisplayAsDebug<T> {
}
}
fn duration_in_hours(duration: Duration) -> String {
fn duration_string(duration: Duration) -> String {
let seconds_elapsed = duration.as_secs();
let hours = seconds_elapsed / (60 * 60);
let days = seconds_elapsed / (60 * 60 * 24);
let hours = seconds_elapsed / (60 * 60) % 24;
let minutes = (seconds_elapsed / 60) % 60;
let seconds = seconds_elapsed % 60;
match (hours > 0, minutes > 0) {
(true, _) => format!("{hours}h {minutes}min {seconds}s"),
(false, true) => format!("{minutes}min {seconds}s"),
_ => format!("{:.0?}", duration),
match (days > 0, hours > 0, minutes > 0) {
(true, _, _) => format!("{days}d {hours}h {minutes}min {seconds}s"),
(false, true, _) => format!("{hours}h {minutes}min {seconds}s"),
(false, false, true) => format!("{minutes}min {seconds}s"),
_ => format!("{duration:.0?}"),
}
}
fn format_instant(instant: Instant) -> String {
let when = duration_in_hours(instant.elapsed()) + " ago";
let when = duration_string(instant.elapsed()) + " ago";
(|| -> anyhow::Result<_> {
(|| -> eyre::Result<_> {
let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)? - instant.elapsed();
let date = time::OffsetDateTime::from_unix_timestamp(timestamp.as_secs() as i64)?
.to_offset(*TIME_ZONE_OFFSET.get().unwrap())
.format(TIME_FORMAT.get().unwrap())?;
let date = time::OffsetDateTime::from_unix_timestamp(
timestamp.as_secs().try_into().expect("timestamp overflow"),
)?
.to_offset(*TIME_ZONE_OFFSET.get().unwrap())
.format(TIME_FORMAT.get().unwrap())?;
Ok(format!("{date} ({when})"))
})()
@@ -82,6 +128,7 @@ fn instant_from_timestamp(timestamp: UnixTimestamp) -> Instant {
Instant::now() - UNIX_EPOCH.elapsed().unwrap() + Duration::from_secs(timestamp)
}
#[cfg(feature = "debug_server")]
impl Debug for PortHandler {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
const SHOW_N_FREE_PORTS: usize = 10;
@@ -93,7 +140,7 @@ impl Debug for PortHandler {
let mut free_ports = self.free_ports.iter().copied().collect::<Vec<u16>>();
free_ports.sort();
free_ports.sort_unstable();
let mut free_ports = free_ports
.into_iter()
@@ -123,21 +170,23 @@ impl Debug for PortHandler {
.allocated_ports
.iter()
.map(|(&number, &port)| {
let state = &self.port_state[&port];
#[derive(Debug)]
#[allow(dead_code)]
struct State {
struct State<'n> {
state: PortStatus,
name: &'n str,
number: u32,
port: u16,
last_change: DisplayAsDebug<String>,
}
let state = &self.port_state[&port];
State {
state: state.status,
number,
port,
name: self.names.get(&number).map_or("?", |x| x.as_str()),
last_change: DisplayAsDebug(format_instant(instant_from_timestamp(
state.last_change,
))),
@@ -145,7 +194,7 @@ impl Debug for PortHandler {
})
.collect::<Vec<_>>();
allocated_ports.sort_by(|a, b| {
allocated_ports.sort_unstable_by(|a, b| {
a.state.cmp(&b.state).then(
self.port_state[&a.port]
.last_change
@@ -157,10 +206,10 @@ impl Debug for PortHandler {
writeln!(f, "last update: {last_update}")?;
writeln!(f, "rejectors: {:#?}", self.port_guards)?;
writeln!(f, "allowed ports: {:?}", self.allowed_ports.0)?;
writeln!(f, "free ports: {:?}", free_ports)?;
writeln!(f, "free ports: {free_ports:?}")?;
writeln!(f, "errored ports: {:#?}", errored_ports)?;
writeln!(f, "allocated ports: {:#?}", allocated_ports)?;
writeln!(f, "errored ports: {errored_ports:#?}")?;
writeln!(f, "allocated ports: {allocated_ports:#?}")?;
Ok(())
}
@@ -189,7 +238,7 @@ impl PortState {
pub fn new_state(&mut self, status: PortStatus) {
self.last_change = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.expect("timestamp overflow")
.as_secs();
self.status = status;
@@ -210,18 +259,21 @@ impl Default for PortStatus {
}
#[derive(Default, Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct AllowedPorts(Vec<RangeInclusive<u16>>);
pub struct AllowedList(Vec<RangeInclusive<u16>>);
impl AllowedPorts {
impl AllowedList {
#[must_use]
pub fn is_allowed(&self, port: Port) -> bool {
self.0.iter().any(|range| range.contains(&port))
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
}
impl PortHandler {
#[must_use]
pub fn status_string(&self) -> String {
format!("{self:#?}\n")
}
@@ -236,7 +288,9 @@ impl PortHandler {
.expect("failed to notify cache writer");
}
pub fn store(&self, cache: &Path) -> anyhow::Result<()> {
#[allow(clippy::missing_errors_doc)]
#[instrument(skip(self))]
pub fn store(&self, cache: &Path) -> std::io::Result<()> {
debug!("storing cache");
let temp_file = cache.with_extension(".temp");
@@ -246,6 +300,8 @@ impl PortHandler {
Ok(())
}
#[allow(clippy::missing_errors_doc)]
#[instrument(skip(change_sender))]
pub fn load(
cache: &Path,
change_sender: tokio::sync::watch::Sender<Instant>,
@@ -256,6 +312,8 @@ impl PortHandler {
Ok(cache)
}
#[must_use]
#[instrument(skip(change_sender))]
pub fn load_or_default(
path: &Path,
change_sender: tokio::sync::watch::Sender<Instant>,
@@ -266,7 +324,7 @@ impl PortHandler {
})
}
pub fn update_allowed_ports(&mut self, allowed_ports: &AllowedPorts) {
pub fn update_allowed_ports(&mut self, allowed_ports: &AllowedList) {
self.register_update();
self.allowed_ports = allowed_ports.clone();
@@ -301,43 +359,41 @@ impl PortHandler {
});
}
pub fn start_rejector(
&mut self,
port: Port,
listener: TcpListener,
packet: Packet,
) -> anyhow::Result<()> {
#[instrument(skip(self, listener))]
pub fn start_rejector(&mut self, port: Port, listener: TcpListener, packet: Packet) {
info!(port, ?packet, "starting rejector");
let port_guard = Rejector::start(listener, packet);
assert!(
self.port_guards.insert(port, port_guard).is_none(),
"Tried to start rejector that is already running.
This should have been impossible since it requires two listeners on the same port."
);
Ok(())
if self.port_guards.insert(port, port_guard).is_some() {
unreachable!("Tried to start rejector that is already running. This should have been impossible since it requires two listeners on the same port.");
}
}
#[instrument(skip(self))]
pub async fn stop_rejector(&mut self, port: Port) -> Option<(TcpListener, Packet)> {
info!(port, "stopping rejector");
Some(self.port_guards.remove(&port)?.stop().await)
}
/// # Errors
/// - the rejector must be running
pub async fn change_rejector(
&mut self,
port: Port,
f: impl FnOnce(&mut Packet),
) -> anyhow::Result<()> {
) -> eyre::Result<()> {
let (listener, mut packet) = self
.stop_rejector(port)
.await
.ok_or_else(|| anyhow!("tried to stop rejector that is not running"))?;
.ok_or_else(|| eyre!("tried to stop rejector that is not running"))?;
f(&mut packet);
self.start_rejector(port, listener, packet)
self.start_rejector(port, listener, packet);
Ok(())
}
}
@@ -355,6 +411,7 @@ impl Debug for Rejector {
}
impl Rejector {
#[instrument(skip(listener))]
fn start(listener: TcpListener, packet: Packet) -> Self {
let port = listener.local_addr().map(|addr| addr.port()).unwrap_or(0);
let state = Arc::new((Mutex::new(listener), packet));
@@ -370,7 +427,7 @@ impl Rejector {
loop {
if let Ok((mut socket, _)) = listener.accept().await {
let (_, mut writer) = socket.split();
let _ = packet.send(&mut writer).await;
_ = packet.send(&mut writer).await;
}
}
})
@@ -378,22 +435,23 @@ impl Rejector {
Self { state, handle }
}
#[instrument(skip(self))]
async fn stop(self) -> (TcpListener, Packet) {
self.handle.abort();
let _ = self.handle.await;
_ = self.handle.await;
let (listener, packet) = Arc::try_unwrap(self.state).unwrap();
(listener.into_inner(), packet)
}
}
impl PortHandler {
#[instrument(skip(self, config))]
pub fn allocate_port_for_number(&mut self, config: &Config, number: Number) -> Option<Port> {
let port = if let Some(port) = self.allocated_ports.get(&number) {
let already_connected = self
.port_state
.get(port)
.map(|state| state.status != PortStatus::Disconnected)
.unwrap_or(false);
.map_or(false, |state| state.status != PortStatus::Disconnected);
if already_connected {
None
@@ -409,17 +467,20 @@ impl PortHandler {
self.try_recover_port(config)?
};
assert!(self.allocated_ports.insert(number, port).is_none());
if self.allocated_ports.insert(number, port).is_some() {
unreachable!("allocated port twice");
}
Some(port)
};
if let Some(port) = port {
info!(port, "allocated port");
info!(port, "allocated");
}
port
}
#[instrument(skip(self, config))]
fn try_recover_port(&mut self, config: &Config) -> Option<Port> {
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
@@ -463,26 +524,27 @@ impl PortHandler {
}
let removable_entry = self.allocated_ports.iter().find(|(_, port)| {
self.port_state
.get(port)
.map(|port_state| {
port_state.status == PortStatus::Disconnected
&& now.saturating_sub(Duration::from_secs(port_state.last_change))
>= PORT_OWNERSHIP_TIMEOUT
})
.unwrap_or(true)
self.port_state.get(port).map_or(true, |port_state| {
port_state.status == PortStatus::Disconnected
&& now.saturating_sub(Duration::from_secs(port_state.last_change))
>= PORT_OWNERSHIP_TIMEOUT
})
});
if let Some((&old_number, &port)) = removable_entry {
self.register_update();
info!(port, old_number, "reused port");
assert!(self.allocated_ports.remove(&old_number).is_some());
#[cfg(feature = "debug_server")]
self.names.remove(&old_number);
return Some(port);
}
None // TODO
}
#[instrument(skip(self))]
pub fn mark_port_error(&mut self, number: Number, port: Port) {
warn!(port, number, "registering an error on");
self.register_update();
@@ -490,7 +552,7 @@ impl PortHandler {
self.errored_ports.insert((
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.expect("timestamp overflow")
.as_secs(),
port,
));