diff --git a/Cargo.lock b/Cargo.lock index 545a5c1..2c91e0e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -115,6 +115,17 @@ dependencies = [ "slab", ] +[[package]] +name = "getrandom" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4eb1a864a501629691edf6c15a593b7a51eebaa1e8468e9ddc623de7c9b58ec6" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + [[package]] name = "hashbrown" version = "0.11.2" @@ -265,6 +276,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "ppv-lite86" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872" + [[package]] name = "proc-macro2" version = "1.0.36" @@ -283,6 +300,36 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7" +dependencies = [ + "getrandom", +] + [[package]] name = "redox_syscall" version = "0.2.11" @@ -297,6 +344,7 @@ name = "rustocat" version = "0.0.3" dependencies = [ "futures", + "rand", "serde", "serde_json", "serde_yaml", diff --git a/Cargo.toml b/Cargo.toml index efed954..2696b46 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ serde = { version = "1.0.136", features = ["derive"] } serde_json = "1.0.79" serde_yaml = "0.8.23" simple-error = "0.2.3" +rand = "0.8.5" [profile.release] opt-level = 3 # Optimize for size. diff --git a/README.md b/README.md index 5d0d3b2..3f809fa 100644 --- a/README.md +++ b/README.md @@ -4,12 +4,20 @@ Rustocat is a simple socat alternative with way less features, but it has a conf ## Config File -Configs can be either yaml or json and can be located in /etc/rustocat.{yaml|json} or ind the current working directory as config.{yaml|json}. +Configs can be either yaml or json and can be located in /etc/rustocat.{yaml|json} or in the current working directory as config.{yaml|json}. ```yaml tcp: - source: 0.0.0.0:2222 - target: 127.0.0.1:22 + targets: [127.0.0.1:22] ``` Currently only TCP is supported, UDP/Unix Socket support might be added later. + +When multiple targets are set, it will randomly pick one of them. + +## Live Changes + +It is designed to live reconfigure itself. To trigger a change, send the signal `SIGHUP` to the process. This will re-read the config file and adjust the sockets and targets accordingly. + +All existing connections will stay as they are, but removed sources are closed from further connections. diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..afe8f69 --- /dev/null +++ b/config.yaml @@ -0,0 +1,5 @@ +tcp: + - source: 127.0.0.1:4422 + targets: [127.0.0.1:22, fury.infra.stamm.me:22] + - source: 127.0.0.1:4423 + targets: [127.0.0.1:22, fury.infra.stamm.me:22] diff --git a/src/listener.rs b/src/listener.rs index 304d0ec..2f9102d 100644 --- a/src/listener.rs +++ b/src/listener.rs @@ -1,7 +1,9 @@ use crate::shutdown::Shutdown; +use std::sync::Arc; +use tokio::sync::RwLock; pub(crate) struct Listener { pub(crate) shutdown: Shutdown, pub(crate) source: String, - pub(crate) target: String, + pub(crate) targets: Arc>>, } diff --git a/src/main.rs b/src/main.rs index 853f82e..bea0e8b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,15 +4,18 @@ mod tcp; use serde::Deserialize; use simple_error::bail; +use std::collections::{HashMap, HashSet}; use std::error::Error; use std::fs::File; use std::path::Path; -use tokio::sync::broadcast; +use std::sync::Arc; +use tokio::signal::unix::{signal, SignalKind}; +use tokio::sync::{broadcast, RwLock}; #[derive(Debug, Deserialize)] struct Target { source: String, - target: String, + targets: Vec, } #[derive(Debug, Deserialize)] @@ -57,27 +60,88 @@ fn load_config() -> Result> { bail!("No config file found"); } +// #[derive(Debug)] +struct ActiveListener { + notify_shutdown: broadcast::Sender<()>, + targets: Arc>>, +} + #[tokio::main] -async fn main() { - let config = load_config().expect("config not found"); +async fn main() -> Result<(), Box> { + let mut listeners: HashMap = HashMap::new(); + let mut sighup_stream = signal(SignalKind::hangup())?; - let (notify_shutdown, _) = broadcast::channel(1); - - for target in config.tcp { - let listener = listener::Listener { - shutdown: shutdown::Shutdown::new(notify_shutdown.subscribe()), - source: target.source, - target: target.target, - }; - tokio::spawn(async move { - if let Err(err) = tcp::start_tcp_listener(listener).await { - println!("listener error: {}", err); - } - }); - } - - // _ = notify_shutdown.send(()); loop { - tokio::time::sleep(tokio::time::Duration::from_millis(1000 * 1000)).await; + let config = load_config().expect("config not found"); + let mut required_listeners: HashSet = HashSet::new(); + + for target in config.tcp { + required_listeners.insert(target.source.clone()); + if let Some(listener) = listeners.get(&target.source) { + let mut invalid = false; + { + let targets = listener.targets.read().await; + for t in &target.targets { + if !targets.iter().any(|e| *e == *t) { + invalid = true; + break; + } + } + + if !invalid { + for t in targets.iter() { + if !target.targets.iter().any(|e| *e == *t) { + invalid = true; + break; + } + } + } + } + + if invalid { + println!("Found invalid targets! Adjusting!"); + let mut w = listener.targets.write().await; + *w = target.targets; + } + } else { + let (notify_shutdown, _) = broadcast::channel(1); + + let listener = ActiveListener { + notify_shutdown: notify_shutdown, + targets: Arc::new(RwLock::new(target.targets)), + }; + + let l = listener::Listener { + shutdown: shutdown::Shutdown::new(listener.notify_shutdown.subscribe()), + source: target.source.clone(), + targets: listener.targets.clone(), + }; + + tokio::spawn(async move { + if let Err(err) = tcp::start_tcp_listener(l).await { + println!("listener error: {}", err); + } + }); + + listeners.insert(target.source, listener); + } + } + + let to_delete: Vec<_> = listeners + .keys() + .filter(|x| required_listeners.get(*x).is_none()) + .cloned() // Clone the result to make listeners mutable again! + .collect(); + + for del_key in to_delete { + if let Some(listener) = listeners.get(&del_key) { + let _ = listener.notify_shutdown.send(()); //Errors are irrelevant here. I guess.... + println!("Removing listener!"); + listeners.remove(&del_key); + } + } + + sighup_stream.recv().await; + println!("Recevied SIGHUP!"); } } diff --git a/src/tcp.rs b/src/tcp.rs index b56e00d..489ace5 100644 --- a/src/tcp.rs +++ b/src/tcp.rs @@ -1,4 +1,5 @@ use crate::listener::Listener; +use rand::seq::SliceRandom; use std::error::Error; use tokio::net::{TcpListener, TcpStream}; @@ -28,17 +29,23 @@ pub(crate) async fn start_tcp_listener( let (next_socket, _) = tokio::select! { res = listener.accept() => res?, _ = listener_config.shutdown.recv() => { + println!("Exiting listener!"); return Ok(()); } }; + + let targets = listener_config.targets.read().await; + let mut rng = rand::thread_rng(); + let selected_target = targets.choose(&mut rng).unwrap(); + println!( "new connection from {} forwarding to {}", next_socket.peer_addr()?, - &listener_config.target + &selected_target ); let mut handler = TcpHandler { stream: next_socket, - target: listener_config.target.clone(), + target: selected_target.clone(), }; tokio::spawn(async move {