Add support for multiple targets and live-reload.

This commit is contained in:
Fabian Stamm
2022-08-25 20:50:11 +00:00
parent cc9872034d
commit f7af1ac020
7 changed files with 161 additions and 26 deletions

View File

@ -1,7 +1,9 @@
use crate::shutdown::Shutdown;
use std::sync::Arc;
use tokio::sync::RwLock;
pub(crate) struct Listener {
pub(crate) shutdown: Shutdown,
pub(crate) source: String,
pub(crate) target: String,
pub(crate) targets: Arc<RwLock<Vec<String>>>,
}

View File

@ -4,15 +4,18 @@ mod tcp;
use serde::Deserialize;
use simple_error::bail;
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::fs::File;
use std::path::Path;
use tokio::sync::broadcast;
use std::sync::Arc;
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::{broadcast, RwLock};
#[derive(Debug, Deserialize)]
struct Target {
source: String,
target: String,
targets: Vec<String>,
}
#[derive(Debug, Deserialize)]
@ -57,27 +60,88 @@ fn load_config() -> Result<Config, Box<dyn Error>> {
bail!("No config file found");
}
// #[derive(Debug)]
struct ActiveListener {
notify_shutdown: broadcast::Sender<()>,
targets: Arc<RwLock<Vec<String>>>,
}
#[tokio::main]
async fn main() {
let config = load_config().expect("config not found");
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut listeners: HashMap<String, ActiveListener> = HashMap::new();
let mut sighup_stream = signal(SignalKind::hangup())?;
let (notify_shutdown, _) = broadcast::channel(1);
for target in config.tcp {
let listener = listener::Listener {
shutdown: shutdown::Shutdown::new(notify_shutdown.subscribe()),
source: target.source,
target: target.target,
};
tokio::spawn(async move {
if let Err(err) = tcp::start_tcp_listener(listener).await {
println!("listener error: {}", err);
}
});
}
// _ = notify_shutdown.send(());
loop {
tokio::time::sleep(tokio::time::Duration::from_millis(1000 * 1000)).await;
let config = load_config().expect("config not found");
let mut required_listeners: HashSet<String> = HashSet::new();
for target in config.tcp {
required_listeners.insert(target.source.clone());
if let Some(listener) = listeners.get(&target.source) {
let mut invalid = false;
{
let targets = listener.targets.read().await;
for t in &target.targets {
if !targets.iter().any(|e| *e == *t) {
invalid = true;
break;
}
}
if !invalid {
for t in targets.iter() {
if !target.targets.iter().any(|e| *e == *t) {
invalid = true;
break;
}
}
}
}
if invalid {
println!("Found invalid targets! Adjusting!");
let mut w = listener.targets.write().await;
*w = target.targets;
}
} else {
let (notify_shutdown, _) = broadcast::channel(1);
let listener = ActiveListener {
notify_shutdown: notify_shutdown,
targets: Arc::new(RwLock::new(target.targets)),
};
let l = listener::Listener {
shutdown: shutdown::Shutdown::new(listener.notify_shutdown.subscribe()),
source: target.source.clone(),
targets: listener.targets.clone(),
};
tokio::spawn(async move {
if let Err(err) = tcp::start_tcp_listener(l).await {
println!("listener error: {}", err);
}
});
listeners.insert(target.source, listener);
}
}
let to_delete: Vec<_> = listeners
.keys()
.filter(|x| required_listeners.get(*x).is_none())
.cloned() // Clone the result to make listeners mutable again!
.collect();
for del_key in to_delete {
if let Some(listener) = listeners.get(&del_key) {
let _ = listener.notify_shutdown.send(()); //Errors are irrelevant here. I guess....
println!("Removing listener!");
listeners.remove(&del_key);
}
}
sighup_stream.recv().await;
println!("Recevied SIGHUP!");
}
}

View File

@ -1,4 +1,5 @@
use crate::listener::Listener;
use rand::seq::SliceRandom;
use std::error::Error;
use tokio::net::{TcpListener, TcpStream};
@ -28,17 +29,23 @@ pub(crate) async fn start_tcp_listener(
let (next_socket, _) = tokio::select! {
res = listener.accept() => res?,
_ = listener_config.shutdown.recv() => {
println!("Exiting listener!");
return Ok(());
}
};
let targets = listener_config.targets.read().await;
let mut rng = rand::thread_rng();
let selected_target = targets.choose(&mut rng).unwrap();
println!(
"new connection from {} forwarding to {}",
next_socket.peer_addr()?,
&listener_config.target
&selected_target
);
let mut handler = TcpHandler {
stream: next_socket,
target: listener_config.target.clone(),
target: selected_target.clone(),
};
tokio::spawn(async move {