Compare commits

...

23 Commits

Author SHA1 Message Date
15672536f6 fix c string formatting 2023-06-10 20:18:52 +02:00
b5d2a63909 fix start without exisiting cache 2023-06-09 20:23:33 +02:00
904091c455 remove unnecessary .ok() 2023-06-09 15:43:53 +02:00
02f44cfab6 continue if accept fails 2023-06-09 15:43:01 +02:00
0eed949a27 Delete '.gitea/workflows/build.yaml' 2023-05-07 00:53:56 +02:00
d58c063414 revert 854f694d22
All checks were successful
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 6s
revert Delete '.gitea/workflows/build.yaml'
2023-05-07 00:53:49 +02:00
207a304231 revert 854f694d22
All checks were successful
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 8s
revert Delete '.gitea/workflows/build.yaml'
2023-05-07 00:53:01 +02:00
854f694d22 Delete '.gitea/workflows/build.yaml' 2023-05-07 00:36:55 +02:00
47ac15422c add actions
All checks were successful
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 50s
2023-05-07 00:34:44 +02:00
4627800120 increase log target padding 2023-04-22 18:49:14 +02:00
fb1a2aa1c0 print duration in days if >24h 2023-04-22 18:45:25 +02:00
c48d369854 fix installer 2023-03-28 17:27:54 +02:00
06567d957c disable tokio-console by default 2023-03-28 15:02:24 +02:00
8f978c06f9 look up name in authentication routine 2023-03-28 14:32:48 +02:00
0aab8b16c7 register name changes as updates 2023-03-28 14:21:47 +02:00
1e7b10bc6d change debug display order 2023-03-28 14:16:24 +02:00
a2a2d89912 serialize client names 2023-03-28 14:12:43 +02:00
946bb37096 change debug display order 2023-03-28 14:07:25 +02:00
f22cafa96e show client names in debug server 2023-03-28 14:05:43 +02:00
82838e46dd log client name 2023-03-28 13:53:34 +02:00
c01e18f5f2 log client name 2023-03-28 13:52:58 +02:00
1340e87c15 switch to zerocopy crate. Initial server query debug implementation 2023-03-28 13:46:38 +02:00
eefb943292 update installer 2023-03-28 12:53:20 +02:00
10 changed files with 205 additions and 63 deletions

View File

@@ -1,3 +1,2 @@
[build]
rustflags = [ "--cfg", "tokio_unstable" ]
# rustflags = [ "--cfg", "tokio_unstable" ]

22
Cargo.lock generated
View File

@@ -190,6 +190,7 @@ dependencies = [
"tracing",
"tracing-error",
"tracing-subscriber",
"zerocopy",
]
[[package]]
@@ -1338,3 +1339,24 @@ name = "windows_x86_64_msvc"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0"
[[package]]
name = "zerocopy"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "332f188cc1bcf1fe1064b8c58d150f497e697f49774aa846f2dc949d9a25f236"
dependencies = [
"byteorder",
"zerocopy-derive",
]
[[package]]
name = "zerocopy-derive"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6505e6815af7de1746a08f69c69606bb45695a17149517680f3b2149713b19a3"
dependencies = [
"proc-macro2",
"quote",
"syn 1.0.109",
]

View File

@@ -27,8 +27,9 @@ once_cell = "1.17.1"
eyre = "0.6.8"
color-eyre = "0.6.2"
tracing-error = "0.2.0"
zerocopy = "0.6.1"
[features]
default = ["debug_server", "tokio_console"]
default = ["debug_server"]
debug_server = ["dep:hyper"]
tokio_console = ["dep:console-subscriber"]

View File

@@ -24,8 +24,8 @@ then
exit 1
fi
user_name="centralex"
install_dir="/var/lib/centralex"
user_name="itelex"
install_dir="/home/itelex/"
service_file="/etc/systemd/system/centralex.service"
if [ $# -lt 1 ]; then
@@ -52,7 +52,7 @@ case "$step" in
rm rustup.sh
echo "cloning source code..."
git clone https://github.com/soruh/centralex centralex
git clone https://gitea.h.glsys.de/soruh/centralex centralex
echo "creating default config..."
cp centralex/config-template.json centralex/config.json
@@ -63,8 +63,8 @@ case "$step" in
exit 1
fi
echo "creating user $user_name..."
useradd -s /usr/sbin/nologin --create-home --home-dir "$install_dir" "$user_name"
# echo "creating user $user_name..."
# useradd -s /usr/sbin/nologin --create-home --home-dir "$install_dir" "$user_name"
echo "creating service file..."
cat > "$service_file" << EOF
@@ -72,10 +72,10 @@ case "$step" in
Description=Centralex
[Service]
Enviroment=RUST_BACKTRACE=1
Environment=RUST_BACKTRACE=1
ExecStart=$install_dir/.cargo/bin/cargo run --release
Type=simple
User=centralex
User=$user_name
WorkingDirectory=$install_dir/centralex
[Install]

View File

@@ -2,9 +2,21 @@ use std::net::SocketAddr;
use eyre::eyre;
use tracing::{debug, instrument};
use zerocopy::{AsBytes, LittleEndian};
use crate::packets::{Header, Packet, PacketKind};
type U16 = zerocopy::U16<LittleEndian>;
type U32 = zerocopy::U32<LittleEndian>;
#[derive(AsBytes)]
#[repr(C)]
struct DynIpUpdate {
number: U32,
pin: U16,
port: U16,
}
/// # Errors
/// - the dyn ip server returns a malformed response or is unreachable
/// - the authentication fails
@@ -25,18 +37,21 @@ pub async fn dyn_ip_update(
data: Vec::new(),
};
packet.data.clear();
packet.data.reserve(packet.header.length as usize);
packet.data.extend_from_slice(&number.to_le_bytes());
packet.data.extend_from_slice(&pin.to_le_bytes());
packet.data.extend_from_slice(&port.to_le_bytes());
packet.data.resize(packet.header.length as usize, 0);
DynIpUpdate {
number: number.into(),
pin: pin.into(),
port: port.into(),
}
.write_to(packet.data.as_mut_slice())
.unwrap();
let mut socket = tokio::net::TcpStream::connect(server).await?;
let (mut reader, mut writer) = socket.split();
packet.send(&mut writer).await?;
packet.recv_into(&mut reader).await?;
let result = match packet.kind() {

View File

@@ -15,6 +15,7 @@ use tracing::{info, instrument, trace};
use crate::{
auth::dyn_ip_update,
constants::{AUTH_TIMEOUT, CALL_ACK_TIMEOUT, CALL_TIMEOUT, PING_TIMEOUT, SEND_PING_INTERVAL},
debug_server::peer_query,
packets::{Header, Packet, PacketKind, RemConnect, REJECT_OOP, REJECT_TIMEOUT},
ports::{PortHandler, PortStatus},
Config, HandlerMetadata,
@@ -50,9 +51,9 @@ async fn authenticate(
updated_server = true;
}
let mut port_handler = port_handler.lock().await;
let listener = if let Some((listener, _packet)) = port_handler.stop_rejector(port).await {
let listener = if let Some((listener, _packet)) =
port_handler.lock().await.stop_rejector(port).await
{
Ok(listener)
} else {
TcpListener::bind((config.listen_addr.ip(), port)).await
@@ -70,6 +71,17 @@ async fn authenticate(
let _ip = dyn_ip_update(&config.dyn_ip_server, number, pin, port).await?;
}
#[cfg(feature = "debug_server")]
let name = peer_query(&config.dyn_ip_server, number).await?;
let mut port_handler = port_handler.lock().await;
#[cfg(feature = "debug_server")]
if let Some(name) = name {
info!(%name, "found client name");
port_handler.names.insert(number, name);
}
port_handler.register_update();
port_handler
.port_state
@@ -82,7 +94,7 @@ async fn authenticate(
break Ok(Some(port));
}
port_handler.mark_port_error(number, port);
port_handler.lock().await.mark_port_error(number, port);
}
}
@@ -265,7 +277,7 @@ async fn connect(
client.set_nodelay(true)?;
caller.set_nodelay(true)?;
let _ = timeout(CALL_TIMEOUT, tokio::io::copy_bidirectional(client, caller)).await;
_ = timeout(CALL_TIMEOUT, tokio::io::copy_bidirectional(client, caller)).await;
{
let mut port_handler = port_handler.lock().await;

View File

@@ -7,6 +7,11 @@ use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::error;
use zerocopy::{AsBytes, FromBytes, LittleEndian, Unaligned};
use tracing::{debug, instrument};
use crate::packets::{Header, Packet};
use crate::ports::PortHandler;
use crate::spawn;
@@ -41,3 +46,74 @@ pub async fn debug_server(addr: SocketAddr, port_handler: Arc<Mutex<PortHandler>
error!(%error, "debug server error");
}
}
type U16 = zerocopy::U16<LittleEndian>;
type U32 = zerocopy::U32<LittleEndian>;
#[derive(AsBytes)]
#[repr(transparent)]
#[allow(dead_code)]
struct PeerQuery {
number: U32,
}
#[derive(FromBytes, Unaligned, Debug)]
#[repr(packed)]
#[allow(dead_code)]
struct PeerReply {
number: U32,
name: [u8; 40],
flags: U16,
kind: u8,
hostname: [u8; 40],
ipaddress: [u8; 4],
port: U16,
extension: u8,
pin: U16,
timestamp: U32,
}
#[instrument]
pub async fn peer_query(server: &SocketAddr, number: u32) -> eyre::Result<Option<String>> {
debug!(%number, "looking up");
let mut packet = Packet {
header: Header {
kind: 3, // Peer Query
length: 4,
},
data: Vec::new(),
};
packet.data.clear();
packet.data.resize(packet.header.length as usize, 0);
PeerQuery {
number: number.into(),
}
.write_to(packet.data.as_mut_slice())
.unwrap();
let mut socket = tokio::net::TcpStream::connect(server).await?;
let (mut reader, mut writer) = socket.split();
packet.send(&mut writer).await?;
packet.recv_into(&mut reader).await?;
Ok(if packet.kind().raw() == 5 {
// PeerReply
PeerReply::read_from(packet.data.as_slice()).and_then(|x| {
let i = x
.name
.iter()
.enumerate()
.find_map(|(i, c)| (*c == 0).then_some(i))
.unwrap_or(x.name.len());
Some(std::str::from_utf8(&x.name[..i]).ok()?.to_owned())
})
} else {
None
})
}

View File

@@ -185,7 +185,7 @@ fn setup_tracing(config: &Config) {
Level::ERROR => write!(writer, " {:>5} ", level.red())?,
}
write!(writer, "{:17}{}", meta.target().dimmed(), ":".bold())?;
write!(writer, "{:23}{}", meta.target().dimmed(), ":".bold())?;
/*
if let Some(filename) = meta.file() {
@@ -266,7 +266,7 @@ async fn connection_handler(
};
let (_, mut writer) = stream.split();
let _ = packet.send(&mut writer).await;
_ = packet.send(&mut writer).await;
}
if let Some(port) = handler_metadata.port {
@@ -293,7 +293,7 @@ async fn connection_handler(
}
sleep(Duration::from_secs(3)).await;
let _ = stream.shutdown().await;
_ = stream.shutdown().await;
}
fn main() -> eyre::Result<()> {
@@ -350,16 +350,23 @@ async fn tokio_main(config: Arc<Config>) -> eyre::Result<()> {
"centralex server listening"
);
while let Ok((stream, addr)) = listener.accept().await {
info!(%addr, "new connection");
loop {
let connection = listener.accept().await;
spawn(
&format!("connection to {addr}"),
connection_handler(stream, addr, config.clone(), port_handler.clone()),
);
match connection {
Ok((stream, addr)) => {
info!(%addr, "new connection");
spawn(
&format!("connection to {addr}"),
connection_handler(stream, addr, config.clone(), port_handler.clone()),
);
}
Err(err) => {
error!(%err, "failed to accept connection");
}
}
}
Ok(())
}
#[derive(Debug, Default)]

View File

@@ -1,4 +1,4 @@
use std::fmt::Debug;
use std::{ffi::CStr, fmt::Debug};
use bytemuck::{Pod, Zeroable};
use eyre::eyre;
@@ -85,18 +85,18 @@ impl Debug for Packet {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let data = &self.data;
let str_data = std::str::from_utf8(&data[..data.len().saturating_sub(1)]).ok();
let mut debugger = f.debug_struct("Packet");
let data = if let Some(str_data) = str_data.as_ref() {
str_data as &dyn Debug
debugger.field("kind", &PacketKind::from_u8(self.header.kind));
let c_str = CStr::from_bytes_until_nul(data).ok();
if let Some(str_data) = c_str.as_ref().and_then(|x| x.to_str().ok()) {
debugger.field("data", &str_data);
} else {
&data as &dyn Debug
};
debugger.field("data", &data);
}
f.debug_struct("Packet")
.field("kind", &PacketKind::from_u8(self.header.kind))
.field("data", &data)
.finish()
debugger.finish()
}
}

View File

@@ -45,6 +45,10 @@ pub struct PortHandler {
allocated_ports: HashMap<Number, Port>,
pub port_state: HashMap<Port, PortState>,
#[cfg(feature = "debug_server")]
#[serde(default)]
pub names: HashMap<Number, String>,
}
#[instrument(skip(port_handler, change_receiver))]
@@ -88,22 +92,24 @@ impl<T: Display> Debug for DisplayAsDebug<T> {
}
}
fn duration_in_hours(duration: Duration) -> String {
fn duration_string(duration: Duration) -> String {
let seconds_elapsed = duration.as_secs();
let hours = seconds_elapsed / (60 * 60);
let days = seconds_elapsed / (60 * 60 * 24);
let hours = seconds_elapsed / (60 * 60) % 24;
let minutes = (seconds_elapsed / 60) % 60;
let seconds = seconds_elapsed % 60;
match (hours > 0, minutes > 0) {
(true, _) => format!("{hours}h {minutes}min {seconds}s"),
(false, true) => format!("{minutes}min {seconds}s"),
match (days > 0, hours > 0, minutes > 0) {
(true, _, _) => format!("{days}d {hours}h {minutes}min {seconds}s"),
(false, true, _) => format!("{hours}h {minutes}min {seconds}s"),
(false, false, true) => format!("{minutes}min {seconds}s"),
_ => format!("{duration:.0?}"),
}
}
fn format_instant(instant: Instant) -> String {
let when = duration_in_hours(instant.elapsed()) + " ago";
let when = duration_string(instant.elapsed()) + " ago";
(|| -> eyre::Result<_> {
let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)? - instant.elapsed();
@@ -122,6 +128,7 @@ fn instant_from_timestamp(timestamp: UnixTimestamp) -> Instant {
Instant::now() - UNIX_EPOCH.elapsed().unwrap() + Duration::from_secs(timestamp)
}
#[cfg(feature = "debug_server")]
impl Debug for PortHandler {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
const SHOW_N_FREE_PORTS: usize = 10;
@@ -165,8 +172,9 @@ impl Debug for PortHandler {
.map(|(&number, &port)| {
#[derive(Debug)]
#[allow(dead_code)]
struct State {
struct State<'n> {
state: PortStatus,
name: &'n str,
number: u32,
port: u16,
last_change: DisplayAsDebug<String>,
@@ -178,6 +186,7 @@ impl Debug for PortHandler {
state: state.status,
number,
port,
name: self.names.get(&number).map_or("?", |x| x.as_str()),
last_change: DisplayAsDebug(format_instant(instant_from_timestamp(
state.last_change,
))),
@@ -274,7 +283,7 @@ impl PortHandler {
self.last_update = Some(now);
self.change_sender
.as_ref()
.expect("PortHandler is missing it's change_sender")
.expect("PortHandler is missing its change_sender")
.send(now)
.expect("failed to notify cache writer");
}
@@ -292,15 +301,9 @@ impl PortHandler {
}
#[allow(clippy::missing_errors_doc)]
#[instrument(skip(change_sender))]
pub fn load(
cache: &Path,
change_sender: tokio::sync::watch::Sender<Instant>,
) -> std::io::Result<Self> {
pub fn load(cache: &Path) -> std::io::Result<Self> {
info!("loading cache");
let mut cache: Self = serde_json::from_reader(BufReader::new(File::open(cache)?))?;
cache.change_sender = Some(change_sender);
Ok(cache)
Ok(serde_json::from_reader(BufReader::new(File::open(cache)?))?)
}
#[must_use]
@@ -309,10 +312,14 @@ impl PortHandler {
path: &Path,
change_sender: tokio::sync::watch::Sender<Instant>,
) -> Self {
Self::load(path, change_sender).unwrap_or_else(|error| {
let mut this = Self::load(path).unwrap_or_else(|error| {
error!(?path, %error, "failed to parse cache file");
Self::default()
})
});
this.change_sender = Some(change_sender);
this
}
pub fn update_allowed_ports(&mut self, allowed_ports: &AllowedList) {
@@ -418,7 +425,7 @@ impl Rejector {
loop {
if let Ok((mut socket, _)) = listener.accept().await {
let (_, mut writer) = socket.split();
let _ = packet.send(&mut writer).await;
_ = packet.send(&mut writer).await;
}
}
})
@@ -429,7 +436,7 @@ impl Rejector {
#[instrument(skip(self))]
async fn stop(self) -> (TcpListener, Packet) {
self.handle.abort();
let _ = self.handle.await;
_ = self.handle.await;
let (listener, packet) = Arc::try_unwrap(self.state).unwrap();
(listener.into_inner(), packet)
}
@@ -465,7 +472,7 @@ impl PortHandler {
};
if let Some(port) = port {
info!(port, "allocated port");
info!(port, "allocated");
}
port
@@ -526,6 +533,9 @@ impl PortHandler {
self.register_update();
info!(port, old_number, "reused port");
assert!(self.allocated_ports.remove(&old_number).is_some());
#[cfg(feature = "debug_server")]
self.names.remove(&old_number);
return Some(port);
}