Finishing basic implementation

This commit is contained in:
2022-03-20 23:23:08 +01:00
commit 43278f09c8
8 changed files with 648 additions and 0 deletions

7
src/listener.rs Normal file
View File

@ -0,0 +1,7 @@
use crate::shutdown::Shutdown;
pub(crate) struct Listener {
pub(crate) shutdown: Shutdown,
pub(crate) source: String,
pub(crate) target: String,
}

47
src/main.rs Normal file
View File

@ -0,0 +1,47 @@
mod listener;
mod shutdown;
mod tcp;
use serde::Deserialize;
use std::fs::File;
use std::path::Path;
use tokio::sync::broadcast;
#[derive(Debug, Deserialize)]
struct Target {
source: String,
target: String,
}
#[derive(Debug, Deserialize)]
struct Config {
tcp: Vec<Target>,
// udp: Vec<Target>,
}
#[tokio::main]
async fn main() {
let json_file_path = Path::new("./config.json");
let file = File::open(json_file_path).expect("file not found");
let (notify_shutdown, _) = broadcast::channel(1);
let config: Config = serde_json::from_reader(file).expect("json parse error");
for target in config.tcp {
let listener = listener::Listener {
shutdown: shutdown::Shutdown::new(notify_shutdown.subscribe()),
source: target.source,
target: target.target,
};
tokio::spawn(async move {
if let Err(err) = tcp::start_tcp_listener(listener).await {
println!("listener error: {}", err);
}
});
}
// _ = notify_shutdown.send(());
loop {
tokio::time::sleep(tokio::time::Duration::from_millis(1000 * 1000)).await;
}
}

49
src/shutdown.rs Normal file
View File

@ -0,0 +1,49 @@
use tokio::sync::broadcast;
/// Listens for the server shutdown signal.
///
/// Shutdown is signalled using a `broadcast::Receiver`. Only a single value is
/// ever sent. Once a value has been sent via the broadcast channel, the server
/// should shutdown.
///
/// The `Shutdown` struct listens for the signal and tracks that the signal has
/// been received. Callers may query for whether the shutdown signal has been
/// received or not.
#[derive(Debug)]
pub(crate) struct Shutdown {
/// `true` if the shutdown signal has been received
shutdown: bool,
/// The receive half of the channel used to listen for shutdown.
notify: broadcast::Receiver<()>,
}
impl Shutdown {
/// Create a new `Shutdown` backed by the given `broadcast::Receiver`.
pub(crate) fn new(notify: broadcast::Receiver<()>) -> Shutdown {
Shutdown {
shutdown: false,
notify,
}
}
/// Returns `true` if the shutdown signal has been received.
pub(crate) fn is_shutdown(&self) -> bool {
self.shutdown
}
/// Receive the shutdown notice, waiting if necessary.
pub(crate) async fn recv(&mut self) {
// If the shutdown signal has already been received, then return
// immediately.
if self.shutdown {
return;
}
// Cannot receive a "lag error" as only one value is ever sent.
let _ = self.notify.recv().await;
// Remember that the signal has been received.
self.shutdown = true;
}
}

51
src/tcp.rs Normal file
View File

@ -0,0 +1,51 @@
use crate::listener::Listener;
use std::error::Error;
use tokio::net::{TcpListener, TcpStream};
#[derive(Debug)]
struct TcpHandler {
stream: TcpStream,
target: String,
}
impl TcpHandler {
async fn run(&mut self) -> Result<(), Box<dyn Error>> {
let mut stream = TcpStream::connect(&self.target).await?;
tokio::io::copy_bidirectional(&mut self.stream, &mut stream).await?;
return Ok(());
}
}
pub(crate) async fn start_tcp_listener(
mut listener_config: Listener,
) -> Result<(), Box<dyn Error>> {
println!("start listening on {}", &listener_config.source);
let listener = TcpListener::bind(&listener_config.source).await?;
loop {
let (next_socket, _) = tokio::select! {
res = listener.accept() => res?,
_ = listener_config.shutdown.recv() => {
return Ok(());
}
};
println!(
"new connection from {} forwarding to {}",
next_socket.peer_addr()?,
&listener_config.target
);
let mut handler = TcpHandler {
stream: next_socket,
target: listener_config.target.clone(),
};
tokio::spawn(async move {
// Process the connection. If an error is encountered, log it.
if let Err(err) = handler.run().await {
println!("connection error {}", err);
}
});
}
}

2
src/udp.rs Normal file
View File

@ -0,0 +1,2 @@
// For UDP to work, there is some magic required with matches the returning packets
// back to the original sender. Idk. how to do that right now, but maybe some day.