Compare commits

...

22 Commits

Author SHA1 Message Date
5c2049af94 fix actions?
Some checks failed
Code Checks / cargo fmt (push) Failing after 3s
Code Checks / cargo clippy (push) Failing after 2s
Test Suite / cargo test (push) Failing after 2s
Code Checks / cargo fmt (pull_request) Failing after 2s
Code Checks / cargo clippy (pull_request) Failing after 2s
Test Suite / cargo test (pull_request) Failing after 2s
2023-05-07 01:15:55 +02:00
3cae37852a fully specify actions-rust-lang url
Some checks failed
Code Checks / cargo fmt (push) Failing after 27s
Code Checks / cargo clippy (push) Failing after 4s
Test Suite / cargo test (push) Failing after 24s
Test Suite / cargo test (pull_request) Failing after 24s
Code Checks / cargo clippy (pull_request) Failing after 4s
Code Checks / cargo fmt (pull_request) Failing after 25s
2023-05-07 01:07:57 +02:00
eb4c6854f0 add actions
Some checks failed
Code Checks / cargo fmt (push) Failing after 3s
Code Checks / cargo clippy (push) Failing after 3s
Test Suite / cargo test (push) Failing after 2s
Code Checks / cargo fmt (pull_request) Failing after 3s
Code Checks / cargo clippy (pull_request) Failing after 2s
Test Suite / cargo test (pull_request) Failing after 3s
2023-05-07 01:05:34 +02:00
0eed949a27 Delete '.gitea/workflows/build.yaml' 2023-05-07 00:53:56 +02:00
d58c063414 revert 854f694d22
All checks were successful
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 6s
revert Delete '.gitea/workflows/build.yaml'
2023-05-07 00:53:49 +02:00
207a304231 revert 854f694d22
All checks were successful
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 8s
revert Delete '.gitea/workflows/build.yaml'
2023-05-07 00:53:01 +02:00
854f694d22 Delete '.gitea/workflows/build.yaml' 2023-05-07 00:36:55 +02:00
47ac15422c add actions
All checks were successful
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 50s
2023-05-07 00:34:44 +02:00
4627800120 increase log target padding 2023-04-22 18:49:14 +02:00
fb1a2aa1c0 print duration in days if >24h 2023-04-22 18:45:25 +02:00
c48d369854 fix installer 2023-03-28 17:27:54 +02:00
06567d957c disable tokio-console by default 2023-03-28 15:02:24 +02:00
8f978c06f9 look up name in authentication routine 2023-03-28 14:32:48 +02:00
0aab8b16c7 register name changes as updates 2023-03-28 14:21:47 +02:00
1e7b10bc6d change debug display order 2023-03-28 14:16:24 +02:00
a2a2d89912 serialize client names 2023-03-28 14:12:43 +02:00
946bb37096 change debug display order 2023-03-28 14:07:25 +02:00
f22cafa96e show client names in debug server 2023-03-28 14:05:43 +02:00
82838e46dd log client name 2023-03-28 13:53:34 +02:00
c01e18f5f2 log client name 2023-03-28 13:52:58 +02:00
1340e87c15 switch to zerocopy crate. Initial server query debug implementation 2023-03-28 13:46:38 +02:00
eefb943292 update installer 2023-03-28 12:53:20 +02:00
11 changed files with 214 additions and 34 deletions

View File

@@ -1,3 +1,2 @@
[build] [build]
rustflags = [ "--cfg", "tokio_unstable" ] # rustflags = [ "--cfg", "tokio_unstable" ]

View File

@@ -0,0 +1,30 @@
name: "Code Checks"
on:
push:
pull_request:
jobs:
# Check formatting with rustfmt
formatting:
name: cargo fmt
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
# Ensure rustfmt is installed and setup problem matcher
- uses: http://github.com/actions-rust-lang/setup-rust-toolchain
with:
components: rustfmt
- name: Rustfmt Check
uses: http://github.com/actions-rust-lang/rustfmt@v1
# Check code with clippy
clippy:
name: cargo clippy
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
# Ensure clippy is installed and setup problem matcher
- uses: http://github.com/actions-rust-lang/setup-rust-toolchain
with:
components: clippy
- run: cargo clippy -- -D warnings

View File

@@ -0,0 +1,13 @@
name: "Test Suite"
on:
push:
pull_request:
jobs:
test:
name: cargo test
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: http://github.com/actions-rust-lang/setup-rust-toolchain
- run: cargo test --all-features

22
Cargo.lock generated
View File

@@ -190,6 +190,7 @@ dependencies = [
"tracing", "tracing",
"tracing-error", "tracing-error",
"tracing-subscriber", "tracing-subscriber",
"zerocopy",
] ]
[[package]] [[package]]
@@ -1338,3 +1339,24 @@ name = "windows_x86_64_msvc"
version = "0.42.2" version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0"
[[package]]
name = "zerocopy"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "332f188cc1bcf1fe1064b8c58d150f497e697f49774aa846f2dc949d9a25f236"
dependencies = [
"byteorder",
"zerocopy-derive",
]
[[package]]
name = "zerocopy-derive"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6505e6815af7de1746a08f69c69606bb45695a17149517680f3b2149713b19a3"
dependencies = [
"proc-macro2",
"quote",
"syn 1.0.109",
]

View File

@@ -27,8 +27,9 @@ once_cell = "1.17.1"
eyre = "0.6.8" eyre = "0.6.8"
color-eyre = "0.6.2" color-eyre = "0.6.2"
tracing-error = "0.2.0" tracing-error = "0.2.0"
zerocopy = "0.6.1"
[features] [features]
default = ["debug_server", "tokio_console"] default = ["debug_server"]
debug_server = ["dep:hyper"] debug_server = ["dep:hyper"]
tokio_console = ["dep:console-subscriber"] tokio_console = ["dep:console-subscriber"]

View File

@@ -24,8 +24,8 @@ then
exit 1 exit 1
fi fi
user_name="centralex" user_name="itelex"
install_dir="/var/lib/centralex" install_dir="/home/itelex/"
service_file="/etc/systemd/system/centralex.service" service_file="/etc/systemd/system/centralex.service"
if [ $# -lt 1 ]; then if [ $# -lt 1 ]; then
@@ -52,7 +52,7 @@ case "$step" in
rm rustup.sh rm rustup.sh
echo "cloning source code..." echo "cloning source code..."
git clone https://github.com/soruh/centralex centralex git clone https://gitea.h.glsys.de/soruh/centralex centralex
echo "creating default config..." echo "creating default config..."
cp centralex/config-template.json centralex/config.json cp centralex/config-template.json centralex/config.json
@@ -63,8 +63,8 @@ case "$step" in
exit 1 exit 1
fi fi
echo "creating user $user_name..." # echo "creating user $user_name..."
useradd -s /usr/sbin/nologin --create-home --home-dir "$install_dir" "$user_name" # useradd -s /usr/sbin/nologin --create-home --home-dir "$install_dir" "$user_name"
echo "creating service file..." echo "creating service file..."
cat > "$service_file" << EOF cat > "$service_file" << EOF
@@ -72,10 +72,10 @@ case "$step" in
Description=Centralex Description=Centralex
[Service] [Service]
Enviroment=RUST_BACKTRACE=1 Environment=RUST_BACKTRACE=1
ExecStart=$install_dir/.cargo/bin/cargo run --release ExecStart=$install_dir/.cargo/bin/cargo run --release
Type=simple Type=simple
User=centralex User=$user_name
WorkingDirectory=$install_dir/centralex WorkingDirectory=$install_dir/centralex
[Install] [Install]

View File

@@ -2,9 +2,21 @@ use std::net::SocketAddr;
use eyre::eyre; use eyre::eyre;
use tracing::{debug, instrument}; use tracing::{debug, instrument};
use zerocopy::{AsBytes, LittleEndian};
use crate::packets::{Header, Packet, PacketKind}; use crate::packets::{Header, Packet, PacketKind};
type U16 = zerocopy::U16<LittleEndian>;
type U32 = zerocopy::U32<LittleEndian>;
#[derive(AsBytes)]
#[repr(C)]
struct DynIpUpdate {
number: U32,
pin: U16,
port: U16,
}
/// # Errors /// # Errors
/// - the dyn ip server returns a malformed response or is unreachable /// - the dyn ip server returns a malformed response or is unreachable
/// - the authentication fails /// - the authentication fails
@@ -25,18 +37,21 @@ pub async fn dyn_ip_update(
data: Vec::new(), data: Vec::new(),
}; };
packet.data.clear(); packet.data.resize(packet.header.length as usize, 0);
packet.data.reserve(packet.header.length as usize);
packet.data.extend_from_slice(&number.to_le_bytes()); DynIpUpdate {
packet.data.extend_from_slice(&pin.to_le_bytes()); number: number.into(),
packet.data.extend_from_slice(&port.to_le_bytes()); pin: pin.into(),
port: port.into(),
}
.write_to(packet.data.as_mut_slice())
.unwrap();
let mut socket = tokio::net::TcpStream::connect(server).await?; let mut socket = tokio::net::TcpStream::connect(server).await?;
let (mut reader, mut writer) = socket.split(); let (mut reader, mut writer) = socket.split();
packet.send(&mut writer).await?; packet.send(&mut writer).await?;
packet.recv_into(&mut reader).await?; packet.recv_into(&mut reader).await?;
let result = match packet.kind() { let result = match packet.kind() {

View File

@@ -15,6 +15,7 @@ use tracing::{info, instrument, trace};
use crate::{ use crate::{
auth::dyn_ip_update, auth::dyn_ip_update,
constants::{AUTH_TIMEOUT, CALL_ACK_TIMEOUT, CALL_TIMEOUT, PING_TIMEOUT, SEND_PING_INTERVAL}, constants::{AUTH_TIMEOUT, CALL_ACK_TIMEOUT, CALL_TIMEOUT, PING_TIMEOUT, SEND_PING_INTERVAL},
debug_server::peer_query,
packets::{Header, Packet, PacketKind, RemConnect, REJECT_OOP, REJECT_TIMEOUT}, packets::{Header, Packet, PacketKind, RemConnect, REJECT_OOP, REJECT_TIMEOUT},
ports::{PortHandler, PortStatus}, ports::{PortHandler, PortStatus},
Config, HandlerMetadata, Config, HandlerMetadata,
@@ -50,9 +51,9 @@ async fn authenticate(
updated_server = true; updated_server = true;
} }
let mut port_handler = port_handler.lock().await; let listener = if let Some((listener, _packet)) =
port_handler.lock().await.stop_rejector(port).await
let listener = if let Some((listener, _packet)) = port_handler.stop_rejector(port).await { {
Ok(listener) Ok(listener)
} else { } else {
TcpListener::bind((config.listen_addr.ip(), port)).await TcpListener::bind((config.listen_addr.ip(), port)).await
@@ -70,6 +71,17 @@ async fn authenticate(
let _ip = dyn_ip_update(&config.dyn_ip_server, number, pin, port).await?; let _ip = dyn_ip_update(&config.dyn_ip_server, number, pin, port).await?;
} }
#[cfg(feature = "debug_server")]
let name = peer_query(&config.dyn_ip_server, number).await?;
let mut port_handler = port_handler.lock().await;
#[cfg(feature = "debug_server")]
if let Some(name) = name {
info!(%name, "found client name");
port_handler.names.insert(number, name);
}
port_handler.register_update(); port_handler.register_update();
port_handler port_handler
.port_state .port_state
@@ -82,7 +94,7 @@ async fn authenticate(
break Ok(Some(port)); break Ok(Some(port));
} }
port_handler.mark_port_error(number, port); port_handler.lock().await.mark_port_error(number, port);
} }
} }
@@ -265,7 +277,7 @@ async fn connect(
client.set_nodelay(true)?; client.set_nodelay(true)?;
caller.set_nodelay(true)?; caller.set_nodelay(true)?;
let _ = timeout(CALL_TIMEOUT, tokio::io::copy_bidirectional(client, caller)).await; _ = timeout(CALL_TIMEOUT, tokio::io::copy_bidirectional(client, caller)).await;
{ {
let mut port_handler = port_handler.lock().await; let mut port_handler = port_handler.lock().await;

View File

@@ -7,6 +7,11 @@ use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::Mutex; use tokio::sync::Mutex;
use tracing::error; use tracing::error;
use zerocopy::{AsBytes, FromBytes, LittleEndian, Unaligned};
use tracing::{debug, instrument};
use crate::packets::{Header, Packet};
use crate::ports::PortHandler; use crate::ports::PortHandler;
use crate::spawn; use crate::spawn;
@@ -41,3 +46,74 @@ pub async fn debug_server(addr: SocketAddr, port_handler: Arc<Mutex<PortHandler>
error!(%error, "debug server error"); error!(%error, "debug server error");
} }
} }
type U16 = zerocopy::U16<LittleEndian>;
type U32 = zerocopy::U32<LittleEndian>;
#[derive(AsBytes)]
#[repr(transparent)]
#[allow(dead_code)]
struct PeerQuery {
number: U32,
}
#[derive(FromBytes, Unaligned, Debug)]
#[repr(packed)]
#[allow(dead_code)]
struct PeerReply {
number: U32,
name: [u8; 40],
flags: U16,
kind: u8,
hostname: [u8; 40],
ipaddress: [u8; 4],
port: U16,
extension: u8,
pin: U16,
timestamp: U32,
}
#[instrument]
pub async fn peer_query(server: &SocketAddr, number: u32) -> eyre::Result<Option<String>> {
debug!(%number, "looking up");
let mut packet = Packet {
header: Header {
kind: 3, // Peer Query
length: 4,
},
data: Vec::new(),
};
packet.data.clear();
packet.data.resize(packet.header.length as usize, 0);
PeerQuery {
number: number.into(),
}
.write_to(packet.data.as_mut_slice())
.unwrap();
let mut socket = tokio::net::TcpStream::connect(server).await?;
let (mut reader, mut writer) = socket.split();
packet.send(&mut writer).await?;
packet.recv_into(&mut reader).await?;
Ok(if packet.kind().raw() == 5 {
// PeerReply
PeerReply::read_from(packet.data.as_slice()).and_then(|x| {
let i = x
.name
.iter()
.enumerate()
.find_map(|(i, c)| (*c == 0).then_some(i))
.unwrap_or(x.name.len());
Some(std::str::from_utf8(&x.name[..i]).ok()?.to_owned())
})
} else {
None
})
}

View File

@@ -185,7 +185,7 @@ fn setup_tracing(config: &Config) {
Level::ERROR => write!(writer, " {:>5} ", level.red())?, Level::ERROR => write!(writer, " {:>5} ", level.red())?,
} }
write!(writer, "{:17}{}", meta.target().dimmed(), ":".bold())?; write!(writer, "{:23}{}", meta.target().dimmed(), ":".bold())?;
/* /*
if let Some(filename) = meta.file() { if let Some(filename) = meta.file() {
@@ -266,7 +266,7 @@ async fn connection_handler(
}; };
let (_, mut writer) = stream.split(); let (_, mut writer) = stream.split();
let _ = packet.send(&mut writer).await; _ = packet.send(&mut writer).await;
} }
if let Some(port) = handler_metadata.port { if let Some(port) = handler_metadata.port {
@@ -293,7 +293,7 @@ async fn connection_handler(
} }
sleep(Duration::from_secs(3)).await; sleep(Duration::from_secs(3)).await;
let _ = stream.shutdown().await; _ = stream.shutdown().await;
} }
fn main() -> eyre::Result<()> { fn main() -> eyre::Result<()> {

View File

@@ -45,6 +45,10 @@ pub struct PortHandler {
allocated_ports: HashMap<Number, Port>, allocated_ports: HashMap<Number, Port>,
pub port_state: HashMap<Port, PortState>, pub port_state: HashMap<Port, PortState>,
#[cfg(feature = "debug_server")]
#[serde(default)]
pub names: HashMap<Number, String>,
} }
#[instrument(skip(port_handler, change_receiver))] #[instrument(skip(port_handler, change_receiver))]
@@ -88,22 +92,24 @@ impl<T: Display> Debug for DisplayAsDebug<T> {
} }
} }
fn duration_in_hours(duration: Duration) -> String { fn duration_string(duration: Duration) -> String {
let seconds_elapsed = duration.as_secs(); let seconds_elapsed = duration.as_secs();
let hours = seconds_elapsed / (60 * 60); let days = seconds_elapsed / (60 * 60 * 24);
let hours = seconds_elapsed / (60 * 60) % 24;
let minutes = (seconds_elapsed / 60) % 60; let minutes = (seconds_elapsed / 60) % 60;
let seconds = seconds_elapsed % 60; let seconds = seconds_elapsed % 60;
match (hours > 0, minutes > 0) { match (days > 0, hours > 0, minutes > 0) {
(true, _) => format!("{hours}h {minutes}min {seconds}s"), (true, _, _) => format!("{days}d {hours}h {minutes}min {seconds}s"),
(false, true) => format!("{minutes}min {seconds}s"), (false, true, _) => format!("{hours}h {minutes}min {seconds}s"),
(false, false, true) => format!("{minutes}min {seconds}s"),
_ => format!("{duration:.0?}"), _ => format!("{duration:.0?}"),
} }
} }
fn format_instant(instant: Instant) -> String { fn format_instant(instant: Instant) -> String {
let when = duration_in_hours(instant.elapsed()) + " ago"; let when = duration_string(instant.elapsed()) + " ago";
(|| -> eyre::Result<_> { (|| -> eyre::Result<_> {
let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)? - instant.elapsed(); let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)? - instant.elapsed();
@@ -122,6 +128,7 @@ fn instant_from_timestamp(timestamp: UnixTimestamp) -> Instant {
Instant::now() - UNIX_EPOCH.elapsed().unwrap() + Duration::from_secs(timestamp) Instant::now() - UNIX_EPOCH.elapsed().unwrap() + Duration::from_secs(timestamp)
} }
#[cfg(feature = "debug_server")]
impl Debug for PortHandler { impl Debug for PortHandler {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
const SHOW_N_FREE_PORTS: usize = 10; const SHOW_N_FREE_PORTS: usize = 10;
@@ -165,8 +172,9 @@ impl Debug for PortHandler {
.map(|(&number, &port)| { .map(|(&number, &port)| {
#[derive(Debug)] #[derive(Debug)]
#[allow(dead_code)] #[allow(dead_code)]
struct State { struct State<'n> {
state: PortStatus, state: PortStatus,
name: &'n str,
number: u32, number: u32,
port: u16, port: u16,
last_change: DisplayAsDebug<String>, last_change: DisplayAsDebug<String>,
@@ -178,6 +186,7 @@ impl Debug for PortHandler {
state: state.status, state: state.status,
number, number,
port, port,
name: self.names.get(&number).map_or("?", |x| x.as_str()),
last_change: DisplayAsDebug(format_instant(instant_from_timestamp( last_change: DisplayAsDebug(format_instant(instant_from_timestamp(
state.last_change, state.last_change,
))), ))),
@@ -418,7 +427,7 @@ impl Rejector {
loop { loop {
if let Ok((mut socket, _)) = listener.accept().await { if let Ok((mut socket, _)) = listener.accept().await {
let (_, mut writer) = socket.split(); let (_, mut writer) = socket.split();
let _ = packet.send(&mut writer).await; _ = packet.send(&mut writer).await;
} }
} }
}) })
@@ -429,7 +438,7 @@ impl Rejector {
#[instrument(skip(self))] #[instrument(skip(self))]
async fn stop(self) -> (TcpListener, Packet) { async fn stop(self) -> (TcpListener, Packet) {
self.handle.abort(); self.handle.abort();
let _ = self.handle.await; _ = self.handle.await;
let (listener, packet) = Arc::try_unwrap(self.state).unwrap(); let (listener, packet) = Arc::try_unwrap(self.state).unwrap();
(listener.into_inner(), packet) (listener.into_inner(), packet)
} }
@@ -465,7 +474,7 @@ impl PortHandler {
}; };
if let Some(port) = port { if let Some(port) = port {
info!(port, "allocated port"); info!(port, "allocated");
} }
port port
@@ -526,6 +535,9 @@ impl PortHandler {
self.register_update(); self.register_update();
info!(port, old_number, "reused port"); info!(port, old_number, "reused port");
assert!(self.allocated_ports.remove(&old_number).is_some()); assert!(self.allocated_ports.remove(&old_number).is_some());
#[cfg(feature = "debug_server")]
self.names.remove(&old_number);
return Some(port); return Some(port);
} }