Compare commits

..

No commits in common. "bcc332bd7bfdcdf4a34fc263127a7cd94a877b0d" and "7530ce3aaeea6ea7be9f6045df935e8eb1c5e34d" have entirely different histories.

10 changed files with 184 additions and 534 deletions

1
.gitignore vendored
View File

@ -1,3 +1,2 @@
/target /target
config.yaml
config.json config.json

288
Cargo.lock generated
View File

@ -2,15 +2,6 @@
# It is not intended for manual editing. # It is not intended for manual editing.
version = 3 version = 3
[[package]]
name = "android_system_properties"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311"
dependencies = [
"libc",
]
[[package]] [[package]]
name = "autocfg" name = "autocfg"
version = "1.1.0" version = "1.1.0"
@ -23,114 +14,18 @@ version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bumpalo"
version = "3.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d261e256854913907f67ed06efbc3338dfe6179796deefc1ff763fc1aee5535"
[[package]] [[package]]
name = "bytes" name = "bytes"
version = "1.1.0" version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8"
[[package]]
name = "cc"
version = "1.0.79"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50d30906286121d95be3d479533b458f87493b30a4b5f79a607db8f5d11aa91f"
[[package]] [[package]]
name = "cfg-if" name = "cfg-if"
version = "1.0.0" version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chrono"
version = "0.4.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16b0a3d9ed01224b22057780a37bb8c5dbfe1be8ba48678e7bf57ec4b385411f"
dependencies = [
"iana-time-zone",
"js-sys",
"num-integer",
"num-traits",
"time",
"wasm-bindgen",
"winapi",
]
[[package]]
name = "codespan-reporting"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3538270d33cc669650c4b093848450d380def10c331d38c768e34cac80576e6e"
dependencies = [
"termcolor",
"unicode-width",
]
[[package]]
name = "core-foundation-sys"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc"
[[package]]
name = "cxx"
version = "1.0.89"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc831ee6a32dd495436e317595e639a587aa9907bef96fe6e6abc290ab6204e9"
dependencies = [
"cc",
"cxxbridge-flags",
"cxxbridge-macro",
"link-cplusplus",
]
[[package]]
name = "cxx-build"
version = "1.0.89"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94331d54f1b1a8895cd81049f7eaaaef9d05a7dcb4d1fd08bf3ff0806246789d"
dependencies = [
"cc",
"codespan-reporting",
"once_cell",
"proc-macro2",
"quote",
"scratch",
"syn",
]
[[package]]
name = "cxxbridge-flags"
version = "1.0.89"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48dcd35ba14ca9b40d6e4b4b39961f23d835dbb8eed74565ded361d93e1feb8a"
[[package]]
name = "cxxbridge-macro"
version = "1.0.89"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81bbeb29798b407ccd82a3324ade1a7286e0d29851475990b612670f6f5124d2"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "fern"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3bdd7b0849075e79ee9a1836df22c717d1eba30451796fdc631b04565dd11e2a"
dependencies = [
"log",
]
[[package]] [[package]]
name = "futures" name = "futures"
version = "0.3.21" version = "0.3.21"
@ -228,7 +123,7 @@ checksum = "4eb1a864a501629691edf6c15a593b7a51eebaa1e8468e9ddc623de7c9b58ec6"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"libc", "libc",
"wasi 0.11.0+wasi-snapshot-preview1", "wasi",
] ]
[[package]] [[package]]
@ -246,30 +141,6 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "iana-time-zone"
version = "0.1.53"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "64c122667b287044802d6ce17ee2ddf13207ed924c712de9a66a5814d5b64765"
dependencies = [
"android_system_properties",
"core-foundation-sys",
"iana-time-zone-haiku",
"js-sys",
"wasm-bindgen",
"winapi",
]
[[package]]
name = "iana-time-zone-haiku"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0703ae284fc167426161c2e3f1da3ea71d94b21bedbcc9494e92b28e334e3dca"
dependencies = [
"cxx",
"cxx-build",
]
[[package]] [[package]]
name = "indexmap" name = "indexmap"
version = "1.9.1" version = "1.9.1"
@ -286,29 +157,11 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35" checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35"
[[package]]
name = "js-sys"
version = "0.3.61"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "445dde2150c55e483f3d8416706b97ec8e8237c307e5b7b4b8dd15e6af2a0730"
dependencies = [
"wasm-bindgen",
]
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.139" version = "0.2.121"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "201de327520df007757c1f0adce6e827fe8562fbc28bfd9c15571c66ca1f5f79" checksum = "efaa7b300f3b5fe8eb6bf21ce3895e1751d9665086af2d64b42f19701015ff4f"
[[package]]
name = "link-cplusplus"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ecd207c9c713c34f95a097a5b029ac2ce6010530c7b49d7fea24d977dede04f5"
dependencies = [
"cc",
]
[[package]] [[package]]
name = "lock_api" name = "lock_api"
@ -321,9 +174,9 @@ dependencies = [
[[package]] [[package]]
name = "log" name = "log"
version = "0.4.17" version = "0.4.14"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
] ]
@ -342,29 +195,10 @@ checksum = "57ee1c23c7c63b0c9250c339ffdc69255f110b298b901b9f6c82547b7b87caaf"
dependencies = [ dependencies = [
"libc", "libc",
"log", "log",
"wasi 0.11.0+wasi-snapshot-preview1", "wasi",
"windows-sys 0.36.1", "windows-sys 0.36.1",
] ]
[[package]]
name = "num-integer"
version = "0.1.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "225d3389fb3509a24c93f5c29eb6bde2586b98d9f016636dff58d7c6f7569cd9"
dependencies = [
"autocfg",
"num-traits",
]
[[package]]
name = "num-traits"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "578ede34cf02f8924ab9447f50c28075b4d3e5b269972345e7e0372b38c6cdcd"
dependencies = [
"autocfg",
]
[[package]] [[package]]
name = "num_cpus" name = "num_cpus"
version = "1.13.1" version = "1.13.1"
@ -377,9 +211,9 @@ dependencies = [
[[package]] [[package]]
name = "once_cell" name = "once_cell"
version = "1.17.0" version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6f61fba1741ea2b3d6a1e3178721804bb716a68a6aeba1149b5d52e3d464ea66" checksum = "87f3e037eac156d1775da914196f0f37741a274155e34a0b7e427c35d2a2ecb9"
[[package]] [[package]]
name = "parking_lot" name = "parking_lot"
@ -481,12 +315,9 @@ dependencies = [
[[package]] [[package]]
name = "rustocat" name = "rustocat"
version = "0.1.0" version = "0.0.3"
dependencies = [ dependencies = [
"chrono",
"fern",
"futures", "futures",
"log",
"rand", "rand",
"serde", "serde",
"serde_json", "serde_json",
@ -507,12 +338,6 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "scratch"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ddccb15bcce173023b3fedd9436f882a0739b8dfb45e4f6b6002bee5929f61b2"
[[package]] [[package]]
name = "serde" name = "serde"
version = "1.0.144" version = "1.0.144"
@ -605,26 +430,6 @@ dependencies = [
"unicode-ident", "unicode-ident",
] ]
[[package]]
name = "termcolor"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be55cf8942feac5c765c2c993422806843c9a9a45d4d5c407ad6dd2ea95eb9b6"
dependencies = [
"winapi-util",
]
[[package]]
name = "time"
version = "0.1.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b797afad3f312d1c66a56d11d0316f916356d11bd158fbc6ca6389ff6bf805a"
dependencies = [
"libc",
"wasi 0.10.0+wasi-snapshot-preview1",
"winapi",
]
[[package]] [[package]]
name = "tokio" name = "tokio"
version = "1.21.1" version = "1.21.1"
@ -663,84 +468,18 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dcc811dc4066ac62f84f11307873c4850cb653bfa9b1719cee2bd2204a4bc5dd" checksum = "dcc811dc4066ac62f84f11307873c4850cb653bfa9b1719cee2bd2204a4bc5dd"
[[package]]
name = "unicode-width"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c0edd1e5b14653f783770bce4a4dabb4a5108a5370a5f5d8cfe8710c361f6c8b"
[[package]] [[package]]
name = "unsafe-libyaml" name = "unsafe-libyaml"
version = "0.2.4" version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1e5fa573d8ac5f1a856f8d7be41d390ee973daf97c806b2c1a465e4e1406e68" checksum = "c1e5fa573d8ac5f1a856f8d7be41d390ee973daf97c806b2c1a465e4e1406e68"
[[package]]
name = "wasi"
version = "0.10.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f"
[[package]] [[package]]
name = "wasi" name = "wasi"
version = "0.11.0+wasi-snapshot-preview1" version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "wasm-bindgen"
version = "0.2.84"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31f8dcbc21f30d9b8f2ea926ecb58f6b91192c17e9d33594b3df58b2007ca53b"
dependencies = [
"cfg-if",
"wasm-bindgen-macro",
]
[[package]]
name = "wasm-bindgen-backend"
version = "0.2.84"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95ce90fd5bcc06af55a641a86428ee4229e44e07033963a2290a8e241607ccb9"
dependencies = [
"bumpalo",
"log",
"once_cell",
"proc-macro2",
"quote",
"syn",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.84"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c21f77c0bedc37fd5dc21f897894a5ca01e7bb159884559461862ae90c0b4c5"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
]
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.84"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2aff81306fcac3c7515ad4e177f521b5c9a15f2b08f4e32d823066102f35a5f6"
dependencies = [
"proc-macro2",
"quote",
"syn",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.84"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0046fef7e28c3804e5e38bfa31ea2a0f73905319b677e57ebe37e49358989b5d"
[[package]] [[package]]
name = "winapi" name = "winapi"
version = "0.3.9" version = "0.3.9"
@ -757,15 +496,6 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-util"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178"
dependencies = [
"winapi",
]
[[package]] [[package]]
name = "winapi-x86_64-pc-windows-gnu" name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0" version = "0.4.0"

View File

@ -1,6 +1,6 @@
[package] [package]
name = "rustocat" name = "rustocat"
version = "0.1.0" version = "0.0.3"
edition = "2021" edition = "2021"
description = "Socat in rust with many less features and a configuration file" description = "Socat in rust with many less features and a configuration file"
license = "ISC" license = "ISC"
@ -8,16 +8,13 @@ license = "ISC"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
tokio = { version = "1", features = ["full"] } tokio = { version = "1.21.1", features = ["full"] }
futures = "0.3.21" futures = "0.3.21"
serde = { version = "1", features = ["derive"] } serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1" serde_json = "1.0.79"
serde_yaml = "0.9.13" serde_yaml = "0.9.13"
simple-error = "0.2.3" simple-error = "0.2.3"
rand = "0.8.5" rand = "0.8.5"
log = "0.4.17"
fern = "0.6.1"
chrono = "0.4.23"
[profile.release] [profile.release]
opt-level = 3 # Optimize for size. opt-level = 3 # Optimize for size.

View File

@ -7,13 +7,12 @@ Rustocat is a simple socat alternative with way less features, but it has a conf
Configs can be either yaml or json and can be located in /etc/rustocat.{yaml|json} or in the current working directory as config.{yaml|json}. Configs can be either yaml or json and can be located in /etc/rustocat.{yaml|json} or in the current working directory as config.{yaml|json}.
```yaml ```yaml
mappings: tcp:
- udp: false - source: 0.0.0.0:2222
source: 0.0.0.0:2222
targets: [127.0.0.1:22] targets: [127.0.0.1:22]
``` ```
There is support for UDP and TCP sockets. Each socket can have multiple targets. Currently only TCP is supported, UDP/Unix Socket support might be added later.
When multiple targets are set, it will randomly pick one of them. When multiple targets are set, it will randomly pick one of them.

5
config.yaml Normal file
View File

@ -0,0 +1,5 @@
tcp:
- source: 127.0.0.1:4422
targets: [127.0.0.1:22, fury.infra.stamm.me:22]
- source: 127.0.0.1:4423
targets: [127.0.0.1:22, fury.infra.stamm.me:22]

View File

@ -1,9 +1,9 @@
use crate::shutdown::ShutdownReceiver; use crate::shutdown::Shutdown;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::RwLock; use tokio::sync::RwLock;
pub(crate) struct Listener { pub(crate) struct Listener {
pub(crate) shutdown: ShutdownReceiver, pub(crate) shutdown: Shutdown,
pub(crate) source: String, pub(crate) source: String,
pub(crate) targets: Arc<RwLock<Vec<String>>>, pub(crate) targets: Arc<RwLock<Vec<String>>>,
} }

View File

@ -3,7 +3,6 @@ mod shutdown;
mod tcp; mod tcp;
mod udp; mod udp;
use log::{debug, error, info, warn};
use serde::Deserialize; use serde::Deserialize;
use simple_error::bail; use simple_error::bail;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
@ -14,8 +13,6 @@ use std::sync::Arc;
use tokio::signal::unix::{signal, SignalKind}; use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::{broadcast, RwLock}; use tokio::sync::{broadcast, RwLock};
pub type Result<T> = std::result::Result<T, Box<dyn Error>>;
#[derive(Debug, Deserialize)] #[derive(Debug, Deserialize)]
struct Target { struct Target {
udp: Option<bool>, udp: Option<bool>,
@ -28,21 +25,21 @@ struct Config {
mappings: Vec<Target>, mappings: Vec<Target>,
} }
fn load_yaml(path: &Path) -> Result<Config> { fn load_yaml(path: &Path) -> Result<Config, Box<dyn Error>> {
let file = File::open(path)?; let file = File::open(path)?;
let config: Config = serde_yaml::from_reader(file).expect("Failed to parse!"); //TODO: Print path let config: Config = serde_yaml::from_reader(file).expect("Failed to parse!"); //TODO: Print path
return Ok(config); return Ok(config);
} }
fn load_json(path: &Path) -> Result<Config> { fn load_json(path: &Path) -> Result<Config, Box<dyn Error>> {
let file = File::open(path)?; let file = File::open(path)?;
let config: Config = serde_json::from_reader(file).expect("Failed to parse!"); //TODO: Print path let config: Config = serde_json::from_reader(file).expect("Failed to parse!"); //TODO: Print path
return Ok(config); return Ok(config);
} }
fn load_config() -> Result<Config> { fn load_config() -> Result<Config, Box<dyn Error>> {
for path in [ for path in [
"config.yaml", "config.yaml",
"config.json", "config.json",
@ -66,28 +63,13 @@ fn load_config() -> Result<Config> {
// #[derive(Debug)] // #[derive(Debug)]
struct ActiveListener { struct ActiveListener {
udp: bool,
notify_shutdown: broadcast::Sender<()>, notify_shutdown: broadcast::Sender<()>,
targets: Arc<RwLock<Vec<String>>>, targets: Arc<RwLock<Vec<String>>>,
} }
#[tokio::main] #[tokio::main]
async fn main() -> Result<()> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
fern::Dispatch::new()
.format(|out, message, record| {
out.finish(format_args!(
"{}[{}][{}] {}",
chrono::Local::now().format("[%Y-%m-%d][%H:%M:%S]"),
record.target(),
record.level(),
message
))
})
// Add blanket level filter -
.level(log::LevelFilter::Info)
.level_for("rustocat", log::LevelFilter::Trace)
.chain(std::io::stdout())
.apply()?;
let mut listeners: HashMap<String, ActiveListener> = HashMap::new(); let mut listeners: HashMap<String, ActiveListener> = HashMap::new();
let mut sighup_stream = signal(SignalKind::hangup())?; let mut sighup_stream = signal(SignalKind::hangup())?;
@ -126,7 +108,7 @@ async fn main() -> Result<()> {
} }
if invalid { if invalid {
warn!("Found invalid targets! Adjusting!"); println!("Found invalid targets! Adjusting!");
let mut w = listener.targets.write().await; let mut w = listener.targets.write().await;
*w = target.targets; *w = target.targets;
} }
@ -136,10 +118,11 @@ async fn main() -> Result<()> {
let listener = ActiveListener { let listener = ActiveListener {
notify_shutdown: notify_shutdown, notify_shutdown: notify_shutdown,
targets: Arc::new(RwLock::new(target.targets)), targets: Arc::new(RwLock::new(target.targets)),
udp: target.udp == None || target.udp == Some(false),
}; };
let l = listener::Listener { let l = listener::Listener {
shutdown: shutdown::ShutdownReceiver::new(listener.notify_shutdown.subscribe()), shutdown: shutdown::Shutdown::new(listener.notify_shutdown.subscribe()),
source: target.source.clone(), source: target.source.clone(),
targets: listener.targets.clone(), targets: listener.targets.clone(),
}; };
@ -147,11 +130,11 @@ async fn main() -> Result<()> {
tokio::spawn(async move { tokio::spawn(async move {
if target.udp == None || target.udp == Some(false) { if target.udp == None || target.udp == Some(false) {
if let Err(err) = tcp::start_tcp_listener(l).await { if let Err(err) = tcp::start_tcp_listener(l).await {
error!("tcp listener error: {}", err); println!("tcp listener error: {}", err);
} }
} else { } else {
if let Err(err) = udp::start_udp_listener(l).await { if let Err(err) = udp::start_udp_listener(l).await {
error!("udp listener error: {}", err); println!("udp listener error: {}", err);
} }
} }
}); });
@ -168,22 +151,13 @@ async fn main() -> Result<()> {
for del_key in to_delete { for del_key in to_delete {
if let Some(listener) = listeners.get(&del_key) { if let Some(listener) = listeners.get(&del_key) {
let res = listener.notify_shutdown.send(()); //Errors are irrelevant here. I guess.... let _ = listener.notify_shutdown.send(()); //Errors are irrelevant here. I guess....
match res { println!("Removing listener!");
Ok(_) => {
info!("Sent shutdown signal!");
}
Err(_) => {
warn!("Failed to send shutdown signal!");
}
}
debug!("Removing listener!");
listeners.remove(&del_key); listeners.remove(&del_key);
} }
} }
sighup_stream.recv().await; sighup_stream.recv().await;
info!("Recevied SIGHUP, reloading config!"); println!("Recevied SIGHUP!");
} }
} }

View File

@ -1,60 +1,56 @@
use tokio::sync::broadcast; use tokio::sync::broadcast;
/// Listens for the server shutdown signal.
///
/// Shutdown is signalled using a `broadcast::Receiver`. Only a single value is
/// ever sent. Once a value has been sent via the broadcast channel, the server
/// should shutdown.
///
/// The `Shutdown` struct listens for the signal and tracks that the signal has
/// been received. Callers may query for whether the shutdown signal has been
/// received or not.
#[derive(Debug)]
pub(crate) struct Shutdown { pub(crate) struct Shutdown {
/// `true` if the shutdown signal has been received
shutdown: bool, shutdown: bool,
/// The receive half of the channel used to listen for shutdown.
notify: broadcast::Receiver<()>, notify: broadcast::Receiver<()>,
sender: broadcast::Sender<()>,
} }
impl Shutdown { impl Shutdown {
pub(crate) fn new() -> Shutdown { /// Create a new `Shutdown` backed by the given `broadcast::Receiver`.
let (sender, notify) = broadcast::channel(1); pub(crate) fn new(notify: broadcast::Receiver<()>) -> Shutdown {
Shutdown { Shutdown {
shutdown: false, shutdown: false,
notify, notify,
sender,
} }
} }
pub(crate) fn shutdown(&mut self) { /// Returns `true` if the shutdown signal has been received.
if self.shutdown { // pub(crate) fn is_shutdown(&self) -> bool {
return; // self.shutdown
} // }
let _ = self.sender.send(());
self.shutdown = true;
}
pub(crate) fn receiver(&self) -> ShutdownReceiver {
ShutdownReceiver::new(self.notify.resubscribe())
}
}
#[derive(Debug)]
pub(crate) struct ShutdownReceiver {
shutdown: bool,
notify: broadcast::Receiver<()>,
}
impl ShutdownReceiver {
pub(crate) fn new(notify: broadcast::Receiver<()>) -> ShutdownReceiver {
ShutdownReceiver {
shutdown: false,
notify,
}
}
/// Receive the shutdown notice, waiting if necessary.
pub(crate) async fn recv(&mut self) { pub(crate) async fn recv(&mut self) {
// If the shutdown signal has already been received, then return
// immediately.
if self.shutdown { if self.shutdown {
return; return;
} }
// Cannot receive a "lag error" as only one value is ever sent.
let _ = self.notify.recv().await; let _ = self.notify.recv().await;
// Remember that the signal has been received.
self.shutdown = true; self.shutdown = true;
} }
} }
impl Clone for ShutdownReceiver { impl Clone for Shutdown {
fn clone(&self) -> Self { fn clone(&self) -> Self {
ShutdownReceiver { Shutdown {
shutdown: self.shutdown, shutdown: self.shutdown,
notify: self.notify.resubscribe(), notify: self.notify.resubscribe(),
} }

View File

@ -1,5 +1,4 @@
use crate::listener::Listener; use crate::listener::Listener;
use log::{info, trace, warn};
use rand::seq::SliceRandom; use rand::seq::SliceRandom;
use std::error::Error; use std::error::Error;
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
@ -23,14 +22,14 @@ impl TcpHandler {
pub(crate) async fn start_tcp_listener( pub(crate) async fn start_tcp_listener(
mut listener_config: Listener, mut listener_config: Listener,
) -> Result<(), Box<dyn Error>> { ) -> Result<(), Box<dyn Error>> {
info!("start listening on {}", &listener_config.source); println!("start listening on {}", &listener_config.source);
let listener = TcpListener::bind(&listener_config.source).await?; let listener = TcpListener::bind(&listener_config.source).await?;
loop { loop {
let (next_socket, _) = tokio::select! { let (next_socket, _) = tokio::select! {
res = listener.accept() => res?, res = listener.accept() => res?,
_ = listener_config.shutdown.recv() => { _ = listener_config.shutdown.recv() => {
info!("Exiting listener!"); println!("Exiting listener!");
return Ok(()); return Ok(());
} }
}; };
@ -39,7 +38,7 @@ pub(crate) async fn start_tcp_listener(
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();
let selected_target = targets.choose(&mut rng).unwrap(); let selected_target = targets.choose(&mut rng).unwrap();
trace!( println!(
"new connection from {} forwarding to {}", "new connection from {} forwarding to {}",
next_socket.peer_addr()?, next_socket.peer_addr()?,
&selected_target &selected_target
@ -52,7 +51,7 @@ pub(crate) async fn start_tcp_listener(
tokio::spawn(async move { tokio::spawn(async move {
// Process the connection. If an error is encountered, log it. // Process the connection. If an error is encountered, log it.
if let Err(err) = handler.run().await { if let Err(err) = handler.run().await {
warn!("connection error {}", err); println!("connection error {}", err);
} }
}); });
} }

View File

@ -1,225 +1,176 @@
use std::collections::HashMap; use crate::listener::Listener;
use std::sync::atomic::{AtomicI32, AtomicU64}; use crate::shutdown::Shutdown;
use std::sync::Arc;
use log::{debug, error, info, trace};
use rand::seq::SliceRandom; use rand::seq::SliceRandom;
use std::collections::HashMap;
use std::error::Error;
use std::sync::Arc;
use tokio::net::UdpSocket; use tokio::net::UdpSocket;
use tokio::sync::broadcast;
use tokio::sync::mpsc::{channel, Sender};
use tokio::sync::Mutex; use tokio::sync::Mutex;
use tokio::time::Instant; use tokio::time::Instant;
use crate::listener::Listener; async fn create_dual_udpsocket(
use crate::shutdown::{Shutdown, ShutdownReceiver}; bind_addr: &String,
use crate::Result; ) -> Result<(UdpSocket, UdpSocket), Box<dyn Error>> {
const CONNECTION_TIMEOUT: i32 = 5;
#[derive(Clone)]
struct UDPMultiSender {
socket: Arc<UdpSocket>,
}
impl UDPMultiSender {
fn new(socket: UdpSocket) -> Self {
return Self {
socket: Arc::new(socket),
};
}
async fn send(&self, buf: &[u8]) -> Result<()> {
self.socket.send(buf).await?;
return Ok(());
}
async fn send_to(&self, buf: &[u8], dest: &str) -> Result<()> {
self.socket.send_to(buf, dest).await?;
return Ok(());
}
}
async fn splitted_udp_socket(bind_addr: &str) -> Result<(UdpSocket, UDPMultiSender)> {
let listener_std = std::net::UdpSocket::bind(bind_addr)?; let listener_std = std::net::UdpSocket::bind(bind_addr)?;
let responder_std = listener_std.try_clone()?; let responder_std = listener_std.try_clone()?;
listener_std.set_nonblocking(true)?;
responder_std.set_nonblocking(true)?;
let listener = UdpSocket::from_std(listener_std)?; let listener = UdpSocket::from_std(listener_std)?;
let responder = UDPMultiSender::new(UdpSocket::from_std(responder_std)?); let responder = UdpSocket::from_std(responder_std)?;
return Ok((listener, responder)); return Ok((listener, responder));
} }
struct UDPChannel { fn get_udp_background_send(socket: UdpSocket, mut exit: Shutdown) -> Sender<(Vec<u8>, String)> {
last_packet: Arc<AtomicI32>, let (tx, mut rx) = channel::<(Vec<u8>, String)>(1);
sender: UDPMultiSender,
shutdown: Shutdown, tokio::spawn(async move {
from: String, loop {
upstream: String, let (buf, dest) = (tokio::select! {
res = rx.recv() => Some(res.unwrap()),
_ = exit.recv() => {
println!("Exiting listener!");
return;
}
})
.unwrap();
let to_send = buf.as_slice();
socket.send_to(to_send, &dest).await.expect(&format!(
"Failed to forward response from upstream server to client {}",
dest
));
}
});
return tx;
} }
impl UDPChannel { struct UdpHandler {
last_packet: Arc<Mutex<Instant>>,
kill: broadcast::Sender<()>,
target: String,
sender: Sender<(Vec<u8>, String)>,
}
impl UdpHandler {
async fn start( async fn start(
upstream: String, target: String,
responder: UDPMultiSender, source: String,
source_addr: String, sender: Sender<(Vec<u8>, String)>,
) -> Result<Self> { ) -> Result<UdpHandler, Box<dyn Error>> {
let (upstream_listener, upstream_responder) = splitted_udp_socket("0.0.0.0:0").await?; // Kill Channel
upstream_listener.connect(upstream.clone()).await?; let (tx, mut rx) = broadcast::channel::<()>(1);
let shutdown = Shutdown::new(); let (listener, responder) = create_dual_udpsocket(&"0.0.0.0:0".to_owned()).await?;
let mut shutdown_receiver = shutdown.receiver();
let channel = Self { let s = get_udp_background_send(responder, Shutdown::new(tx.subscribe()));
last_packet: Arc::new(AtomicI32::new(CONNECTION_TIMEOUT)),
sender: upstream_responder, let last_packet = Arc::new(Mutex::new(Instant::now()));
shutdown,
from: source_addr.clone(), let handler = UdpHandler {
upstream: upstream.clone(), kill: tx,
last_packet: last_packet.clone(),
// source: source.clone(),
target: target.clone(),
sender: s,
}; };
let last_packet = channel.last_packet.clone(); listener.connect(target).await?;
tokio::spawn(async move { tokio::spawn(async move {
let mut buf = [0; 64 * 1024]; let mut buf = [0; 64 * 1024];
loop { loop {
let num_bytes = tokio::select! { let (num_bytes, _) = tokio::select! {
res = upstream_listener.recv(&mut buf) => res.unwrap(), res = listener.recv_from(&mut buf) => res.unwrap(),
_ = shutdown_receiver.recv() => { _ = rx.recv() => {
info!("Exiting"); // FIXME: Source of memory leaks?
return; return ;
} }
}; };
let mut n = last_packet.lock().await;
trace!("[{}] <- [{}] ...", source_addr, upstream); *n = Instant::now();
last_packet.store(CONNECTION_TIMEOUT, std::sync::atomic::Ordering::Relaxed); sender
.send((buf[0..num_bytes].to_vec(), source.clone()))
match responder.send_to(&buf[..num_bytes], &source_addr).await { .await
Ok(_) => {} .expect(&format!("Failed to send answer to sender {}", source));
Err(e) => {
error!("Failed to send packet: {}", e);
}
};
trace!("[{}] <- [{}] ---", source_addr, upstream);
} }
}); });
return Ok(handler);
return Ok(channel);
} }
async fn close(&mut self) { async fn exit(&self) -> Result<(), Box<dyn Error>> {
self.shutdown.shutdown(); self.kill.send(())?;
debug!("Closing connection from {}", self.from);
Ok(())
} }
async fn handle(&self, data: &[u8]) { async fn on_packet(&mut self, pkg: Vec<u8>) -> Result<(), Box<dyn Error>> {
trace!("[{}] -> [{}] ...", self.from, self.upstream); let mut n = self.last_packet.lock().await;
self.last_packet *n = Instant::now();
.store(CONNECTION_TIMEOUT, std::sync::atomic::Ordering::Relaxed); self.sender.send((pkg, self.target.clone())).await?;
match self.sender.send(data).await { return Ok(());
Ok(_) => {}
Err(e) => {
error!("Failed to send packet: {}", e);
}
}
trace!("[{}] -> [{}] ---", self.from, self.upstream);
} }
} }
fn start_stale_check( pub(crate) async fn start_udp_listener(
connections: Arc<Mutex<HashMap<String, UDPChannel>>>, mut listener_config: Listener,
mut shutdown: ShutdownReceiver, ) -> Result<(), Box<dyn Error>> {
) { println!("start listening on {}", &listener_config.source);
tokio::spawn(async move { let (listener, responder) =
let mut interval = tokio::time::interval(std::time::Duration::from_secs(1)); create_dual_udpsocket(&listener_config.source)
loop { .await
tokio::select! { .expect(&format!(
_ = interval.tick() => {} "Failed to clone primary listening address socket {}",
_ = shutdown.recv() => { &listener_config.source,
info!("Exiting listener!"); ));
break;
}
}
trace!("Checking for stale connections"); let sender = get_udp_background_send(responder, listener_config.shutdown.clone());
trace!("Waiting for connections lock");
let mut connections = connections.lock().await;
trace!("Got connections lock");
let mut to_remove: Vec<String> = Vec::new();
for (source_addr, channel) in connections.iter() {
let last = channel
.last_packet
.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
if last <= 0 { let mut connections: HashMap<String, UdpHandler> = HashMap::new();
to_remove.push(source_addr.clone());
}
}
for source_addr in to_remove {
debug!("Closing connection from {}", source_addr);
let mut channel = connections.remove(&source_addr).unwrap();
channel.close().await;
}
trace!("Checking for stale connections - done");
drop(connections);
}
info!("Exiting stale check");
});
}
pub(crate) async fn start_udp_listener(mut listener_config: Listener) -> Result<()> {
info!("start listening on {}", &listener_config.source);
let (listener, responder) = splitted_udp_socket(&listener_config.source).await?;
let connections = Arc::new(Mutex::new(HashMap::<String, UDPChannel>::new()));
start_stale_check(connections.clone(), listener_config.shutdown.clone());
let mut buf = [0; 64 * 1024];
loop { loop {
let mut buf = vec![0; 1024 * 64];
trace!("Waiting for packet or shutdown");
let (num_bytes, src_addr) = tokio::select! { let (num_bytes, src_addr) = tokio::select! {
res = listener.recv_from(&mut buf) => res?, res = listener.recv_from(&mut buf) => res?,
_ = listener_config.shutdown.recv() => { _ = listener_config.shutdown.recv() => {
info!("Exiting listener!"); println!("Exiting listener!");
break; break;
} }
}; };
let data = buf[0..num_bytes].to_vec(); let addr = src_addr.to_string();
let source_addr = src_addr.to_string(); let handler_opt = connections.get_mut(&addr);
let vec = buf[0..num_bytes].to_vec();
let mut connections = connections.lock().await;
let handler_opt = connections.get_mut(&source_addr);
let handler = match handler_opt { let handler = match handler_opt {
Some(handler) => handler, Some(handler) => handler,
None => { None => {
debug!("New connection from {}", source_addr);
let targets = listener_config.targets.read().await; let targets = listener_config.targets.read().await;
let upstream = { let selected_target = {
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();
targets.choose(&mut rng).unwrap() targets.choose(&mut rng).unwrap()
}; };
let handler = let handler =
UDPChannel::start(upstream.to_string(), responder.clone(), source_addr.clone()) UdpHandler::start(selected_target.clone(), addr.clone(), sender.clone())
.await?; .await?;
connections.insert(addr.clone(), handler);
connections.insert(source_addr.clone(), handler); connections.get_mut(&addr).unwrap()
connections.get_mut(&source_addr).unwrap()
} }
}; };
handler.handle(&data).await; match handler.on_packet(vec).await {
trace!("Handled packet"); Ok(_) => (),
drop(connections); Err(err) => {
println!("Failed to forward request from client to server {}", err);
return Ok(());
}
}
} }
for (_, handler) in connections.lock().await.iter_mut() { for handler in connections.values() {
handler.close().await; handler.exit().await?;
} }
return Ok(()); Ok(())
} }