Compare commits
10 Commits
4023b5bad4
...
894080500b
Author | SHA1 | Date | |
---|---|---|---|
894080500b | |||
eb2f66a7b6 | |||
6f1e9836c0 | |||
a5c73993d3 | |||
9d8124bd5c | |||
11d37c5b73 | |||
e4500cab78 | |||
ac72742c2a | |||
d917afe58c | |||
96033a0796 |
86
Cargo.lock
generated
86
Cargo.lock
generated
@ -19,12 +19,9 @@ checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
|
||||
|
||||
[[package]]
|
||||
name = "anyhow"
|
||||
version = "1.0.69"
|
||||
version = "1.0.70"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "224afbd727c3d6e4b90103ece64b8d1b67fbb1973b1046c2281eed3f3803f800"
|
||||
dependencies = [
|
||||
"backtrace",
|
||||
]
|
||||
checksum = "7de8ce5e0f9f8d88245311066a578d72b7af3e7088f32783804676302df237e4"
|
||||
|
||||
[[package]]
|
||||
name = "async-stream"
|
||||
@ -56,7 +53,7 @@ checksum = "86ea188f25f0255d8f92797797c97ebf5631fa88178beb1a46fdf5622c9a00e4"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.0",
|
||||
"syn 2.0.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -179,9 +176,10 @@ checksum = "50d30906286121d95be3d479533b458f87493b30a4b5f79a607db8f5d11aa91f"
|
||||
name = "centralex"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"bytemuck",
|
||||
"color-eyre",
|
||||
"console-subscriber",
|
||||
"eyre",
|
||||
"futures",
|
||||
"hyper",
|
||||
"once_cell",
|
||||
@ -190,6 +188,7 @@ dependencies = [
|
||||
"time",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"tracing-error",
|
||||
"tracing-subscriber",
|
||||
]
|
||||
|
||||
@ -199,6 +198,33 @@ version = "1.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
|
||||
|
||||
[[package]]
|
||||
name = "color-eyre"
|
||||
version = "0.6.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5a667583cca8c4f8436db8de46ea8233c42a7d9ae424a82d338f2e4675229204"
|
||||
dependencies = [
|
||||
"backtrace",
|
||||
"color-spantrace",
|
||||
"eyre",
|
||||
"indenter",
|
||||
"once_cell",
|
||||
"owo-colors",
|
||||
"tracing-error",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "color-spantrace"
|
||||
version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1ba75b3d9449ecdccb27ecbc479fdc0b87fa2dd43d2f8298f9bf0e59aacc8dce"
|
||||
dependencies = [
|
||||
"once_cell",
|
||||
"owo-colors",
|
||||
"tracing-core",
|
||||
"tracing-error",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "console-api"
|
||||
version = "0.4.0"
|
||||
@ -269,6 +295,16 @@ version = "1.8.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7fcaabb2fef8c910e7f4c7ce9f67a1283a1715879a7c230ca9d6d1ae31f16d91"
|
||||
|
||||
[[package]]
|
||||
name = "eyre"
|
||||
version = "0.6.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4c2b6b5a29c02cdc822728b7d7b8ae1bab3e3b05d44522770ddd49722eeac7eb"
|
||||
dependencies = [
|
||||
"indenter",
|
||||
"once_cell",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "flate2"
|
||||
version = "1.0.25"
|
||||
@ -490,6 +526,12 @@ dependencies = [
|
||||
"tokio-io-timeout",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "indenter"
|
||||
version = "0.3.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ce23b50ad8242c51a442f3ff322d56b02f08852c77e4c0b4d3fd684abc89c683"
|
||||
|
||||
[[package]]
|
||||
name = "indexmap"
|
||||
version = "1.9.2"
|
||||
@ -659,6 +701,12 @@ version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
|
||||
|
||||
[[package]]
|
||||
name = "owo-colors"
|
||||
version = "3.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c1b04fb49957986fdce4d6ee7a65027d55d4b6d2265e5848bbb507b58ccfdb6f"
|
||||
|
||||
[[package]]
|
||||
name = "percent-encoding"
|
||||
version = "2.2.0"
|
||||
@ -827,22 +875,22 @@ checksum = "f91339c0467de62360649f8d3e185ca8de4224ff281f66000de5eb2a77a79041"
|
||||
|
||||
[[package]]
|
||||
name = "serde"
|
||||
version = "1.0.156"
|
||||
version = "1.0.157"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "314b5b092c0ade17c00142951e50ced110ec27cea304b1037c6969246c2469a4"
|
||||
checksum = "707de5fcf5df2b5788fca98dd7eab490bc2fd9b7ef1404defc462833b83f25ca"
|
||||
dependencies = [
|
||||
"serde_derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_derive"
|
||||
version = "1.0.156"
|
||||
version = "1.0.157"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d7e29c4601e36bcec74a223228dce795f4cd3616341a4af93520ca1a837c087d"
|
||||
checksum = "78997f4555c22a7971214540c4a661291970619afd56de19f77e0de86296e1e5"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 1.0.109",
|
||||
"syn 2.0.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -903,9 +951,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "syn"
|
||||
version = "2.0.0"
|
||||
version = "2.0.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4cff13bb1732bccfe3b246f3fdb09edfd51c01d6f5299b7ccd9457c2e4e37774"
|
||||
checksum = "59d3276aee1fa0c33612917969b5172b5be2db051232a6e4826f1a1a9191b045"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@ -1119,6 +1167,16 @@ dependencies = [
|
||||
"valuable",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-error"
|
||||
version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d686ec1c0f384b1277f097b2f279a2ecc11afe8c133c1aabf036a27cb4cd206e"
|
||||
dependencies = [
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-futures"
|
||||
version = "0.2.5"
|
||||
|
15
Cargo.toml
15
Cargo.toml
@ -3,21 +3,30 @@ name = "centralex"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[profile.release]
|
||||
debug = true
|
||||
|
||||
[profile.dev.package.backtrace]
|
||||
opt-level = 3
|
||||
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
tokio = { version = "1.24.2", features = ["macros", "rt-multi-thread", "net", "io-util", "sync", "time"] }
|
||||
anyhow = { version = "1.0.68", features = ["backtrace"] }
|
||||
time = { version = "0.3.20", features = ["local-offset", "macros"] }
|
||||
bytemuck = { version = "1.13.0", features = ["derive"] }
|
||||
serde = { version = "1.0.152", features = ["derive"] }
|
||||
serde_json = "1.0.91"
|
||||
hyper = { version = "0.14.24", optional = true, features = ["server", "http1", "tcp"] }
|
||||
futures = { version = "0.3.27", default-features = false, features = ["std"] }
|
||||
console-subscriber = { version = "0.1.8", optional = true }
|
||||
tracing = "0.1.37"
|
||||
tracing-subscriber = { version = "0.3.16", features = ["time"] }
|
||||
time = { version = "0.3.20", features = ["local-offset", "macros"] }
|
||||
console-subscriber = { version = "0.1.8", optional = true }
|
||||
once_cell = "1.17.1"
|
||||
eyre = "0.6.8"
|
||||
color-eyre = "0.6.2"
|
||||
tracing-error = "0.2.0"
|
||||
|
||||
[features]
|
||||
default = ["debug_server", "tokio_console"]
|
||||
|
24
src/auth.rs
24
src/auth.rs
@ -1,16 +1,20 @@
|
||||
use std::net::SocketAddr;
|
||||
|
||||
use anyhow::bail;
|
||||
use tracing::debug;
|
||||
use eyre::eyre;
|
||||
use tracing::{debug, instrument};
|
||||
|
||||
use crate::packets::{Header, Packet, PacketKind};
|
||||
|
||||
/// # Errors
|
||||
/// - the dyn ip server returns a malformed response or is unreachable
|
||||
/// - the authentication fails
|
||||
#[instrument]
|
||||
pub async fn dyn_ip_update(
|
||||
server: &SocketAddr,
|
||||
number: u32,
|
||||
pin: u16,
|
||||
port: u16,
|
||||
) -> anyhow::Result<std::net::Ipv4Addr> {
|
||||
) -> eyre::Result<std::net::Ipv4Addr> {
|
||||
debug!(%number, %port, "starting dyn ip update");
|
||||
|
||||
let mut packet = Packet {
|
||||
@ -38,7 +42,7 @@ pub async fn dyn_ip_update(
|
||||
let result = match packet.kind() {
|
||||
PacketKind::DynIpUpdateResponse => Ok(<[u8; 4]>::try_from(packet.data)
|
||||
.map_err(|err| {
|
||||
anyhow::anyhow!(
|
||||
eyre!(
|
||||
"too little data for ip address. Need 4 bytes got {}",
|
||||
err.len()
|
||||
)
|
||||
@ -51,17 +55,13 @@ pub async fn dyn_ip_update(
|
||||
.enumerate()
|
||||
.find_map(|(i, x)| (*x == 0).then_some(i));
|
||||
|
||||
bail!(
|
||||
return Err(eyre!(
|
||||
"{}",
|
||||
std::str::from_utf8(
|
||||
first_zero
|
||||
.map(|i| &packet.data[..i])
|
||||
.unwrap_or(&packet.data),
|
||||
)?
|
||||
)
|
||||
std::str::from_utf8(first_zero.map_or(&packet.data, |i| &packet.data[..i]),)?
|
||||
));
|
||||
}
|
||||
|
||||
_ => bail!("server returned unexpected packet"),
|
||||
_ => return Err(eyre!("server returned unexpected packet")),
|
||||
};
|
||||
|
||||
debug!(?result, "finished dyn ip update");
|
||||
|
296
src/client.rs
296
src/client.rs
@ -1,13 +1,16 @@
|
||||
use anyhow::{bail, Context};
|
||||
use eyre::eyre;
|
||||
use std::{net::SocketAddr, time::Instant};
|
||||
use tokio::{
|
||||
io::AsyncWriteExt,
|
||||
net::{TcpListener, TcpStream},
|
||||
net::{
|
||||
tcp::{ReadHalf, WriteHalf},
|
||||
TcpListener, TcpStream,
|
||||
},
|
||||
select,
|
||||
sync::Mutex,
|
||||
time::{sleep, timeout},
|
||||
};
|
||||
use tracing::{info, trace};
|
||||
use tracing::{info, instrument, trace};
|
||||
|
||||
use crate::{
|
||||
auth::dyn_ip_update,
|
||||
@ -17,32 +20,18 @@ use crate::{
|
||||
Config, HandlerMetadata,
|
||||
};
|
||||
|
||||
pub async fn connection_handler(
|
||||
/// # Errors
|
||||
/// - the client authentication fails
|
||||
#[instrument(skip(config, port_handler, handler_metadata))]
|
||||
async fn authenticate(
|
||||
config: &Config,
|
||||
handler_metadata: &mut HandlerMetadata,
|
||||
port_handler: &Mutex<PortHandler>,
|
||||
stream: &mut TcpStream,
|
||||
) -> anyhow::Result<()> {
|
||||
let addr = stream.peer_addr()?;
|
||||
|
||||
let (mut reader, mut writer) = stream.split();
|
||||
|
||||
let mut packet = Packet::default();
|
||||
|
||||
match timeout(AUTH_TIMEOUT, packet.recv_into_cancelation_safe(&mut reader)).await {
|
||||
Ok(res) => res?,
|
||||
Err(_) => {
|
||||
writer.write_all(REJECT_TIMEOUT).await?;
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
let RemConnect { number, pin } = packet.as_rem_connect()?;
|
||||
|
||||
handler_metadata.number = Some(number);
|
||||
|
||||
handler_metadata: &mut HandlerMetadata,
|
||||
number: u32,
|
||||
pin: u16,
|
||||
) -> eyre::Result<Option<u16>> {
|
||||
let mut authenticated = false;
|
||||
let port = loop {
|
||||
loop {
|
||||
let mut updated_server = false;
|
||||
|
||||
let port = port_handler
|
||||
@ -51,15 +40,12 @@ pub async fn connection_handler(
|
||||
.allocate_port_for_number(config, number);
|
||||
|
||||
let Some(port) = port else {
|
||||
writer.write_all(REJECT_OOP).await?;
|
||||
return Ok(());
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
// make sure the client is authenticated before opening any ports
|
||||
if !authenticated {
|
||||
let _ip = dyn_ip_update(&config.dyn_ip_server, number, pin, port)
|
||||
.await
|
||||
.context("dy-ip update")?;
|
||||
let _ip = dyn_ip_update(&config.dyn_ip_server, number, pin, port).await?;
|
||||
authenticated = true;
|
||||
updated_server = true;
|
||||
}
|
||||
@ -72,8 +58,7 @@ pub async fn connection_handler(
|
||||
TcpListener::bind((config.listen_addr.ip(), port)).await
|
||||
};
|
||||
|
||||
match listener {
|
||||
Ok(listener) => {
|
||||
if let Ok(listener) = listener {
|
||||
// make sure that if we have an error, we still have access
|
||||
// to the listener in the error handler.
|
||||
handler_metadata.listener = Some(listener);
|
||||
@ -82,9 +67,7 @@ pub async fn connection_handler(
|
||||
// we need to update the server here once a port that can be opened
|
||||
// has been found
|
||||
if !updated_server {
|
||||
let _ip = dyn_ip_update(&config.dyn_ip_server, number, pin, port)
|
||||
.await
|
||||
.context("dy-ip update")?;
|
||||
let _ip = dyn_ip_update(&config.dyn_ip_server, number, pin, port).await?;
|
||||
}
|
||||
|
||||
port_handler.register_update();
|
||||
@ -96,42 +79,36 @@ pub async fn connection_handler(
|
||||
|
||||
handler_metadata.port = Some(port);
|
||||
|
||||
break port;
|
||||
break Ok(Some(port));
|
||||
}
|
||||
Err(_err) => {
|
||||
|
||||
port_handler.mark_port_error(number, port);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
info!(%addr, number, port, "authenticated");
|
||||
|
||||
let listener = handler_metadata.listener.as_mut().unwrap(); // we only break from the loop if this is set
|
||||
|
||||
packet.header = Header {
|
||||
kind: PacketKind::RemConfirm.raw(),
|
||||
length: 0,
|
||||
};
|
||||
packet.data.clear();
|
||||
packet.send(&mut writer).await?;
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum Result {
|
||||
enum IdleResult {
|
||||
Caller {
|
||||
packet: Packet,
|
||||
stream: TcpStream,
|
||||
addr: SocketAddr,
|
||||
},
|
||||
Packet {
|
||||
Disconnect {
|
||||
packet: Packet,
|
||||
},
|
||||
}
|
||||
|
||||
#[instrument(skip(listener, reader, writer, packet))]
|
||||
async fn idle(
|
||||
listener: &mut TcpListener,
|
||||
mut packet: Packet,
|
||||
reader: &mut ReadHalf<'_>,
|
||||
writer: &mut WriteHalf<'_>,
|
||||
) -> eyre::Result<Option<IdleResult>> {
|
||||
let mut last_ping_sent_at = Instant::now();
|
||||
let mut last_ping_received_at = Instant::now();
|
||||
|
||||
let result = loop {
|
||||
loop {
|
||||
trace!(
|
||||
seconds = SEND_PING_INTERVAL
|
||||
.saturating_sub(last_ping_sent_at.elapsed())
|
||||
@ -151,16 +128,16 @@ pub async fn connection_handler(
|
||||
select! {
|
||||
caller = listener.accept() => {
|
||||
let (stream, addr) = caller?;
|
||||
break Result::Caller { packet, stream, addr }
|
||||
break Ok(Some(IdleResult::Caller { packet, stream, addr }))
|
||||
},
|
||||
_ = Packet::peek_packet_kind(&mut reader) => {
|
||||
packet.recv_into(&mut reader).await?;
|
||||
_ = Packet::peek_packet_kind(reader) => {
|
||||
packet.recv_into(reader).await?;
|
||||
|
||||
if packet.kind() == PacketKind::Ping {
|
||||
trace!("received ping");
|
||||
last_ping_received_at = Instant::now();
|
||||
} else {
|
||||
break Result::Packet { packet }
|
||||
break Ok(Some(IdleResult::Disconnect { packet }))
|
||||
}
|
||||
},
|
||||
_ = sleep(send_next_ping_in) => {
|
||||
@ -169,14 +146,24 @@ pub async fn connection_handler(
|
||||
last_ping_sent_at = Instant::now();
|
||||
}
|
||||
_ = sleep(next_ping_expected_in) => {
|
||||
writer.write_all(REJECT_TIMEOUT).await?;
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let (mut client, mut packet) = match result {
|
||||
Result::Packet { mut packet } => {
|
||||
writer.write_all(REJECT_TIMEOUT).await?;
|
||||
break Ok(None);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip(port_handler, handler_metadata, writer))]
|
||||
async fn notify_or_disconnect(
|
||||
result: IdleResult,
|
||||
handler_metadata: &mut HandlerMetadata,
|
||||
port_handler: &Mutex<PortHandler>,
|
||||
port: u16,
|
||||
writer: &mut WriteHalf<'_>,
|
||||
) -> eyre::Result<Option<(TcpStream, Packet)>> {
|
||||
match result {
|
||||
IdleResult::Disconnect { mut packet } => {
|
||||
if matches!(packet.kind(), PacketKind::End | PacketKind::Reject) {
|
||||
info!(?packet, "got disconnect packet");
|
||||
|
||||
@ -184,7 +171,7 @@ pub async fn connection_handler(
|
||||
|
||||
if packet.data.is_empty() {
|
||||
packet.data.extend_from_slice(b"nc\0");
|
||||
packet.header.length = packet.data.len() as u8;
|
||||
packet.header.length = packet.data.len().try_into().unwrap();
|
||||
}
|
||||
|
||||
port_handler.lock().await.start_rejector(
|
||||
@ -194,13 +181,13 @@ pub async fn connection_handler(
|
||||
.take()
|
||||
.expect("tried to start rejector twice"),
|
||||
packet,
|
||||
)?;
|
||||
return Ok(());
|
||||
);
|
||||
Ok(None)
|
||||
} else {
|
||||
bail!("unexpected packet: {:?}", packet.kind())
|
||||
Err(eyre!("unexpected packet: {:?}", packet.kind()))
|
||||
}
|
||||
}
|
||||
Result::Caller {
|
||||
IdleResult::Caller {
|
||||
mut packet,
|
||||
stream,
|
||||
addr,
|
||||
@ -216,43 +203,37 @@ pub async fn connection_handler(
|
||||
*/
|
||||
packet.header = Header {
|
||||
kind: PacketKind::RemCall.raw(),
|
||||
length: packet.data.len() as u8,
|
||||
length: packet.data.len().try_into().unwrap(), // ip addresses are less then 255 bytes long
|
||||
};
|
||||
|
||||
packet.send(&mut writer).await?;
|
||||
packet.send(writer).await?;
|
||||
|
||||
(stream, packet)
|
||||
Ok(Some((stream, packet)))
|
||||
}
|
||||
};
|
||||
|
||||
match timeout(
|
||||
CALL_ACK_TIMEOUT,
|
||||
packet.recv_into_cancelation_safe(&mut reader),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(res) => res?,
|
||||
Err(_) => {
|
||||
writer.write_all(REJECT_TIMEOUT).await?;
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
match packet.kind() {
|
||||
PacketKind::End | PacketKind::Reject => {
|
||||
port_handler.lock().await.start_rejector(
|
||||
port,
|
||||
handler_metadata
|
||||
.listener
|
||||
.take()
|
||||
.expect("tried to start rejector twice"),
|
||||
packet,
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
fn print_addr(stream: &TcpStream) -> String {
|
||||
stream
|
||||
.peer_addr()
|
||||
.map_or_else(|_| "?".to_owned(), |addr| format!("{addr}"))
|
||||
}
|
||||
|
||||
PacketKind::RemAck => {
|
||||
#[instrument(skip(packet, port_handler, handler_metadata, caller, client), fields(client_addr = print_addr(client), caller_addr = print_addr(caller)))]
|
||||
async fn connect(
|
||||
mut packet: Packet,
|
||||
port_handler: &Mutex<PortHandler>,
|
||||
port: u16,
|
||||
handler_metadata: &mut HandlerMetadata,
|
||||
client: &mut TcpStream,
|
||||
caller: &mut TcpStream,
|
||||
) -> eyre::Result<()> {
|
||||
info!(
|
||||
client_addr = print_addr(client),
|
||||
caller_addr = print_addr(caller),
|
||||
"connecting clients"
|
||||
);
|
||||
|
||||
packet.header = Header {
|
||||
kind: PacketKind::Reject.raw(),
|
||||
length: 4,
|
||||
@ -278,17 +259,13 @@ pub async fn connection_handler(
|
||||
.take()
|
||||
.expect("tried to start rejector twice"),
|
||||
packet,
|
||||
)?;
|
||||
);
|
||||
}
|
||||
|
||||
stream.set_nodelay(true)?;
|
||||
client.set_nodelay(true)?;
|
||||
caller.set_nodelay(true)?;
|
||||
|
||||
let _ = timeout(
|
||||
CALL_TIMEOUT,
|
||||
tokio::io::copy_bidirectional(stream, &mut client),
|
||||
)
|
||||
.await;
|
||||
let _ = timeout(CALL_TIMEOUT, tokio::io::copy_bidirectional(client, caller)).await;
|
||||
|
||||
{
|
||||
let mut port_handler = port_handler.lock().await;
|
||||
@ -307,7 +284,7 @@ pub async fn connection_handler(
|
||||
packet.data.push(0);
|
||||
packet.header = Header {
|
||||
kind: PacketKind::Reject.raw(),
|
||||
length: packet.data.len() as u8,
|
||||
length: packet.data.len().try_into().unwrap(),
|
||||
};
|
||||
})
|
||||
.await?;
|
||||
@ -316,6 +293,109 @@ pub async fn connection_handler(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
kind => bail!("unexpected packet: {:?}", kind),
|
||||
/// # Errors
|
||||
/// - the connection to the client or the caller is interupted
|
||||
/// - the clients sends unexpected or malformed packets
|
||||
/// - accepting a tcp connection fails
|
||||
/// - settings tcp socket properties fails
|
||||
/// - the client authentication fails
|
||||
#[instrument(skip_all)]
|
||||
pub async fn handler(
|
||||
client: &mut TcpStream,
|
||||
addr: SocketAddr,
|
||||
config: &Config,
|
||||
handler_metadata: &mut HandlerMetadata,
|
||||
port_handler: &Mutex<PortHandler>,
|
||||
) -> eyre::Result<()> {
|
||||
let (mut reader, mut writer) = client.split();
|
||||
|
||||
let mut packet = Packet::default();
|
||||
|
||||
let Ok(res) = timeout(AUTH_TIMEOUT, packet.recv_into_cancelation_safe(&mut reader)).await else {
|
||||
writer.write_all(REJECT_TIMEOUT).await?;
|
||||
return Ok(());
|
||||
};
|
||||
res?;
|
||||
|
||||
let RemConnect { number, pin } = packet.as_rem_connect()?;
|
||||
|
||||
handler_metadata.number = Some(number);
|
||||
|
||||
let Some(port) = authenticate(config, port_handler, handler_metadata, number, pin).await? else {
|
||||
writer.write_all(REJECT_OOP).await?;
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
info!(%addr, number, port, "authenticated");
|
||||
|
||||
let Some(listener) = handler_metadata.listener.as_mut() else {
|
||||
unreachable!("client sucessfully authenticated but did not set handler_metadata.listener");
|
||||
};
|
||||
|
||||
packet.header = Header {
|
||||
kind: PacketKind::RemConfirm.raw(),
|
||||
length: 0,
|
||||
};
|
||||
packet.data.clear();
|
||||
packet.send(&mut writer).await?;
|
||||
|
||||
let Some(idle_result) = idle(
|
||||
listener,
|
||||
packet,
|
||||
&mut reader,
|
||||
&mut writer,
|
||||
).await? else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let Some((mut caller, mut packet)) = notify_or_disconnect(idle_result, handler_metadata, port_handler, port, &mut writer).await? else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let notify_at = Instant::now();
|
||||
|
||||
loop {
|
||||
let recv = timeout(
|
||||
CALL_ACK_TIMEOUT.saturating_sub(notify_at.elapsed()),
|
||||
packet.recv_into_cancelation_safe(&mut reader),
|
||||
);
|
||||
|
||||
let Ok(res) = recv.await else {
|
||||
writer.write_all(REJECT_TIMEOUT).await?;
|
||||
return Ok(());
|
||||
};
|
||||
res?;
|
||||
|
||||
match packet.kind() {
|
||||
PacketKind::Ping => {}
|
||||
PacketKind::End | PacketKind::Reject => {
|
||||
port_handler.lock().await.start_rejector(
|
||||
port,
|
||||
handler_metadata
|
||||
.listener
|
||||
.take()
|
||||
.expect("tried to start rejector twice"),
|
||||
packet,
|
||||
);
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
PacketKind::RemAck => {
|
||||
connect(
|
||||
packet,
|
||||
port_handler,
|
||||
port,
|
||||
handler_metadata,
|
||||
client,
|
||||
&mut caller,
|
||||
)
|
||||
.await?;
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
kind => return Err(eyre!("unexpected packet: {:?}", kind)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
357
src/main.rs
357
src/main.rs
@ -1,3 +1,6 @@
|
||||
#![warn(clippy::pedantic)]
|
||||
// #![allow(clippy::missing_errors_doc)]
|
||||
|
||||
use std::{
|
||||
fmt::Debug,
|
||||
fs::File,
|
||||
@ -15,17 +18,15 @@ use serde::{Deserialize, Deserializer};
|
||||
use time::format_description::OwnedFormatItem;
|
||||
use tokio::{
|
||||
io::AsyncWriteExt,
|
||||
net::TcpListener,
|
||||
net::{TcpListener, TcpStream},
|
||||
sync::Mutex,
|
||||
time::{sleep, Instant},
|
||||
};
|
||||
use tracing::{error, info, warn, Level};
|
||||
use tracing::{error, info, instrument, warn, Level};
|
||||
use tracing_subscriber::fmt::time::FormatTime;
|
||||
|
||||
use crate::{
|
||||
client::connection_handler,
|
||||
ports::{AllowedPorts, PortHandler, PortStatus},
|
||||
};
|
||||
use crate::{constants::CACHE_STORE_INTERVAL, packets::PacketKind};
|
||||
use crate::packets::PacketKind;
|
||||
use crate::ports::{cache_daemon, AllowedList, PortHandler, PortStatus};
|
||||
|
||||
pub mod auth;
|
||||
pub mod client;
|
||||
@ -41,11 +42,14 @@ type UnixTimestamp = u64;
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct Config {
|
||||
allowed_ports: AllowedPorts,
|
||||
allowed_ports: AllowedList,
|
||||
|
||||
#[serde(deserialize_with = "parse_socket_addr")]
|
||||
listen_addr: SocketAddr,
|
||||
|
||||
#[serde(deserialize_with = "parse_socket_addr")]
|
||||
dyn_ip_server: SocketAddr,
|
||||
|
||||
#[cfg(feature = "debug_server")]
|
||||
#[serde(deserialize_with = "maybe_parse_socket_addr")]
|
||||
#[serde(default)]
|
||||
@ -109,25 +113,28 @@ impl Config {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "tokio_console"))]
|
||||
#[track_caller]
|
||||
fn spawn<T: Send + 'static>(
|
||||
_name: &str,
|
||||
future: impl Future<Output = T> + Send + 'static,
|
||||
) -> tokio::task::JoinHandle<T> {
|
||||
tokio::spawn(future)
|
||||
}
|
||||
|
||||
#[cfg(feature = "tokio_console")]
|
||||
#[track_caller]
|
||||
fn spawn<T: Send + 'static>(
|
||||
name: &str,
|
||||
future: impl Future<Output = T> + Send + 'static,
|
||||
) -> tokio::task::JoinHandle<T> {
|
||||
tokio::task::Builder::new()
|
||||
use tracing::Instrument;
|
||||
|
||||
let future = future.instrument(tracing::span!(
|
||||
Level::TRACE,
|
||||
"spawn",
|
||||
name = name,
|
||||
caller = %std::panic::Location::caller().to_string()
|
||||
));
|
||||
|
||||
#[cfg(feature = "tokio_console")]
|
||||
return tokio::task::Builder::new()
|
||||
.name(name)
|
||||
.spawn(future)
|
||||
.unwrap_or_else(|err| panic!("failed to spawn {name:?}: {err:?}"))
|
||||
.unwrap_or_else(|err| panic!("failed to spawn {name:?}: {err:?}"));
|
||||
|
||||
#[cfg(not(feature = "tokio_console"))]
|
||||
return tokio::spawn(future);
|
||||
}
|
||||
|
||||
static TIME_ZONE_OFFSET: once_cell::sync::OnceCell<time::UtcOffset> =
|
||||
@ -135,13 +142,165 @@ static TIME_ZONE_OFFSET: once_cell::sync::OnceCell<time::UtcOffset> =
|
||||
|
||||
static TIME_FORMAT: once_cell::sync::OnceCell<OwnedFormatItem> = once_cell::sync::OnceCell::new();
|
||||
|
||||
fn main() -> anyhow::Result<()> {
|
||||
let config = Arc::new(Config::load("config.json")?);
|
||||
fn setup_tracing(config: &Config) {
|
||||
use tracing::Subscriber;
|
||||
use tracing_error::ErrorLayer;
|
||||
use tracing_subscriber::prelude::*;
|
||||
use tracing_subscriber::{
|
||||
filter,
|
||||
fmt::{self, FormatEvent, FormatFields},
|
||||
registry::LookupSpan,
|
||||
};
|
||||
|
||||
if config.allowed_ports.is_empty() {
|
||||
panic!("no allowed ports");
|
||||
struct EventFormater;
|
||||
impl<S, N> FormatEvent<S, N> for EventFormater
|
||||
where
|
||||
S: Subscriber + for<'a> LookupSpan<'a>,
|
||||
N: for<'a> FormatFields<'a> + 'static,
|
||||
{
|
||||
fn format_event(
|
||||
&self,
|
||||
ctx: &fmt::FmtContext<'_, S, N>,
|
||||
mut writer: fmt::format::Writer<'_>,
|
||||
event: &tracing::Event<'_>,
|
||||
) -> std::fmt::Result {
|
||||
use color_eyre::owo_colors::OwoColorize;
|
||||
|
||||
let meta = event.metadata();
|
||||
|
||||
fmt::time::OffsetTime::new(
|
||||
*TIME_ZONE_OFFSET.get().unwrap(),
|
||||
TIME_FORMAT.get().unwrap(),
|
||||
)
|
||||
.format_time(&mut writer)?;
|
||||
|
||||
// TODO: check writer.has_ansi_escapes()
|
||||
|
||||
let level = *meta.level();
|
||||
match level {
|
||||
Level::TRACE => write!(writer, " {:>5} ", level.purple())?,
|
||||
Level::DEBUG => write!(writer, " {:>5} ", level.cyan())?,
|
||||
Level::INFO => write!(writer, " {:>5} ", level.green())?,
|
||||
Level::WARN => write!(writer, " {:>5} ", level.yellow())?,
|
||||
Level::ERROR => write!(writer, " {:>5} ", level.red())?,
|
||||
}
|
||||
|
||||
write!(writer, "{:17}{}", meta.target().dimmed(), ":".bold())?;
|
||||
|
||||
/*
|
||||
if let Some(filename) = meta.file() {
|
||||
write!(writer, " {}{}", filename.bold(), ":".dimmed())?;
|
||||
}
|
||||
if let Some(line_number) = meta.line() {
|
||||
write!(writer, "{}{}", line_number.bold(), ":".dimmed())?;
|
||||
}
|
||||
*/
|
||||
|
||||
writer.write_char(' ')?;
|
||||
|
||||
ctx.format_fields(writer.by_ref(), event)?;
|
||||
|
||||
writeln!(writer)
|
||||
}
|
||||
}
|
||||
|
||||
// build a `Subscriber` by combining layers with a
|
||||
// `tracing_subscriber::Registry`:
|
||||
let registry = tracing_subscriber::registry();
|
||||
|
||||
#[cfg(feature = "tokio_console")]
|
||||
let registry = registry.with(console_subscriber::spawn());
|
||||
|
||||
registry
|
||||
.with(ErrorLayer::default())
|
||||
.with(
|
||||
fmt::layer()
|
||||
.with_target(true)
|
||||
.event_format(EventFormater)
|
||||
.with_filter(filter::LevelFilter::from_level(config.log_level))
|
||||
.with_filter(tracing_subscriber::filter::filter_fn(|meta| {
|
||||
meta.target().starts_with(env!("CARGO_CRATE_NAME"))
|
||||
})),
|
||||
)
|
||||
.init();
|
||||
}
|
||||
|
||||
#[instrument(skip(stream, config, port_handler))]
|
||||
async fn connection_handler(
|
||||
mut stream: TcpStream,
|
||||
addr: SocketAddr,
|
||||
config: Arc<Config>,
|
||||
port_handler: Arc<Mutex<PortHandler>>,
|
||||
) {
|
||||
use futures::future::FutureExt;
|
||||
|
||||
let mut handler_metadata = HandlerMetadata::default();
|
||||
|
||||
let res = std::panic::AssertUnwindSafe(client::handler(
|
||||
&mut stream,
|
||||
addr,
|
||||
&config,
|
||||
&mut handler_metadata,
|
||||
&port_handler,
|
||||
))
|
||||
.catch_unwind()
|
||||
.await;
|
||||
|
||||
let error = match res {
|
||||
Err(_) => Some("internal server error".to_owned()),
|
||||
Ok(Err(err)) => Some(err.to_string()),
|
||||
Ok(Ok(())) => None,
|
||||
};
|
||||
|
||||
if let Some(error) = error {
|
||||
error!(%addr, %error, "Client had an error");
|
||||
|
||||
let mut packet = Packet::default();
|
||||
|
||||
packet.data.extend_from_slice(error.as_bytes());
|
||||
packet.data.truncate((u8::MAX - 1) as usize);
|
||||
packet.data.push(0);
|
||||
packet.header = Header {
|
||||
kind: PacketKind::Error.raw(),
|
||||
length: packet.data.len().try_into().unwrap(), // this will never fail, as we just truncated the vector
|
||||
};
|
||||
|
||||
let (_, mut writer) = stream.split();
|
||||
let _ = packet.send(&mut writer).await;
|
||||
}
|
||||
|
||||
if let Some(port) = handler_metadata.port {
|
||||
let mut port_handler = port_handler.lock().await;
|
||||
|
||||
if let Some(port_state) = port_handler.port_state.get_mut(&port) {
|
||||
port_state.new_state(PortStatus::Disconnected);
|
||||
port_handler.register_update();
|
||||
}
|
||||
|
||||
if let Some(listener) = handler_metadata.listener.take() {
|
||||
port_handler.start_rejector(
|
||||
port,
|
||||
listener,
|
||||
Packet {
|
||||
header: Header {
|
||||
kind: PacketKind::Reject.raw(),
|
||||
length: 3,
|
||||
},
|
||||
data: b"nc\0".to_vec(),
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
sleep(Duration::from_secs(3)).await;
|
||||
let _ = stream.shutdown().await;
|
||||
}
|
||||
|
||||
fn main() -> eyre::Result<()> {
|
||||
color_eyre::install()?;
|
||||
|
||||
let config = Arc::new(Config::load("config.json")?);
|
||||
|
||||
TIME_FORMAT.set(config.time_format.clone()).unwrap();
|
||||
|
||||
// we need to get this while still single threaded
|
||||
@ -151,76 +310,30 @@ fn main() -> anyhow::Result<()> {
|
||||
.set(time::UtcOffset::current_local_offset()?)
|
||||
.unwrap();
|
||||
|
||||
assert!(!config.allowed_ports.is_empty(), "no allowed ports");
|
||||
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.enable_all()
|
||||
.build()?
|
||||
.block_on(async move {
|
||||
{
|
||||
use tracing_subscriber::prelude::*;
|
||||
use tracing_subscriber::*;
|
||||
|
||||
// build a `Subscriber` by combining layers with a
|
||||
// `tracing_subscriber::Registry`:
|
||||
let registry = tracing_subscriber::registry();
|
||||
|
||||
#[cfg(feature = "tokio_console")]
|
||||
let registry = registry.with(console_subscriber::spawn());
|
||||
|
||||
registry
|
||||
.with(
|
||||
fmt::layer()
|
||||
.with_target(true)
|
||||
.with_timer(fmt::time::OffsetTime::new(
|
||||
*TIME_ZONE_OFFSET.get().unwrap(),
|
||||
TIME_FORMAT.get().unwrap(),
|
||||
))
|
||||
.with_filter(filter::LevelFilter::from_level(config.log_level))
|
||||
.with_filter(tracing_subscriber::filter::filter_fn(|meta| {
|
||||
meta.target().starts_with("centralex")
|
||||
})),
|
||||
)
|
||||
.init();
|
||||
.block_on(tokio_main(config))
|
||||
}
|
||||
|
||||
async fn tokio_main(config: Arc<Config>) -> eyre::Result<()> {
|
||||
setup_tracing(&config);
|
||||
|
||||
let cache_path = PathBuf::from("cache.json");
|
||||
|
||||
let (change_sender, mut change_receiver) = tokio::sync::watch::channel(Instant::now());
|
||||
let (change_sender, change_receiver) = tokio::sync::watch::channel(Instant::now());
|
||||
|
||||
let mut port_handler = PortHandler::load_or_default(&cache_path, change_sender);
|
||||
port_handler.update_allowed_ports(&config.allowed_ports);
|
||||
|
||||
let port_handler = Arc::new(Mutex::new(port_handler));
|
||||
|
||||
{
|
||||
let port_handler = port_handler.clone();
|
||||
spawn("cache daemon", async move {
|
||||
let mut last_store = Instant::now() - 2 * CACHE_STORE_INTERVAL;
|
||||
let mut change_timeout = None;
|
||||
loop {
|
||||
if let Some(change_timeout) = change_timeout.take() {
|
||||
tokio::time::timeout(change_timeout, change_receiver.changed())
|
||||
.await
|
||||
.unwrap_or(Ok(()))
|
||||
} else {
|
||||
change_receiver.changed().await
|
||||
}
|
||||
.expect("failed to wait for cache changes");
|
||||
|
||||
let time_since_last_store = last_store.elapsed();
|
||||
|
||||
if time_since_last_store >= CACHE_STORE_INTERVAL {
|
||||
let port_handler = port_handler.lock().await;
|
||||
|
||||
last_store = Instant::now();
|
||||
if let Err(err) = port_handler.store(&cache_path) {
|
||||
error!("failed to store cache: {err:?}");
|
||||
}
|
||||
} else {
|
||||
change_timeout = Some(CACHE_STORE_INTERVAL - time_since_last_store);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
spawn(
|
||||
"cache daemon",
|
||||
cache_daemon(port_handler.clone(), cache_path, change_receiver),
|
||||
);
|
||||
|
||||
#[cfg(feature = "debug_server")]
|
||||
if let Some(listen_addr) = config.debug_server_addr {
|
||||
@ -237,90 +350,16 @@ fn main() -> anyhow::Result<()> {
|
||||
"centralex server listening"
|
||||
);
|
||||
|
||||
while let Ok((mut stream, addr)) = listener.accept().await {
|
||||
while let Ok((stream, addr)) = listener.accept().await {
|
||||
info!(%addr, "new connection");
|
||||
|
||||
let port_handler = port_handler.clone();
|
||||
let config = config.clone();
|
||||
|
||||
let mut handler_metadata = HandlerMetadata::default();
|
||||
|
||||
spawn(&format!("connection to {addr}"), async move {
|
||||
use futures::future::FutureExt;
|
||||
|
||||
let res = std::panic::AssertUnwindSafe(connection_handler(
|
||||
&config,
|
||||
&mut handler_metadata,
|
||||
&port_handler,
|
||||
&mut stream,
|
||||
))
|
||||
.catch_unwind()
|
||||
.await;
|
||||
|
||||
let error = match res {
|
||||
Err(err) => {
|
||||
let err = err
|
||||
.downcast::<String>()
|
||||
.map(|err| *err)
|
||||
.unwrap_or_else(|_| "?".to_owned());
|
||||
|
||||
Some(format!("panic at: {err}"))
|
||||
}
|
||||
Ok(Err(err)) => Some(err.to_string()),
|
||||
Ok(Ok(())) => None,
|
||||
};
|
||||
|
||||
if let Some(error) = error {
|
||||
error!(%addr, %error, "Client had an error");
|
||||
|
||||
let mut packet = Packet::default();
|
||||
|
||||
packet.data.extend_from_slice(error.as_bytes());
|
||||
packet.data.truncate((u8::MAX - 1) as usize);
|
||||
packet.data.push(0);
|
||||
packet.header = Header {
|
||||
kind: PacketKind::Error.raw(),
|
||||
length: packet.data.len() as u8,
|
||||
};
|
||||
|
||||
let (_, mut writer) = stream.split();
|
||||
let _ = packet.send(&mut writer).await;
|
||||
}
|
||||
|
||||
if let Some(port) = handler_metadata.port {
|
||||
let mut port_handler = port_handler.lock().await;
|
||||
|
||||
if let Some(port_state) = port_handler.port_state.get_mut(&port) {
|
||||
port_state.new_state(PortStatus::Disconnected);
|
||||
port_handler.register_update();
|
||||
}
|
||||
|
||||
if let Some(listener) = handler_metadata.listener.take() {
|
||||
let res = port_handler.start_rejector(
|
||||
port,
|
||||
listener,
|
||||
Packet {
|
||||
header: Header {
|
||||
kind: PacketKind::Reject.raw(),
|
||||
length: 3,
|
||||
},
|
||||
data: b"nc\0".to_vec(),
|
||||
},
|
||||
spawn(
|
||||
&format!("connection to {addr}"),
|
||||
connection_handler(stream, addr, config.clone(), port_handler.clone()),
|
||||
);
|
||||
|
||||
if let Err(error) = res {
|
||||
error!(%port, %error, "failed to start rejector");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sleep(Duration::from_secs(3)).await;
|
||||
let _ = stream.shutdown().await;
|
||||
});
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
|
@ -1,7 +1,7 @@
|
||||
use std::fmt::Debug;
|
||||
|
||||
use anyhow::bail;
|
||||
use bytemuck::{Pod, Zeroable};
|
||||
use eyre::eyre;
|
||||
use tokio::{
|
||||
io::{AsyncReadExt, AsyncWriteExt},
|
||||
net::tcp::{ReadHalf, WriteHalf},
|
||||
@ -26,7 +26,9 @@ pub enum PacketKind {
|
||||
Error = 0xff,
|
||||
}
|
||||
|
||||
#[allow(clippy::enum_glob_use)]
|
||||
impl PacketKind {
|
||||
#[must_use]
|
||||
fn from_u8(raw: u8) -> Self {
|
||||
use PacketKind::*;
|
||||
|
||||
@ -45,6 +47,7 @@ impl PacketKind {
|
||||
}
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn raw(&self) -> u8 {
|
||||
use PacketKind::*;
|
||||
|
||||
@ -105,12 +108,14 @@ pub struct RemConnect {
|
||||
}
|
||||
|
||||
impl Packet {
|
||||
#[allow(clippy::missing_errors_doc)]
|
||||
pub async fn peek_packet_kind(stream: &mut ReadHalf<'_>) -> std::io::Result<PacketKind> {
|
||||
Self::peek_packet_kind_raw(stream)
|
||||
.await
|
||||
.map(PacketKind::from_u8)
|
||||
}
|
||||
|
||||
#[allow(clippy::missing_errors_doc)]
|
||||
pub async fn peek_packet_kind_raw(stream: &mut ReadHalf<'_>) -> std::io::Result<u8> {
|
||||
let mut kind = 0;
|
||||
let n = stream.peek(std::slice::from_mut(&mut kind)).await?;
|
||||
@ -122,6 +127,7 @@ impl Packet {
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::missing_errors_doc)]
|
||||
pub async fn recv_into_cancelation_safe(
|
||||
&mut self,
|
||||
stream: &mut ReadHalf<'_>,
|
||||
@ -136,6 +142,7 @@ impl Packet {
|
||||
self.recv_into(stream).await
|
||||
}
|
||||
|
||||
#[allow(clippy::missing_errors_doc)]
|
||||
pub async fn recv_into(&mut self, stream: &mut ReadHalf<'_>) -> std::io::Result<()> {
|
||||
let header_bytes = bytemuck::bytes_of_mut(&mut self.header);
|
||||
|
||||
@ -148,26 +155,33 @@ impl Packet {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(clippy::missing_errors_doc)]
|
||||
pub async fn send(&self, stream: &mut WriteHalf<'_>) -> std::io::Result<()> {
|
||||
stream.write_all(bytemuck::bytes_of(&self.header)).await?;
|
||||
stream.write_all(&self.data).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn kind(&self) -> PacketKind {
|
||||
PacketKind::from_u8(self.header.kind)
|
||||
}
|
||||
|
||||
pub fn as_rem_connect(&self) -> anyhow::Result<RemConnect> {
|
||||
/// # Errors
|
||||
/// the packet must be a `RemConnect` packet and must contain at least 6 bytes of data
|
||||
pub fn as_rem_connect(&self) -> eyre::Result<RemConnect> {
|
||||
if self.kind() != PacketKind::RemConnect {
|
||||
bail!("Unexpected Packet: {:?} expected RemConnect", self.kind());
|
||||
return Err(eyre!(
|
||||
"Unexpected Packet: {:?} expected RemConnect",
|
||||
self.kind()
|
||||
));
|
||||
}
|
||||
|
||||
if self.data.len() < 6 {
|
||||
bail!(
|
||||
return Err(eyre!(
|
||||
"Too little data for RemConnect. Need at least 6 Bytes got {}",
|
||||
self.data.len()
|
||||
);
|
||||
));
|
||||
}
|
||||
|
||||
Ok(RemConnect {
|
||||
|
138
src/ports.rs
138
src/ports.rs
@ -5,18 +5,23 @@ use std::{
|
||||
fs::File,
|
||||
io::{BufReader, BufWriter},
|
||||
ops::RangeInclusive,
|
||||
path::Path,
|
||||
path::{Path, PathBuf},
|
||||
sync::Arc,
|
||||
time::{Duration, SystemTime, UNIX_EPOCH},
|
||||
};
|
||||
|
||||
use anyhow::anyhow;
|
||||
use eyre::eyre;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::{net::TcpListener, sync::Mutex, task::JoinHandle, time::Instant};
|
||||
use tracing::{debug, error, info, warn};
|
||||
use tokio::{
|
||||
net::TcpListener,
|
||||
sync::{watch::Receiver, Mutex},
|
||||
task::JoinHandle,
|
||||
time::Instant,
|
||||
};
|
||||
use tracing::{debug, error, info, instrument, warn};
|
||||
|
||||
use crate::{
|
||||
constants::{PORT_OWNERSHIP_TIMEOUT, PORT_RETRY_TIME},
|
||||
constants::{CACHE_STORE_INTERVAL, PORT_OWNERSHIP_TIMEOUT, PORT_RETRY_TIME},
|
||||
packets::Packet,
|
||||
spawn, Config, Number, Port, UnixTimestamp, TIME_FORMAT, TIME_ZONE_OFFSET,
|
||||
};
|
||||
@ -32,7 +37,7 @@ pub struct PortHandler {
|
||||
#[serde(skip)]
|
||||
port_guards: HashMap<Port, Rejector>,
|
||||
|
||||
allowed_ports: AllowedPorts,
|
||||
allowed_ports: AllowedList,
|
||||
|
||||
#[serde(skip)]
|
||||
free_ports: HashSet<Port>,
|
||||
@ -42,6 +47,39 @@ pub struct PortHandler {
|
||||
pub port_state: HashMap<Port, PortState>,
|
||||
}
|
||||
|
||||
#[instrument(skip(port_handler, change_receiver))]
|
||||
pub async fn cache_daemon(
|
||||
port_handler: Arc<Mutex<PortHandler>>,
|
||||
cache_path: PathBuf,
|
||||
mut change_receiver: Receiver<Instant>,
|
||||
) {
|
||||
let mut last_store = Instant::now() - 2 * CACHE_STORE_INTERVAL;
|
||||
let mut change_timeout = None;
|
||||
loop {
|
||||
if let Some(change_timeout) = change_timeout.take() {
|
||||
tokio::time::timeout(change_timeout, change_receiver.changed())
|
||||
.await
|
||||
.unwrap_or(Ok(()))
|
||||
} else {
|
||||
change_receiver.changed().await
|
||||
}
|
||||
.expect("failed to wait for cache changes");
|
||||
|
||||
let time_since_last_store = last_store.elapsed();
|
||||
|
||||
if time_since_last_store >= CACHE_STORE_INTERVAL {
|
||||
let port_handler = port_handler.lock().await;
|
||||
|
||||
last_store = Instant::now();
|
||||
if let Err(err) = port_handler.store(&cache_path) {
|
||||
error!("failed to store cache: {err:?}");
|
||||
}
|
||||
} else {
|
||||
change_timeout = Some(CACHE_STORE_INTERVAL - time_since_last_store);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Hash, PartialEq, Eq)]
|
||||
struct DisplayAsDebug<T: Display>(T);
|
||||
impl<T: Display> Debug for DisplayAsDebug<T> {
|
||||
@ -60,16 +98,18 @@ fn duration_in_hours(duration: Duration) -> String {
|
||||
match (hours > 0, minutes > 0) {
|
||||
(true, _) => format!("{hours}h {minutes}min {seconds}s"),
|
||||
(false, true) => format!("{minutes}min {seconds}s"),
|
||||
_ => format!("{:.0?}", duration),
|
||||
_ => format!("{duration:.0?}"),
|
||||
}
|
||||
}
|
||||
|
||||
fn format_instant(instant: Instant) -> String {
|
||||
let when = duration_in_hours(instant.elapsed()) + " ago";
|
||||
|
||||
(|| -> anyhow::Result<_> {
|
||||
(|| -> eyre::Result<_> {
|
||||
let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)? - instant.elapsed();
|
||||
let date = time::OffsetDateTime::from_unix_timestamp(timestamp.as_secs() as i64)?
|
||||
let date = time::OffsetDateTime::from_unix_timestamp(
|
||||
timestamp.as_secs().try_into().expect("timestamp overflow"),
|
||||
)?
|
||||
.to_offset(*TIME_ZONE_OFFSET.get().unwrap())
|
||||
.format(TIME_FORMAT.get().unwrap())?;
|
||||
|
||||
@ -93,7 +133,7 @@ impl Debug for PortHandler {
|
||||
|
||||
let mut free_ports = self.free_ports.iter().copied().collect::<Vec<u16>>();
|
||||
|
||||
free_ports.sort();
|
||||
free_ports.sort_unstable();
|
||||
|
||||
let mut free_ports = free_ports
|
||||
.into_iter()
|
||||
@ -123,8 +163,6 @@ impl Debug for PortHandler {
|
||||
.allocated_ports
|
||||
.iter()
|
||||
.map(|(&number, &port)| {
|
||||
let state = &self.port_state[&port];
|
||||
|
||||
#[derive(Debug)]
|
||||
#[allow(dead_code)]
|
||||
struct State {
|
||||
@ -134,6 +172,8 @@ impl Debug for PortHandler {
|
||||
last_change: DisplayAsDebug<String>,
|
||||
}
|
||||
|
||||
let state = &self.port_state[&port];
|
||||
|
||||
State {
|
||||
state: state.status,
|
||||
number,
|
||||
@ -145,7 +185,7 @@ impl Debug for PortHandler {
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
allocated_ports.sort_by(|a, b| {
|
||||
allocated_ports.sort_unstable_by(|a, b| {
|
||||
a.state.cmp(&b.state).then(
|
||||
self.port_state[&a.port]
|
||||
.last_change
|
||||
@ -157,10 +197,10 @@ impl Debug for PortHandler {
|
||||
writeln!(f, "last update: {last_update}")?;
|
||||
writeln!(f, "rejectors: {:#?}", self.port_guards)?;
|
||||
writeln!(f, "allowed ports: {:?}", self.allowed_ports.0)?;
|
||||
writeln!(f, "free ports: {:?}", free_ports)?;
|
||||
writeln!(f, "free ports: {free_ports:?}")?;
|
||||
|
||||
writeln!(f, "errored ports: {:#?}", errored_ports)?;
|
||||
writeln!(f, "allocated ports: {:#?}", allocated_ports)?;
|
||||
writeln!(f, "errored ports: {errored_ports:#?}")?;
|
||||
writeln!(f, "allocated ports: {allocated_ports:#?}")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@ -189,7 +229,7 @@ impl PortState {
|
||||
pub fn new_state(&mut self, status: PortStatus) {
|
||||
self.last_change = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.expect("timestamp overflow")
|
||||
.as_secs();
|
||||
|
||||
self.status = status;
|
||||
@ -210,18 +250,21 @@ impl Default for PortStatus {
|
||||
}
|
||||
|
||||
#[derive(Default, Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
|
||||
pub struct AllowedPorts(Vec<RangeInclusive<u16>>);
|
||||
pub struct AllowedList(Vec<RangeInclusive<u16>>);
|
||||
|
||||
impl AllowedPorts {
|
||||
impl AllowedList {
|
||||
#[must_use]
|
||||
pub fn is_allowed(&self, port: Port) -> bool {
|
||||
self.0.iter().any(|range| range.contains(&port))
|
||||
}
|
||||
#[must_use]
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.0.is_empty()
|
||||
}
|
||||
}
|
||||
|
||||
impl PortHandler {
|
||||
#[must_use]
|
||||
pub fn status_string(&self) -> String {
|
||||
format!("{self:#?}\n")
|
||||
}
|
||||
@ -236,7 +279,9 @@ impl PortHandler {
|
||||
.expect("failed to notify cache writer");
|
||||
}
|
||||
|
||||
pub fn store(&self, cache: &Path) -> anyhow::Result<()> {
|
||||
#[allow(clippy::missing_errors_doc)]
|
||||
#[instrument(skip(self))]
|
||||
pub fn store(&self, cache: &Path) -> std::io::Result<()> {
|
||||
debug!("storing cache");
|
||||
let temp_file = cache.with_extension(".temp");
|
||||
|
||||
@ -246,6 +291,8 @@ impl PortHandler {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(clippy::missing_errors_doc)]
|
||||
#[instrument(skip(change_sender))]
|
||||
pub fn load(
|
||||
cache: &Path,
|
||||
change_sender: tokio::sync::watch::Sender<Instant>,
|
||||
@ -256,6 +303,8 @@ impl PortHandler {
|
||||
Ok(cache)
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
#[instrument(skip(change_sender))]
|
||||
pub fn load_or_default(
|
||||
path: &Path,
|
||||
change_sender: tokio::sync::watch::Sender<Instant>,
|
||||
@ -266,7 +315,7 @@ impl PortHandler {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn update_allowed_ports(&mut self, allowed_ports: &AllowedPorts) {
|
||||
pub fn update_allowed_ports(&mut self, allowed_ports: &AllowedList) {
|
||||
self.register_update();
|
||||
|
||||
self.allowed_ports = allowed_ports.clone();
|
||||
@ -301,43 +350,41 @@ impl PortHandler {
|
||||
});
|
||||
}
|
||||
|
||||
pub fn start_rejector(
|
||||
&mut self,
|
||||
port: Port,
|
||||
listener: TcpListener,
|
||||
packet: Packet,
|
||||
) -> anyhow::Result<()> {
|
||||
#[instrument(skip(self, listener))]
|
||||
pub fn start_rejector(&mut self, port: Port, listener: TcpListener, packet: Packet) {
|
||||
info!(port, ?packet, "starting rejector");
|
||||
|
||||
let port_guard = Rejector::start(listener, packet);
|
||||
|
||||
assert!(
|
||||
self.port_guards.insert(port, port_guard).is_none(),
|
||||
"Tried to start rejector that is already running.
|
||||
This should have been impossible since it requires two listeners on the same port."
|
||||
);
|
||||
Ok(())
|
||||
if self.port_guards.insert(port, port_guard).is_some() {
|
||||
unreachable!("Tried to start rejector that is already running. This should have been impossible since it requires two listeners on the same port.");
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
pub async fn stop_rejector(&mut self, port: Port) -> Option<(TcpListener, Packet)> {
|
||||
info!(port, "stopping rejector");
|
||||
|
||||
Some(self.port_guards.remove(&port)?.stop().await)
|
||||
}
|
||||
|
||||
/// # Errors
|
||||
/// - the rejector must be running
|
||||
pub async fn change_rejector(
|
||||
&mut self,
|
||||
port: Port,
|
||||
f: impl FnOnce(&mut Packet),
|
||||
) -> anyhow::Result<()> {
|
||||
) -> eyre::Result<()> {
|
||||
let (listener, mut packet) = self
|
||||
.stop_rejector(port)
|
||||
.await
|
||||
.ok_or_else(|| anyhow!("tried to stop rejector that is not running"))?;
|
||||
.ok_or_else(|| eyre!("tried to stop rejector that is not running"))?;
|
||||
|
||||
f(&mut packet);
|
||||
|
||||
self.start_rejector(port, listener, packet)
|
||||
self.start_rejector(port, listener, packet);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@ -355,6 +402,7 @@ impl Debug for Rejector {
|
||||
}
|
||||
|
||||
impl Rejector {
|
||||
#[instrument(skip(listener))]
|
||||
fn start(listener: TcpListener, packet: Packet) -> Self {
|
||||
let port = listener.local_addr().map(|addr| addr.port()).unwrap_or(0);
|
||||
let state = Arc::new((Mutex::new(listener), packet));
|
||||
@ -378,6 +426,7 @@ impl Rejector {
|
||||
|
||||
Self { state, handle }
|
||||
}
|
||||
#[instrument(skip(self))]
|
||||
async fn stop(self) -> (TcpListener, Packet) {
|
||||
self.handle.abort();
|
||||
let _ = self.handle.await;
|
||||
@ -387,13 +436,13 @@ impl Rejector {
|
||||
}
|
||||
|
||||
impl PortHandler {
|
||||
#[instrument(skip(self, config))]
|
||||
pub fn allocate_port_for_number(&mut self, config: &Config, number: Number) -> Option<Port> {
|
||||
let port = if let Some(port) = self.allocated_ports.get(&number) {
|
||||
let already_connected = self
|
||||
.port_state
|
||||
.get(port)
|
||||
.map(|state| state.status != PortStatus::Disconnected)
|
||||
.unwrap_or(false);
|
||||
.map_or(false, |state| state.status != PortStatus::Disconnected);
|
||||
|
||||
if already_connected {
|
||||
None
|
||||
@ -409,7 +458,9 @@ impl PortHandler {
|
||||
self.try_recover_port(config)?
|
||||
};
|
||||
|
||||
assert!(self.allocated_ports.insert(number, port).is_none());
|
||||
if self.allocated_ports.insert(number, port).is_some() {
|
||||
unreachable!("allocated port twice");
|
||||
}
|
||||
Some(port)
|
||||
};
|
||||
|
||||
@ -420,6 +471,7 @@ impl PortHandler {
|
||||
port
|
||||
}
|
||||
|
||||
#[instrument(skip(self, config))]
|
||||
fn try_recover_port(&mut self, config: &Config) -> Option<Port> {
|
||||
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
|
||||
|
||||
@ -463,14 +515,11 @@ impl PortHandler {
|
||||
}
|
||||
|
||||
let removable_entry = self.allocated_ports.iter().find(|(_, port)| {
|
||||
self.port_state
|
||||
.get(port)
|
||||
.map(|port_state| {
|
||||
self.port_state.get(port).map_or(true, |port_state| {
|
||||
port_state.status == PortStatus::Disconnected
|
||||
&& now.saturating_sub(Duration::from_secs(port_state.last_change))
|
||||
>= PORT_OWNERSHIP_TIMEOUT
|
||||
})
|
||||
.unwrap_or(true)
|
||||
});
|
||||
|
||||
if let Some((&old_number, &port)) = removable_entry {
|
||||
@ -483,6 +532,7 @@ impl PortHandler {
|
||||
None // TODO
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
pub fn mark_port_error(&mut self, number: Number, port: Port) {
|
||||
warn!(port, number, "registering an error on");
|
||||
self.register_update();
|
||||
@ -490,7 +540,7 @@ impl PortHandler {
|
||||
self.errored_ports.insert((
|
||||
SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.expect("timestamp overflow")
|
||||
.as_secs(),
|
||||
port,
|
||||
));
|
||||
|
Loading…
Reference in New Issue
Block a user