change error library
This commit is contained in:
parent
11d37c5b73
commit
9d8124bd5c
66
Cargo.lock
generated
66
Cargo.lock
generated
@ -22,9 +22,6 @@ name = "anyhow"
|
|||||||
version = "1.0.70"
|
version = "1.0.70"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "7de8ce5e0f9f8d88245311066a578d72b7af3e7088f32783804676302df237e4"
|
checksum = "7de8ce5e0f9f8d88245311066a578d72b7af3e7088f32783804676302df237e4"
|
||||||
dependencies = [
|
|
||||||
"backtrace",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "async-stream"
|
name = "async-stream"
|
||||||
@ -179,9 +176,10 @@ checksum = "50d30906286121d95be3d479533b458f87493b30a4b5f79a607db8f5d11aa91f"
|
|||||||
name = "centralex"
|
name = "centralex"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
|
||||||
"bytemuck",
|
"bytemuck",
|
||||||
|
"color-eyre",
|
||||||
"console-subscriber",
|
"console-subscriber",
|
||||||
|
"eyre",
|
||||||
"futures",
|
"futures",
|
||||||
"hyper",
|
"hyper",
|
||||||
"once_cell",
|
"once_cell",
|
||||||
@ -190,6 +188,7 @@ dependencies = [
|
|||||||
"time",
|
"time",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tracing",
|
"tracing",
|
||||||
|
"tracing-error",
|
||||||
"tracing-subscriber",
|
"tracing-subscriber",
|
||||||
]
|
]
|
||||||
|
|
||||||
@ -199,6 +198,33 @@ version = "1.0.0"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
|
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "color-eyre"
|
||||||
|
version = "0.6.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "5a667583cca8c4f8436db8de46ea8233c42a7d9ae424a82d338f2e4675229204"
|
||||||
|
dependencies = [
|
||||||
|
"backtrace",
|
||||||
|
"color-spantrace",
|
||||||
|
"eyre",
|
||||||
|
"indenter",
|
||||||
|
"once_cell",
|
||||||
|
"owo-colors",
|
||||||
|
"tracing-error",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "color-spantrace"
|
||||||
|
version = "0.2.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "1ba75b3d9449ecdccb27ecbc479fdc0b87fa2dd43d2f8298f9bf0e59aacc8dce"
|
||||||
|
dependencies = [
|
||||||
|
"once_cell",
|
||||||
|
"owo-colors",
|
||||||
|
"tracing-core",
|
||||||
|
"tracing-error",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "console-api"
|
name = "console-api"
|
||||||
version = "0.4.0"
|
version = "0.4.0"
|
||||||
@ -269,6 +295,16 @@ version = "1.8.1"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "7fcaabb2fef8c910e7f4c7ce9f67a1283a1715879a7c230ca9d6d1ae31f16d91"
|
checksum = "7fcaabb2fef8c910e7f4c7ce9f67a1283a1715879a7c230ca9d6d1ae31f16d91"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "eyre"
|
||||||
|
version = "0.6.8"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "4c2b6b5a29c02cdc822728b7d7b8ae1bab3e3b05d44522770ddd49722eeac7eb"
|
||||||
|
dependencies = [
|
||||||
|
"indenter",
|
||||||
|
"once_cell",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "flate2"
|
name = "flate2"
|
||||||
version = "1.0.25"
|
version = "1.0.25"
|
||||||
@ -490,6 +526,12 @@ dependencies = [
|
|||||||
"tokio-io-timeout",
|
"tokio-io-timeout",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "indenter"
|
||||||
|
version = "0.3.3"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "ce23b50ad8242c51a442f3ff322d56b02f08852c77e4c0b4d3fd684abc89c683"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "indexmap"
|
name = "indexmap"
|
||||||
version = "1.9.2"
|
version = "1.9.2"
|
||||||
@ -659,6 +701,12 @@ version = "0.1.1"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
|
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "owo-colors"
|
||||||
|
version = "3.5.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "c1b04fb49957986fdce4d6ee7a65027d55d4b6d2265e5848bbb507b58ccfdb6f"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "percent-encoding"
|
name = "percent-encoding"
|
||||||
version = "2.2.0"
|
version = "2.2.0"
|
||||||
@ -1119,6 +1167,16 @@ dependencies = [
|
|||||||
"valuable",
|
"valuable",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tracing-error"
|
||||||
|
version = "0.2.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "d686ec1c0f384b1277f097b2f279a2ecc11afe8c133c1aabf036a27cb4cd206e"
|
||||||
|
dependencies = [
|
||||||
|
"tracing",
|
||||||
|
"tracing-subscriber",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tracing-futures"
|
name = "tracing-futures"
|
||||||
version = "0.2.5"
|
version = "0.2.5"
|
||||||
|
15
Cargo.toml
15
Cargo.toml
@ -3,21 +3,30 @@ name = "centralex"
|
|||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
|
[profile.release]
|
||||||
|
debug = true
|
||||||
|
|
||||||
|
[profile.dev.package.backtrace]
|
||||||
|
opt-level = 3
|
||||||
|
|
||||||
|
|
||||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
tokio = { version = "1.24.2", features = ["macros", "rt-multi-thread", "net", "io-util", "sync", "time"] }
|
tokio = { version = "1.24.2", features = ["macros", "rt-multi-thread", "net", "io-util", "sync", "time"] }
|
||||||
anyhow = { version = "1.0.68", features = ["backtrace"] }
|
time = { version = "0.3.20", features = ["local-offset", "macros"] }
|
||||||
bytemuck = { version = "1.13.0", features = ["derive"] }
|
bytemuck = { version = "1.13.0", features = ["derive"] }
|
||||||
serde = { version = "1.0.152", features = ["derive"] }
|
serde = { version = "1.0.152", features = ["derive"] }
|
||||||
serde_json = "1.0.91"
|
serde_json = "1.0.91"
|
||||||
hyper = { version = "0.14.24", optional = true, features = ["server", "http1", "tcp"] }
|
hyper = { version = "0.14.24", optional = true, features = ["server", "http1", "tcp"] }
|
||||||
futures = { version = "0.3.27", default-features = false, features = ["std"] }
|
futures = { version = "0.3.27", default-features = false, features = ["std"] }
|
||||||
console-subscriber = { version = "0.1.8", optional = true }
|
|
||||||
tracing = "0.1.37"
|
tracing = "0.1.37"
|
||||||
tracing-subscriber = { version = "0.3.16", features = ["time"] }
|
tracing-subscriber = { version = "0.3.16", features = ["time"] }
|
||||||
time = { version = "0.3.20", features = ["local-offset", "macros"] }
|
console-subscriber = { version = "0.1.8", optional = true }
|
||||||
once_cell = "1.17.1"
|
once_cell = "1.17.1"
|
||||||
|
eyre = "0.6.8"
|
||||||
|
color-eyre = "0.6.2"
|
||||||
|
tracing-error = "0.2.0"
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
default = ["debug_server", "tokio_console"]
|
default = ["debug_server", "tokio_console"]
|
||||||
|
15
src/auth.rs
15
src/auth.rs
@ -1,19 +1,20 @@
|
|||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
|
|
||||||
use anyhow::bail;
|
use eyre::eyre;
|
||||||
use tracing::debug;
|
use tracing::{debug, instrument};
|
||||||
|
|
||||||
use crate::packets::{Header, Packet, PacketKind};
|
use crate::packets::{Header, Packet, PacketKind};
|
||||||
|
|
||||||
/// # Errors
|
/// # Errors
|
||||||
/// - the dyn ip server returns a malformed response or is unreachable
|
/// - the dyn ip server returns a malformed response or is unreachable
|
||||||
/// - the authentication fails
|
/// - the authentication fails
|
||||||
|
#[instrument]
|
||||||
pub async fn dyn_ip_update(
|
pub async fn dyn_ip_update(
|
||||||
server: &SocketAddr,
|
server: &SocketAddr,
|
||||||
number: u32,
|
number: u32,
|
||||||
pin: u16,
|
pin: u16,
|
||||||
port: u16,
|
port: u16,
|
||||||
) -> anyhow::Result<std::net::Ipv4Addr> {
|
) -> eyre::Result<std::net::Ipv4Addr> {
|
||||||
debug!(%number, %port, "starting dyn ip update");
|
debug!(%number, %port, "starting dyn ip update");
|
||||||
|
|
||||||
let mut packet = Packet {
|
let mut packet = Packet {
|
||||||
@ -41,7 +42,7 @@ pub async fn dyn_ip_update(
|
|||||||
let result = match packet.kind() {
|
let result = match packet.kind() {
|
||||||
PacketKind::DynIpUpdateResponse => Ok(<[u8; 4]>::try_from(packet.data)
|
PacketKind::DynIpUpdateResponse => Ok(<[u8; 4]>::try_from(packet.data)
|
||||||
.map_err(|err| {
|
.map_err(|err| {
|
||||||
anyhow::anyhow!(
|
eyre!(
|
||||||
"too little data for ip address. Need 4 bytes got {}",
|
"too little data for ip address. Need 4 bytes got {}",
|
||||||
err.len()
|
err.len()
|
||||||
)
|
)
|
||||||
@ -54,13 +55,13 @@ pub async fn dyn_ip_update(
|
|||||||
.enumerate()
|
.enumerate()
|
||||||
.find_map(|(i, x)| (*x == 0).then_some(i));
|
.find_map(|(i, x)| (*x == 0).then_some(i));
|
||||||
|
|
||||||
bail!(
|
return Err(eyre!(
|
||||||
"{}",
|
"{}",
|
||||||
std::str::from_utf8(first_zero.map_or(&packet.data, |i| &packet.data[..i]),)?
|
std::str::from_utf8(first_zero.map_or(&packet.data, |i| &packet.data[..i]),)?
|
||||||
)
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
_ => bail!("server returned unexpected packet"),
|
_ => return Err(eyre!("server returned unexpected packet")),
|
||||||
};
|
};
|
||||||
|
|
||||||
debug!(?result, "finished dyn ip update");
|
debug!(?result, "finished dyn ip update");
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
use anyhow::{anyhow, bail, Context};
|
use eyre::eyre;
|
||||||
use std::{net::SocketAddr, time::Instant};
|
use std::{net::SocketAddr, time::Instant};
|
||||||
use tokio::{
|
use tokio::{
|
||||||
io::AsyncWriteExt,
|
io::AsyncWriteExt,
|
||||||
@ -10,7 +10,7 @@ use tokio::{
|
|||||||
sync::Mutex,
|
sync::Mutex,
|
||||||
time::{sleep, timeout},
|
time::{sleep, timeout},
|
||||||
};
|
};
|
||||||
use tracing::{info, trace};
|
use tracing::{info, instrument, trace};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
auth::dyn_ip_update,
|
auth::dyn_ip_update,
|
||||||
@ -22,13 +22,14 @@ use crate::{
|
|||||||
|
|
||||||
/// # Errors
|
/// # Errors
|
||||||
/// - the client authentication fails
|
/// - the client authentication fails
|
||||||
|
#[instrument(skip(config, port_handler, handler_metadata))]
|
||||||
async fn authenticate(
|
async fn authenticate(
|
||||||
config: &Config,
|
config: &Config,
|
||||||
port_handler: &Mutex<PortHandler>,
|
port_handler: &Mutex<PortHandler>,
|
||||||
handler_metadata: &mut HandlerMetadata,
|
handler_metadata: &mut HandlerMetadata,
|
||||||
number: u32,
|
number: u32,
|
||||||
pin: u16,
|
pin: u16,
|
||||||
) -> anyhow::Result<Option<u16>> {
|
) -> eyre::Result<Option<u16>> {
|
||||||
let mut authenticated = false;
|
let mut authenticated = false;
|
||||||
loop {
|
loop {
|
||||||
let mut updated_server = false;
|
let mut updated_server = false;
|
||||||
@ -44,9 +45,7 @@ async fn authenticate(
|
|||||||
|
|
||||||
// make sure the client is authenticated before opening any ports
|
// make sure the client is authenticated before opening any ports
|
||||||
if !authenticated {
|
if !authenticated {
|
||||||
let _ip = dyn_ip_update(&config.dyn_ip_server, number, pin, port)
|
let _ip = dyn_ip_update(&config.dyn_ip_server, number, pin, port).await?;
|
||||||
.await
|
|
||||||
.context("dy-ip update")?;
|
|
||||||
authenticated = true;
|
authenticated = true;
|
||||||
updated_server = true;
|
updated_server = true;
|
||||||
}
|
}
|
||||||
@ -68,9 +67,7 @@ async fn authenticate(
|
|||||||
// we need to update the server here once a port that can be opened
|
// we need to update the server here once a port that can be opened
|
||||||
// has been found
|
// has been found
|
||||||
if !updated_server {
|
if !updated_server {
|
||||||
let _ip = dyn_ip_update(&config.dyn_ip_server, number, pin, port)
|
let _ip = dyn_ip_update(&config.dyn_ip_server, number, pin, port).await?;
|
||||||
.await
|
|
||||||
.context("dy-ip update")?;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
port_handler.register_update();
|
port_handler.register_update();
|
||||||
@ -101,12 +98,13 @@ enum IdleResult {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[instrument(skip(listener, reader, writer, packet))]
|
||||||
async fn idle(
|
async fn idle(
|
||||||
listener: &mut TcpListener,
|
listener: &mut TcpListener,
|
||||||
mut packet: Packet,
|
mut packet: Packet,
|
||||||
reader: &mut ReadHalf<'_>,
|
reader: &mut ReadHalf<'_>,
|
||||||
writer: &mut WriteHalf<'_>,
|
writer: &mut WriteHalf<'_>,
|
||||||
) -> anyhow::Result<Option<IdleResult>> {
|
) -> eyre::Result<Option<IdleResult>> {
|
||||||
let mut last_ping_sent_at = Instant::now();
|
let mut last_ping_sent_at = Instant::now();
|
||||||
let mut last_ping_received_at = Instant::now();
|
let mut last_ping_received_at = Instant::now();
|
||||||
|
|
||||||
@ -132,8 +130,8 @@ async fn idle(
|
|||||||
let (stream, addr) = caller?;
|
let (stream, addr) = caller?;
|
||||||
break Ok(Some(IdleResult::Caller { packet, stream, addr }))
|
break Ok(Some(IdleResult::Caller { packet, stream, addr }))
|
||||||
},
|
},
|
||||||
_ = Packet::peek_packet_kind( reader) => {
|
_ = Packet::peek_packet_kind(reader) => {
|
||||||
packet.recv_into( reader).await?;
|
packet.recv_into(reader).await?;
|
||||||
|
|
||||||
if packet.kind() == PacketKind::Ping {
|
if packet.kind() == PacketKind::Ping {
|
||||||
trace!("received ping");
|
trace!("received ping");
|
||||||
@ -148,6 +146,7 @@ async fn idle(
|
|||||||
last_ping_sent_at = Instant::now();
|
last_ping_sent_at = Instant::now();
|
||||||
}
|
}
|
||||||
_ = sleep(next_ping_expected_in) => {
|
_ = sleep(next_ping_expected_in) => {
|
||||||
|
|
||||||
writer.write_all(REJECT_TIMEOUT).await?;
|
writer.write_all(REJECT_TIMEOUT).await?;
|
||||||
break Ok(None);
|
break Ok(None);
|
||||||
}
|
}
|
||||||
@ -155,13 +154,14 @@ async fn idle(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[instrument(skip(port_handler, handler_metadata, writer))]
|
||||||
async fn notify_or_disconnect(
|
async fn notify_or_disconnect(
|
||||||
result: IdleResult,
|
result: IdleResult,
|
||||||
handler_metadata: &mut HandlerMetadata,
|
handler_metadata: &mut HandlerMetadata,
|
||||||
port_handler: &Mutex<PortHandler>,
|
port_handler: &Mutex<PortHandler>,
|
||||||
port: u16,
|
port: u16,
|
||||||
writer: &mut WriteHalf<'_>,
|
writer: &mut WriteHalf<'_>,
|
||||||
) -> anyhow::Result<Option<(TcpStream, Packet)>> {
|
) -> eyre::Result<Option<(TcpStream, Packet)>> {
|
||||||
match result {
|
match result {
|
||||||
IdleResult::Disconnect { mut packet } => {
|
IdleResult::Disconnect { mut packet } => {
|
||||||
if matches!(packet.kind(), PacketKind::End | PacketKind::Reject) {
|
if matches!(packet.kind(), PacketKind::End | PacketKind::Reject) {
|
||||||
@ -184,7 +184,7 @@ async fn notify_or_disconnect(
|
|||||||
);
|
);
|
||||||
Ok(None)
|
Ok(None)
|
||||||
} else {
|
} else {
|
||||||
Err(anyhow!("unexpected packet: {:?}", packet.kind()))
|
Err(eyre!("unexpected packet: {:?}", packet.kind()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
IdleResult::Caller {
|
IdleResult::Caller {
|
||||||
@ -213,14 +213,21 @@ async fn notify_or_disconnect(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn print_addr(stream: &TcpStream) -> String {
|
||||||
|
stream
|
||||||
|
.peer_addr()
|
||||||
|
.map_or_else(|_| "?".to_owned(), |addr| format!("{addr}"))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[instrument(skip(packet, port_handler, handler_metadata, caller, client), fields(client_addr = print_addr(client), caller_addr = print_addr(caller)))]
|
||||||
async fn connect(
|
async fn connect(
|
||||||
mut packet: Packet,
|
mut packet: Packet,
|
||||||
port_handler: &Mutex<PortHandler>,
|
port_handler: &Mutex<PortHandler>,
|
||||||
port: u16,
|
port: u16,
|
||||||
handler_metadata: &mut HandlerMetadata,
|
handler_metadata: &mut HandlerMetadata,
|
||||||
stream: &mut TcpStream,
|
|
||||||
client: &mut TcpStream,
|
client: &mut TcpStream,
|
||||||
) -> anyhow::Result<()> {
|
caller: &mut TcpStream,
|
||||||
|
) -> eyre::Result<()> {
|
||||||
packet.header = Header {
|
packet.header = Header {
|
||||||
kind: PacketKind::Reject.raw(),
|
kind: PacketKind::Reject.raw(),
|
||||||
length: 4,
|
length: 4,
|
||||||
@ -249,10 +256,10 @@ async fn connect(
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
stream.set_nodelay(true)?;
|
|
||||||
client.set_nodelay(true)?;
|
client.set_nodelay(true)?;
|
||||||
|
caller.set_nodelay(true)?;
|
||||||
|
|
||||||
let _ = timeout(CALL_TIMEOUT, tokio::io::copy_bidirectional(stream, client)).await;
|
let _ = timeout(CALL_TIMEOUT, tokio::io::copy_bidirectional(client, caller)).await;
|
||||||
|
|
||||||
{
|
{
|
||||||
let mut port_handler = port_handler.lock().await;
|
let mut port_handler = port_handler.lock().await;
|
||||||
@ -286,14 +293,15 @@ async fn connect(
|
|||||||
/// - accepting a tcp connection fails
|
/// - accepting a tcp connection fails
|
||||||
/// - settings tcp socket properties fails
|
/// - settings tcp socket properties fails
|
||||||
/// - the client authentication fails
|
/// - the client authentication fails
|
||||||
|
#[instrument(skip_all)]
|
||||||
pub async fn handler(
|
pub async fn handler(
|
||||||
stream: &mut TcpStream,
|
client: &mut TcpStream,
|
||||||
addr: SocketAddr,
|
addr: SocketAddr,
|
||||||
config: &Config,
|
config: &Config,
|
||||||
handler_metadata: &mut HandlerMetadata,
|
handler_metadata: &mut HandlerMetadata,
|
||||||
port_handler: &Mutex<PortHandler>,
|
port_handler: &Mutex<PortHandler>,
|
||||||
) -> anyhow::Result<()> {
|
) -> eyre::Result<()> {
|
||||||
let (mut reader, mut writer) = stream.split();
|
let (mut reader, mut writer) = client.split();
|
||||||
|
|
||||||
let mut packet = Packet::default();
|
let mut packet = Packet::default();
|
||||||
|
|
||||||
@ -334,7 +342,7 @@ pub async fn handler(
|
|||||||
return Ok(());
|
return Ok(());
|
||||||
};
|
};
|
||||||
|
|
||||||
let Some((mut client, mut packet)) = notify_or_disconnect(idle_result, handler_metadata, port_handler, port, &mut writer).await? else {
|
let Some((mut caller, mut packet)) = notify_or_disconnect(idle_result, handler_metadata, port_handler, port, &mut writer).await? else {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -369,12 +377,12 @@ pub async fn handler(
|
|||||||
port_handler,
|
port_handler,
|
||||||
port,
|
port,
|
||||||
handler_metadata,
|
handler_metadata,
|
||||||
stream,
|
client,
|
||||||
&mut client,
|
&mut caller,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
kind => bail!("unexpected packet: {:?}", kind),
|
kind => Err(eyre!("unexpected packet: {:?}", kind)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
111
src/main.rs
111
src/main.rs
@ -22,7 +22,8 @@ use tokio::{
|
|||||||
sync::Mutex,
|
sync::Mutex,
|
||||||
time::{sleep, Instant},
|
time::{sleep, Instant},
|
||||||
};
|
};
|
||||||
use tracing::{error, info, warn, Level};
|
use tracing::{error, info, instrument, warn, Level};
|
||||||
|
use tracing_subscriber::fmt::time::FormatTime;
|
||||||
|
|
||||||
use crate::packets::PacketKind;
|
use crate::packets::PacketKind;
|
||||||
use crate::ports::{cache_daemon, AllowedList, PortHandler, PortStatus};
|
use crate::ports::{cache_daemon, AllowedList, PortHandler, PortStatus};
|
||||||
@ -112,25 +113,28 @@ impl Config {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(not(feature = "tokio_console"))]
|
|
||||||
#[track_caller]
|
|
||||||
fn spawn<T: Send + 'static>(
|
|
||||||
_name: &str,
|
|
||||||
future: impl Future<Output = T> + Send + 'static,
|
|
||||||
) -> tokio::task::JoinHandle<T> {
|
|
||||||
tokio::spawn(future)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(feature = "tokio_console")]
|
|
||||||
#[track_caller]
|
#[track_caller]
|
||||||
fn spawn<T: Send + 'static>(
|
fn spawn<T: Send + 'static>(
|
||||||
name: &str,
|
name: &str,
|
||||||
future: impl Future<Output = T> + Send + 'static,
|
future: impl Future<Output = T> + Send + 'static,
|
||||||
) -> tokio::task::JoinHandle<T> {
|
) -> tokio::task::JoinHandle<T> {
|
||||||
tokio::task::Builder::new()
|
use tracing::Instrument;
|
||||||
|
|
||||||
|
let future = future.instrument(tracing::span!(
|
||||||
|
Level::TRACE,
|
||||||
|
"spawn",
|
||||||
|
name = name,
|
||||||
|
caller = %std::panic::Location::caller().to_string()
|
||||||
|
));
|
||||||
|
|
||||||
|
#[cfg(feature = "tokio_console")]
|
||||||
|
return tokio::task::Builder::new()
|
||||||
.name(name)
|
.name(name)
|
||||||
.spawn(future)
|
.spawn(future)
|
||||||
.unwrap_or_else(|err| panic!("failed to spawn {name:?}: {err:?}"))
|
.unwrap_or_else(|err| panic!("failed to spawn {name:?}: {err:?}"));
|
||||||
|
|
||||||
|
#[cfg(not(feature = "tokio_console"))]
|
||||||
|
return tokio::spawn(future);
|
||||||
}
|
}
|
||||||
|
|
||||||
static TIME_ZONE_OFFSET: once_cell::sync::OnceCell<time::UtcOffset> =
|
static TIME_ZONE_OFFSET: once_cell::sync::OnceCell<time::UtcOffset> =
|
||||||
@ -139,8 +143,66 @@ static TIME_ZONE_OFFSET: once_cell::sync::OnceCell<time::UtcOffset> =
|
|||||||
static TIME_FORMAT: once_cell::sync::OnceCell<OwnedFormatItem> = once_cell::sync::OnceCell::new();
|
static TIME_FORMAT: once_cell::sync::OnceCell<OwnedFormatItem> = once_cell::sync::OnceCell::new();
|
||||||
|
|
||||||
fn setup_tracing(config: &Config) {
|
fn setup_tracing(config: &Config) {
|
||||||
|
use tracing::Subscriber;
|
||||||
|
use tracing_error::ErrorLayer;
|
||||||
use tracing_subscriber::prelude::*;
|
use tracing_subscriber::prelude::*;
|
||||||
use tracing_subscriber::{filter, fmt};
|
use tracing_subscriber::{
|
||||||
|
filter,
|
||||||
|
fmt::{self, FormatEvent, FormatFields},
|
||||||
|
registry::LookupSpan,
|
||||||
|
};
|
||||||
|
|
||||||
|
struct EventFormater;
|
||||||
|
impl<S, N> FormatEvent<S, N> for EventFormater
|
||||||
|
where
|
||||||
|
S: Subscriber + for<'a> LookupSpan<'a>,
|
||||||
|
N: for<'a> FormatFields<'a> + 'static,
|
||||||
|
{
|
||||||
|
fn format_event(
|
||||||
|
&self,
|
||||||
|
ctx: &fmt::FmtContext<'_, S, N>,
|
||||||
|
mut writer: fmt::format::Writer<'_>,
|
||||||
|
event: &tracing::Event<'_>,
|
||||||
|
) -> std::fmt::Result {
|
||||||
|
use color_eyre::owo_colors::OwoColorize;
|
||||||
|
|
||||||
|
let meta = event.metadata();
|
||||||
|
|
||||||
|
fmt::time::OffsetTime::new(
|
||||||
|
*TIME_ZONE_OFFSET.get().unwrap(),
|
||||||
|
TIME_FORMAT.get().unwrap(),
|
||||||
|
)
|
||||||
|
.format_time(&mut writer)?;
|
||||||
|
|
||||||
|
// TODO: check writer.has_ansi_escapes()
|
||||||
|
|
||||||
|
let level = *meta.level();
|
||||||
|
match level {
|
||||||
|
Level::TRACE => write!(writer, " {:>5} ", level.purple())?,
|
||||||
|
Level::DEBUG => write!(writer, " {:>5} ", level.cyan())?,
|
||||||
|
Level::INFO => write!(writer, " {:>5} ", level.green())?,
|
||||||
|
Level::WARN => write!(writer, " {:>5} ", level.yellow())?,
|
||||||
|
Level::ERROR => write!(writer, " {:>5} ", level.red())?,
|
||||||
|
}
|
||||||
|
|
||||||
|
write!(writer, "{:17}{}", meta.target().dimmed(), ":".bold())?;
|
||||||
|
|
||||||
|
/*
|
||||||
|
if let Some(filename) = meta.file() {
|
||||||
|
write!(writer, " {}{}", filename.bold(), ":".dimmed())?;
|
||||||
|
}
|
||||||
|
if let Some(line_number) = meta.line() {
|
||||||
|
write!(writer, "{}{}", line_number.bold(), ":".dimmed())?;
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
|
writer.write_char(' ')?;
|
||||||
|
|
||||||
|
ctx.format_fields(writer.by_ref(), event)?;
|
||||||
|
|
||||||
|
writeln!(writer)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// build a `Subscriber` by combining layers with a
|
// build a `Subscriber` by combining layers with a
|
||||||
// `tracing_subscriber::Registry`:
|
// `tracing_subscriber::Registry`:
|
||||||
@ -150,13 +212,11 @@ fn setup_tracing(config: &Config) {
|
|||||||
let registry = registry.with(console_subscriber::spawn());
|
let registry = registry.with(console_subscriber::spawn());
|
||||||
|
|
||||||
registry
|
registry
|
||||||
|
.with(ErrorLayer::default())
|
||||||
.with(
|
.with(
|
||||||
fmt::layer()
|
fmt::layer()
|
||||||
.with_target(true)
|
.with_target(true)
|
||||||
.with_timer(fmt::time::OffsetTime::new(
|
.event_format(EventFormater)
|
||||||
*TIME_ZONE_OFFSET.get().unwrap(),
|
|
||||||
TIME_FORMAT.get().unwrap(),
|
|
||||||
))
|
|
||||||
.with_filter(filter::LevelFilter::from_level(config.log_level))
|
.with_filter(filter::LevelFilter::from_level(config.log_level))
|
||||||
.with_filter(tracing_subscriber::filter::filter_fn(|meta| {
|
.with_filter(tracing_subscriber::filter::filter_fn(|meta| {
|
||||||
meta.target().starts_with("centralex")
|
meta.target().starts_with("centralex")
|
||||||
@ -165,6 +225,7 @@ fn setup_tracing(config: &Config) {
|
|||||||
.init();
|
.init();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[instrument(skip(stream, config, port_handler))]
|
||||||
async fn connection_handler(
|
async fn connection_handler(
|
||||||
mut stream: TcpStream,
|
mut stream: TcpStream,
|
||||||
addr: SocketAddr,
|
addr: SocketAddr,
|
||||||
@ -186,13 +247,7 @@ async fn connection_handler(
|
|||||||
.await;
|
.await;
|
||||||
|
|
||||||
let error = match res {
|
let error = match res {
|
||||||
Err(err) => {
|
Err(_) => Some("internal server error".to_owned()),
|
||||||
let err = err
|
|
||||||
.downcast::<String>()
|
|
||||||
.map_or_else(|_| "?".to_owned(), |err| *err);
|
|
||||||
|
|
||||||
Some(format!("panic at: {err}"))
|
|
||||||
}
|
|
||||||
Ok(Err(err)) => Some(err.to_string()),
|
Ok(Err(err)) => Some(err.to_string()),
|
||||||
Ok(Ok(())) => None,
|
Ok(Ok(())) => None,
|
||||||
};
|
};
|
||||||
@ -241,7 +296,9 @@ async fn connection_handler(
|
|||||||
let _ = stream.shutdown().await;
|
let _ = stream.shutdown().await;
|
||||||
}
|
}
|
||||||
|
|
||||||
fn main() -> anyhow::Result<()> {
|
fn main() -> eyre::Result<()> {
|
||||||
|
color_eyre::install()?;
|
||||||
|
|
||||||
let config = Arc::new(Config::load("config.json")?);
|
let config = Arc::new(Config::load("config.json")?);
|
||||||
|
|
||||||
assert!(!config.allowed_ports.is_empty(), "no allowed ports");
|
assert!(!config.allowed_ports.is_empty(), "no allowed ports");
|
||||||
@ -261,7 +318,7 @@ fn main() -> anyhow::Result<()> {
|
|||||||
.block_on(tokio_main(config))
|
.block_on(tokio_main(config))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn tokio_main(config: Arc<Config>) -> anyhow::Result<()> {
|
async fn tokio_main(config: Arc<Config>) -> eyre::Result<()> {
|
||||||
setup_tracing(&config);
|
setup_tracing(&config);
|
||||||
|
|
||||||
let cache_path = PathBuf::from("cache.json");
|
let cache_path = PathBuf::from("cache.json");
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
use std::fmt::Debug;
|
use std::fmt::Debug;
|
||||||
|
|
||||||
use anyhow::bail;
|
|
||||||
use bytemuck::{Pod, Zeroable};
|
use bytemuck::{Pod, Zeroable};
|
||||||
|
use eyre::eyre;
|
||||||
use tokio::{
|
use tokio::{
|
||||||
io::{AsyncReadExt, AsyncWriteExt},
|
io::{AsyncReadExt, AsyncWriteExt},
|
||||||
net::tcp::{ReadHalf, WriteHalf},
|
net::tcp::{ReadHalf, WriteHalf},
|
||||||
@ -169,16 +169,19 @@ impl Packet {
|
|||||||
|
|
||||||
/// # Errors
|
/// # Errors
|
||||||
/// the packet must be a `RemConnect` packet and must contain at least 6 bytes of data
|
/// the packet must be a `RemConnect` packet and must contain at least 6 bytes of data
|
||||||
pub fn as_rem_connect(&self) -> anyhow::Result<RemConnect> {
|
pub fn as_rem_connect(&self) -> eyre::Result<RemConnect> {
|
||||||
if self.kind() != PacketKind::RemConnect {
|
if self.kind() != PacketKind::RemConnect {
|
||||||
bail!("Unexpected Packet: {:?} expected RemConnect", self.kind());
|
return Err(eyre!(
|
||||||
|
"Unexpected Packet: {:?} expected RemConnect",
|
||||||
|
self.kind()
|
||||||
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
if self.data.len() < 6 {
|
if self.data.len() < 6 {
|
||||||
bail!(
|
return Err(eyre!(
|
||||||
"Too little data for RemConnect. Need at least 6 Bytes got {}",
|
"Too little data for RemConnect. Need at least 6 Bytes got {}",
|
||||||
self.data.len()
|
self.data.len()
|
||||||
);
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(RemConnect {
|
Ok(RemConnect {
|
||||||
|
21
src/ports.rs
21
src/ports.rs
@ -10,7 +10,7 @@ use std::{
|
|||||||
time::{Duration, SystemTime, UNIX_EPOCH},
|
time::{Duration, SystemTime, UNIX_EPOCH},
|
||||||
};
|
};
|
||||||
|
|
||||||
use anyhow::anyhow;
|
use eyre::eyre;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use tokio::{
|
use tokio::{
|
||||||
net::TcpListener,
|
net::TcpListener,
|
||||||
@ -18,7 +18,7 @@ use tokio::{
|
|||||||
task::JoinHandle,
|
task::JoinHandle,
|
||||||
time::Instant,
|
time::Instant,
|
||||||
};
|
};
|
||||||
use tracing::{debug, error, info, warn};
|
use tracing::{debug, error, info, instrument, warn};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
constants::{CACHE_STORE_INTERVAL, PORT_OWNERSHIP_TIMEOUT, PORT_RETRY_TIME},
|
constants::{CACHE_STORE_INTERVAL, PORT_OWNERSHIP_TIMEOUT, PORT_RETRY_TIME},
|
||||||
@ -47,6 +47,7 @@ pub struct PortHandler {
|
|||||||
pub port_state: HashMap<Port, PortState>,
|
pub port_state: HashMap<Port, PortState>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[instrument(skip(port_handler, change_receiver))]
|
||||||
pub async fn cache_daemon(
|
pub async fn cache_daemon(
|
||||||
port_handler: Arc<Mutex<PortHandler>>,
|
port_handler: Arc<Mutex<PortHandler>>,
|
||||||
cache_path: PathBuf,
|
cache_path: PathBuf,
|
||||||
@ -104,7 +105,7 @@ fn duration_in_hours(duration: Duration) -> String {
|
|||||||
fn format_instant(instant: Instant) -> String {
|
fn format_instant(instant: Instant) -> String {
|
||||||
let when = duration_in_hours(instant.elapsed()) + " ago";
|
let when = duration_in_hours(instant.elapsed()) + " ago";
|
||||||
|
|
||||||
(|| -> anyhow::Result<_> {
|
(|| -> eyre::Result<_> {
|
||||||
let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)? - instant.elapsed();
|
let timestamp = SystemTime::now().duration_since(UNIX_EPOCH)? - instant.elapsed();
|
||||||
let date = time::OffsetDateTime::from_unix_timestamp(
|
let date = time::OffsetDateTime::from_unix_timestamp(
|
||||||
timestamp.as_secs().try_into().expect("timestamp overflow"),
|
timestamp.as_secs().try_into().expect("timestamp overflow"),
|
||||||
@ -279,6 +280,7 @@ impl PortHandler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[allow(clippy::missing_errors_doc)]
|
#[allow(clippy::missing_errors_doc)]
|
||||||
|
#[instrument(skip(self))]
|
||||||
pub fn store(&self, cache: &Path) -> std::io::Result<()> {
|
pub fn store(&self, cache: &Path) -> std::io::Result<()> {
|
||||||
debug!("storing cache");
|
debug!("storing cache");
|
||||||
let temp_file = cache.with_extension(".temp");
|
let temp_file = cache.with_extension(".temp");
|
||||||
@ -290,6 +292,7 @@ impl PortHandler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[allow(clippy::missing_errors_doc)]
|
#[allow(clippy::missing_errors_doc)]
|
||||||
|
#[instrument(skip(change_sender))]
|
||||||
pub fn load(
|
pub fn load(
|
||||||
cache: &Path,
|
cache: &Path,
|
||||||
change_sender: tokio::sync::watch::Sender<Instant>,
|
change_sender: tokio::sync::watch::Sender<Instant>,
|
||||||
@ -301,6 +304,7 @@ impl PortHandler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[must_use]
|
#[must_use]
|
||||||
|
#[instrument(skip(change_sender))]
|
||||||
pub fn load_or_default(
|
pub fn load_or_default(
|
||||||
path: &Path,
|
path: &Path,
|
||||||
change_sender: tokio::sync::watch::Sender<Instant>,
|
change_sender: tokio::sync::watch::Sender<Instant>,
|
||||||
@ -346,6 +350,7 @@ impl PortHandler {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[instrument(skip(self, listener))]
|
||||||
pub fn start_rejector(&mut self, port: Port, listener: TcpListener, packet: Packet) {
|
pub fn start_rejector(&mut self, port: Port, listener: TcpListener, packet: Packet) {
|
||||||
info!(port, ?packet, "starting rejector");
|
info!(port, ?packet, "starting rejector");
|
||||||
|
|
||||||
@ -356,6 +361,7 @@ impl PortHandler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[instrument(skip(self))]
|
||||||
pub async fn stop_rejector(&mut self, port: Port) -> Option<(TcpListener, Packet)> {
|
pub async fn stop_rejector(&mut self, port: Port) -> Option<(TcpListener, Packet)> {
|
||||||
info!(port, "stopping rejector");
|
info!(port, "stopping rejector");
|
||||||
|
|
||||||
@ -368,11 +374,11 @@ impl PortHandler {
|
|||||||
&mut self,
|
&mut self,
|
||||||
port: Port,
|
port: Port,
|
||||||
f: impl FnOnce(&mut Packet),
|
f: impl FnOnce(&mut Packet),
|
||||||
) -> anyhow::Result<()> {
|
) -> eyre::Result<()> {
|
||||||
let (listener, mut packet) = self
|
let (listener, mut packet) = self
|
||||||
.stop_rejector(port)
|
.stop_rejector(port)
|
||||||
.await
|
.await
|
||||||
.ok_or_else(|| anyhow!("tried to stop rejector that is not running"))?;
|
.ok_or_else(|| eyre!("tried to stop rejector that is not running"))?;
|
||||||
|
|
||||||
f(&mut packet);
|
f(&mut packet);
|
||||||
|
|
||||||
@ -396,6 +402,7 @@ impl Debug for Rejector {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl Rejector {
|
impl Rejector {
|
||||||
|
#[instrument(skip(listener))]
|
||||||
fn start(listener: TcpListener, packet: Packet) -> Self {
|
fn start(listener: TcpListener, packet: Packet) -> Self {
|
||||||
let port = listener.local_addr().map(|addr| addr.port()).unwrap_or(0);
|
let port = listener.local_addr().map(|addr| addr.port()).unwrap_or(0);
|
||||||
let state = Arc::new((Mutex::new(listener), packet));
|
let state = Arc::new((Mutex::new(listener), packet));
|
||||||
@ -419,6 +426,7 @@ impl Rejector {
|
|||||||
|
|
||||||
Self { state, handle }
|
Self { state, handle }
|
||||||
}
|
}
|
||||||
|
#[instrument(skip(self))]
|
||||||
async fn stop(self) -> (TcpListener, Packet) {
|
async fn stop(self) -> (TcpListener, Packet) {
|
||||||
self.handle.abort();
|
self.handle.abort();
|
||||||
let _ = self.handle.await;
|
let _ = self.handle.await;
|
||||||
@ -428,6 +436,7 @@ impl Rejector {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl PortHandler {
|
impl PortHandler {
|
||||||
|
#[instrument(skip(self, config))]
|
||||||
pub fn allocate_port_for_number(&mut self, config: &Config, number: Number) -> Option<Port> {
|
pub fn allocate_port_for_number(&mut self, config: &Config, number: Number) -> Option<Port> {
|
||||||
let port = if let Some(port) = self.allocated_ports.get(&number) {
|
let port = if let Some(port) = self.allocated_ports.get(&number) {
|
||||||
let already_connected = self
|
let already_connected = self
|
||||||
@ -462,6 +471,7 @@ impl PortHandler {
|
|||||||
port
|
port
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[instrument(skip(self, config))]
|
||||||
fn try_recover_port(&mut self, config: &Config) -> Option<Port> {
|
fn try_recover_port(&mut self, config: &Config) -> Option<Port> {
|
||||||
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
|
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
|
||||||
|
|
||||||
@ -522,6 +532,7 @@ impl PortHandler {
|
|||||||
None // TODO
|
None // TODO
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[instrument(skip(self))]
|
||||||
pub fn mark_port_error(&mut self, number: Number, port: Port) {
|
pub fn mark_port_error(&mut self, number: Number, port: Port) {
|
||||||
warn!(port, number, "registering an error on");
|
warn!(port, number, "registering an error on");
|
||||||
self.register_update();
|
self.register_update();
|
||||||
|
Loading…
Reference in New Issue
Block a user