From 7530ce3aaeea6ea7be9f6045df935e8eb1c5e34d Mon Sep 17 00:00:00 2001 From: Fabian Stamm Date: Wed, 21 Sep 2022 22:22:10 +0200 Subject: [PATCH] Implementing udp support partially...in theory. Still untested :) --- .editorconfig | 2 + Cargo.lock | 142 +++++++++++++++++++++----------------- Cargo.toml | 6 +- src/main.rs | 58 ++++++++++------ src/shutdown.rs | 63 +++++++++-------- src/tcp.rs | 74 ++++++++++---------- src/udp.rs | 178 +++++++++++++++++++++++++++++++++++++++++++++++- 7 files changed, 370 insertions(+), 153 deletions(-) create mode 100644 .editorconfig diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..7811976 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,2 @@ +[*.rs] +indent_size = 4 \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 2c91e0e..ae45c78 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -128,9 +128,9 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.11.2" +version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" [[package]] name = "hermit-abi" @@ -143,9 +143,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "1.8.0" +version = "1.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "282a6247722caba404c065016bbfa522806e51714c34f5dfc3e4a3a46fcb4223" +checksum = "10a35a97730320ffe8e2d410b5d3b69279b98d2c14bdb8b70ea89ecf7888d41e" dependencies = [ "autocfg", "hashbrown", @@ -163,12 +163,6 @@ version = "0.2.121" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "efaa7b300f3b5fe8eb6bf21ce3895e1751d9665086af2d64b42f19701015ff4f" -[[package]] -name = "linked-hash-map" -version = "0.5.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fb9b38af92608140b86b693604b9ffcc5824240a484d1ecd4795bacb2fe88f3" - [[package]] name = "lock_api" version = "0.4.6" @@ -195,34 +189,14 @@ checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a" [[package]] name = "mio" -version = "0.8.2" +version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52da4364ffb0e4fe33a9841a98a3f3014fb964045ce4f7a45a398243c8d6b0c9" +checksum = "57ee1c23c7c63b0c9250c339ffdc69255f110b298b901b9f6c82547b7b87caaf" dependencies = [ "libc", "log", - "miow", - "ntapi", "wasi", - "winapi", -] - -[[package]] -name = "miow" -version = "0.3.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21" -dependencies = [ - "winapi", -] - -[[package]] -name = "ntapi" -version = "0.3.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c28774a7fd2fbb4f0babd8237ce554b73af68021b5f695a3cebd6c59bac0980f" -dependencies = [ - "winapi", + "windows-sys 0.36.1", ] [[package]] @@ -261,7 +235,7 @@ dependencies = [ "libc", "redox_syscall", "smallvec", - "windows-sys", + "windows-sys 0.32.0", ] [[package]] @@ -284,11 +258,11 @@ checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872" [[package]] name = "proc-macro2" -version = "1.0.36" +version = "1.0.43" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7342d5883fbccae1cc37a2353b09c87c9b0f3afd73f5fb9bba687a1f733b029" +checksum = "0a2ca2c61bc9f3d74d2886294ab7b9853abd9c1ad903a3ac7815c58989bb7bab" dependencies = [ - "unicode-xid", + "unicode-ident", ] [[package]] @@ -366,18 +340,18 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" [[package]] name = "serde" -version = "1.0.136" +version = "1.0.144" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce31e24b01e1e524df96f1c2fdd054405f8d7376249a5110886fb4b658484789" +checksum = "0f747710de3dcd43b88c9168773254e809d8ddbdf9653b84e2554ab219f17860" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.136" +version = "1.0.144" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08597e7152fcd306f41838ed3e37be9eaeed2b61c42e2117266a554fab4662f9" +checksum = "94ed3a816fb1d101812f83e789f888322c34e291f894f19590dc310963e87a00" dependencies = [ "proc-macro2", "quote", @@ -397,14 +371,15 @@ dependencies = [ [[package]] name = "serde_yaml" -version = "0.8.23" +version = "0.9.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4a521f2940385c165a24ee286aa8599633d162077a54bdcae2a6fd5a7bfa7a0" +checksum = "8613d593412a0deb7bbd8de9d908efff5a0cb9ccd8f62c641e7b2ed2f57291d1" dependencies = [ "indexmap", + "itoa", "ryu", "serde", - "yaml-rust", + "unsafe-libyaml", ] [[package]] @@ -446,21 +421,22 @@ dependencies = [ [[package]] name = "syn" -version = "1.0.89" +version = "1.0.100" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea297be220d52398dcc07ce15a209fce436d361735ac1db700cab3b6cdfb9f54" +checksum = "52205623b1b0f064a4e71182c3b18ae902267282930c6d5462c91b859668426e" dependencies = [ "proc-macro2", "quote", - "unicode-xid", + "unicode-ident", ] [[package]] name = "tokio" -version = "1.17.0" +version = "1.21.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2af73ac49756f3f7c01172e34a23e5d0216f6c32333757c2c61feb2bbff5a5ee" +checksum = "0020c875007ad96677dcc890298f4b942882c5d4eb7cc8f439fc3bf813dc9c95" dependencies = [ + "autocfg", "bytes", "libc", "memchr", @@ -487,10 +463,16 @@ dependencies = [ ] [[package]] -name = "unicode-xid" -version = "0.2.2" +name = "unicode-ident" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3" +checksum = "dcc811dc4066ac62f84f11307873c4850cb653bfa9b1719cee2bd2204a4bc5dd" + +[[package]] +name = "unsafe-libyaml" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1e5fa573d8ac5f1a856f8d7be41d390ee973daf97c806b2c1a465e4e1406e68" [[package]] name = "wasi" @@ -526,11 +508,24 @@ version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3df6e476185f92a12c072be4a189a0210dcdcf512a1891d6dff9edb874deadc6" dependencies = [ - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_msvc", + "windows_aarch64_msvc 0.32.0", + "windows_i686_gnu 0.32.0", + "windows_i686_msvc 0.32.0", + "windows_x86_64_gnu 0.32.0", + "windows_x86_64_msvc 0.32.0", +] + +[[package]] +name = "windows-sys" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea04155a16a59f9eab786fe12a4a450e75cdb175f9e0d80da1e17db09f55b8d2" +dependencies = [ + "windows_aarch64_msvc 0.36.1", + "windows_i686_gnu 0.36.1", + "windows_i686_msvc 0.36.1", + "windows_x86_64_gnu 0.36.1", + "windows_x86_64_msvc 0.36.1", ] [[package]] @@ -539,24 +534,48 @@ version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d8e92753b1c443191654ec532f14c199742964a061be25d77d7a96f09db20bf5" +[[package]] +name = "windows_aarch64_msvc" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47" + [[package]] name = "windows_i686_gnu" version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a711c68811799e017b6038e0922cb27a5e2f43a2ddb609fe0b6f3eeda9de615" +[[package]] +name = "windows_i686_gnu" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6" + [[package]] name = "windows_i686_msvc" version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "146c11bb1a02615db74680b32a68e2d61f553cc24c4eb5b4ca10311740e44172" +[[package]] +name = "windows_i686_msvc" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024" + [[package]] name = "windows_x86_64_gnu" version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c912b12f7454c6620635bbff3450962753834be2a594819bd5e945af18ec64bc" +[[package]] +name = "windows_x86_64_gnu" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1" + [[package]] name = "windows_x86_64_msvc" version = "0.32.0" @@ -564,10 +583,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "504a2476202769977a040c6364301a3f65d0cc9e3fb08600b2bda150a0488316" [[package]] -name = "yaml-rust" -version = "0.4.5" +name = "windows_x86_64_msvc" +version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56c1936c4cc7a1c9ab21a1ebb602eb942ba868cbd44a99cb7cdc5892335e1c85" -dependencies = [ - "linked-hash-map", -] +checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" diff --git a/Cargo.toml b/Cargo.toml index 2696b46..c086116 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,16 +8,16 @@ license = "ISC" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -tokio = { version = "1.17.0", features = ["full"] } +tokio = { version = "1.21.1", features = ["full"] } futures = "0.3.21" serde = { version = "1.0.136", features = ["derive"] } serde_json = "1.0.79" -serde_yaml = "0.8.23" +serde_yaml = "0.9.13" simple-error = "0.2.3" rand = "0.8.5" [profile.release] -opt-level = 3 # Optimize for size. +opt-level = 3 # Optimize for size. lto = true # Enable Link Time Optimization codegen-units = 1 # Reduce number of codegen units to increase optimizations. panic = 'abort' # Abort on panic diff --git a/src/main.rs b/src/main.rs index bea0e8b..9de7a14 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,7 @@ mod listener; mod shutdown; mod tcp; +mod udp; use serde::Deserialize; use simple_error::bail; @@ -14,14 +15,14 @@ use tokio::sync::{broadcast, RwLock}; #[derive(Debug, Deserialize)] struct Target { + udp: Option, source: String, targets: Vec, } #[derive(Debug, Deserialize)] struct Config { - tcp: Vec, - // udp: Vec, + mappings: Vec, } fn load_yaml(path: &Path) -> Result> { @@ -62,6 +63,7 @@ fn load_config() -> Result> { // #[derive(Debug)] struct ActiveListener { + udp: bool, notify_shutdown: broadcast::Sender<()>, targets: Arc>>, } @@ -75,27 +77,34 @@ async fn main() -> Result<(), Box> { let config = load_config().expect("config not found"); let mut required_listeners: HashSet = HashSet::new(); - for target in config.tcp { - required_listeners.insert(target.source.clone()); - if let Some(listener) = listeners.get(&target.source) { + for target in config.mappings { + let mut source_str = "".to_owned(); + if target.udp == None || target.udp == Some(false) { + source_str.push_str("udp:"); + } else { + source_str.push_str("tcp:"); + } + source_str.push_str(&target.source); + + required_listeners.insert(source_str.clone()); + if let Some(listener) = listeners.get(&source_str) { let mut invalid = false; - { - let targets = listener.targets.read().await; - for t in &target.targets { - if !targets.iter().any(|e| *e == *t) { + + let targets = listener.targets.read().await; + for t in &target.targets { + if !targets.iter().any(|e| *e == *t) { + invalid = true; + break; + } + } + + if !invalid { + for t in targets.iter() { + if !target.targets.iter().any(|e| *e == *t) { invalid = true; break; } } - - if !invalid { - for t in targets.iter() { - if !target.targets.iter().any(|e| *e == *t) { - invalid = true; - break; - } - } - } } if invalid { @@ -109,6 +118,7 @@ async fn main() -> Result<(), Box> { let listener = ActiveListener { notify_shutdown: notify_shutdown, targets: Arc::new(RwLock::new(target.targets)), + udp: target.udp == None || target.udp == Some(false), }; let l = listener::Listener { @@ -118,12 +128,18 @@ async fn main() -> Result<(), Box> { }; tokio::spawn(async move { - if let Err(err) = tcp::start_tcp_listener(l).await { - println!("listener error: {}", err); + if target.udp == None || target.udp == Some(false) { + if let Err(err) = tcp::start_tcp_listener(l).await { + println!("tcp listener error: {}", err); + } + } else { + if let Err(err) = udp::start_udp_listener(l).await { + println!("udp listener error: {}", err); + } } }); - listeners.insert(target.source, listener); + listeners.insert(source_str, listener); } } diff --git a/src/shutdown.rs b/src/shutdown.rs index b871005..69687fc 100644 --- a/src/shutdown.rs +++ b/src/shutdown.rs @@ -11,39 +11,48 @@ use tokio::sync::broadcast; /// received or not. #[derive(Debug)] pub(crate) struct Shutdown { - /// `true` if the shutdown signal has been received - shutdown: bool, + /// `true` if the shutdown signal has been received + shutdown: bool, - /// The receive half of the channel used to listen for shutdown. - notify: broadcast::Receiver<()>, + /// The receive half of the channel used to listen for shutdown. + notify: broadcast::Receiver<()>, } impl Shutdown { - /// Create a new `Shutdown` backed by the given `broadcast::Receiver`. - pub(crate) fn new(notify: broadcast::Receiver<()>) -> Shutdown { - Shutdown { - shutdown: false, - notify, - } - } + /// Create a new `Shutdown` backed by the given `broadcast::Receiver`. + pub(crate) fn new(notify: broadcast::Receiver<()>) -> Shutdown { + Shutdown { + shutdown: false, + notify, + } + } - /// Returns `true` if the shutdown signal has been received. - // pub(crate) fn is_shutdown(&self) -> bool { - // self.shutdown - // } + /// Returns `true` if the shutdown signal has been received. + // pub(crate) fn is_shutdown(&self) -> bool { + // self.shutdown + // } - /// Receive the shutdown notice, waiting if necessary. - pub(crate) async fn recv(&mut self) { - // If the shutdown signal has already been received, then return - // immediately. - if self.shutdown { - return; - } + /// Receive the shutdown notice, waiting if necessary. + pub(crate) async fn recv(&mut self) { + // If the shutdown signal has already been received, then return + // immediately. + if self.shutdown { + return; + } - // Cannot receive a "lag error" as only one value is ever sent. - let _ = self.notify.recv().await; + // Cannot receive a "lag error" as only one value is ever sent. + let _ = self.notify.recv().await; - // Remember that the signal has been received. - self.shutdown = true; - } + // Remember that the signal has been received. + self.shutdown = true; + } +} + +impl Clone for Shutdown { + fn clone(&self) -> Self { + Shutdown { + shutdown: self.shutdown, + notify: self.notify.resubscribe(), + } + } } diff --git a/src/tcp.rs b/src/tcp.rs index 489ace5..176d99b 100644 --- a/src/tcp.rs +++ b/src/tcp.rs @@ -5,54 +5,54 @@ use tokio::net::{TcpListener, TcpStream}; #[derive(Debug)] struct TcpHandler { - stream: TcpStream, - target: String, + stream: TcpStream, + target: String, } impl TcpHandler { - async fn run(&mut self) -> Result<(), Box> { - let mut stream = TcpStream::connect(&self.target).await?; + async fn run(&mut self) -> Result<(), Box> { + let mut stream = TcpStream::connect(&self.target).await?; - tokio::io::copy_bidirectional(&mut self.stream, &mut stream).await?; + tokio::io::copy_bidirectional(&mut self.stream, &mut stream).await?; - return Ok(()); - } + return Ok(()); + } } pub(crate) async fn start_tcp_listener( - mut listener_config: Listener, + mut listener_config: Listener, ) -> Result<(), Box> { - println!("start listening on {}", &listener_config.source); - let listener = TcpListener::bind(&listener_config.source).await?; + println!("start listening on {}", &listener_config.source); + let listener = TcpListener::bind(&listener_config.source).await?; - loop { - let (next_socket, _) = tokio::select! { - res = listener.accept() => res?, - _ = listener_config.shutdown.recv() => { - println!("Exiting listener!"); - return Ok(()); - } - }; + loop { + let (next_socket, _) = tokio::select! { + res = listener.accept() => res?, + _ = listener_config.shutdown.recv() => { + println!("Exiting listener!"); + return Ok(()); + } + }; - let targets = listener_config.targets.read().await; - let mut rng = rand::thread_rng(); - let selected_target = targets.choose(&mut rng).unwrap(); + let targets = listener_config.targets.read().await; + let mut rng = rand::thread_rng(); + let selected_target = targets.choose(&mut rng).unwrap(); - println!( - "new connection from {} forwarding to {}", - next_socket.peer_addr()?, - &selected_target - ); - let mut handler = TcpHandler { - stream: next_socket, - target: selected_target.clone(), - }; + println!( + "new connection from {} forwarding to {}", + next_socket.peer_addr()?, + &selected_target + ); + let mut handler = TcpHandler { + stream: next_socket, + target: selected_target.clone(), + }; - tokio::spawn(async move { - // Process the connection. If an error is encountered, log it. - if let Err(err) = handler.run().await { - println!("connection error {}", err); - } - }); - } + tokio::spawn(async move { + // Process the connection. If an error is encountered, log it. + if let Err(err) = handler.run().await { + println!("connection error {}", err); + } + }); + } } diff --git a/src/udp.rs b/src/udp.rs index 7b58ffe..cc38cd1 100644 --- a/src/udp.rs +++ b/src/udp.rs @@ -1,2 +1,176 @@ -// For UDP to work, there is some magic required with matches the returning packets -// back to the original sender. Idk. how to do that right now, but maybe some day. +use crate::listener::Listener; +use crate::shutdown::Shutdown; +use rand::seq::SliceRandom; +use std::collections::HashMap; +use std::error::Error; +use std::sync::Arc; +use tokio::net::UdpSocket; +use tokio::sync::broadcast; +use tokio::sync::mpsc::{channel, Sender}; +use tokio::sync::Mutex; +use tokio::time::Instant; + +async fn create_dual_udpsocket( + bind_addr: &String, +) -> Result<(UdpSocket, UdpSocket), Box> { + let listener_std = std::net::UdpSocket::bind(bind_addr)?; + let responder_std = listener_std.try_clone()?; + let listener = UdpSocket::from_std(listener_std)?; + let responder = UdpSocket::from_std(responder_std)?; + + return Ok((listener, responder)); +} + +fn get_udp_background_send(socket: UdpSocket, mut exit: Shutdown) -> Sender<(Vec, String)> { + let (tx, mut rx) = channel::<(Vec, String)>(1); + + tokio::spawn(async move { + loop { + let (buf, dest) = (tokio::select! { + res = rx.recv() => Some(res.unwrap()), + _ = exit.recv() => { + println!("Exiting listener!"); + return; + } + }) + .unwrap(); + + let to_send = buf.as_slice(); + socket.send_to(to_send, &dest).await.expect(&format!( + "Failed to forward response from upstream server to client {}", + dest + )); + } + }); + + return tx; +} + +struct UdpHandler { + last_packet: Arc>, + kill: broadcast::Sender<()>, + target: String, + sender: Sender<(Vec, String)>, +} + +impl UdpHandler { + async fn start( + target: String, + source: String, + sender: Sender<(Vec, String)>, + ) -> Result> { + // Kill Channel + let (tx, mut rx) = broadcast::channel::<()>(1); + + let (listener, responder) = create_dual_udpsocket(&"0.0.0.0:0".to_owned()).await?; + + let s = get_udp_background_send(responder, Shutdown::new(tx.subscribe())); + + let last_packet = Arc::new(Mutex::new(Instant::now())); + + let handler = UdpHandler { + kill: tx, + last_packet: last_packet.clone(), + // source: source.clone(), + target: target.clone(), + sender: s, + }; + + listener.connect(target).await?; + tokio::spawn(async move { + let mut buf = [0; 64 * 1024]; + loop { + let (num_bytes, _) = tokio::select! { + res = listener.recv_from(&mut buf) => res.unwrap(), + _ = rx.recv() => { + // FIXME: Source of memory leaks? + return ; + } + }; + let mut n = last_packet.lock().await; + *n = Instant::now(); + sender + .send((buf[0..num_bytes].to_vec(), source.clone())) + .await + .expect(&format!("Failed to send answer to sender {}", source)); + } + }); + return Ok(handler); + } + + async fn exit(&self) -> Result<(), Box> { + self.kill.send(())?; + + Ok(()) + } + + async fn on_packet(&mut self, pkg: Vec) -> Result<(), Box> { + let mut n = self.last_packet.lock().await; + *n = Instant::now(); + self.sender.send((pkg, self.target.clone())).await?; + return Ok(()); + } +} + +pub(crate) async fn start_udp_listener( + mut listener_config: Listener, +) -> Result<(), Box> { + println!("start listening on {}", &listener_config.source); + let (listener, responder) = + create_dual_udpsocket(&listener_config.source) + .await + .expect(&format!( + "Failed to clone primary listening address socket {}", + &listener_config.source, + )); + + let sender = get_udp_background_send(responder, listener_config.shutdown.clone()); + + let mut connections: HashMap = HashMap::new(); + + let mut buf = [0; 64 * 1024]; + loop { + let (num_bytes, src_addr) = tokio::select! { + res = listener.recv_from(&mut buf) => res?, + _ = listener_config.shutdown.recv() => { + println!("Exiting listener!"); + break; + } + }; + + let addr = src_addr.to_string(); + let handler_opt = connections.get_mut(&addr); + let vec = buf[0..num_bytes].to_vec(); + let handler = match handler_opt { + Some(handler) => handler, + None => { + let targets = listener_config.targets.read().await; + let selected_target = { + let mut rng = rand::thread_rng(); + targets.choose(&mut rng).unwrap() + }; + + let handler = + UdpHandler::start(selected_target.clone(), addr.clone(), sender.clone()) + .await?; + connections.insert(addr.clone(), handler); + + connections.get_mut(&addr).unwrap() + } + }; + + match handler.on_packet(vec).await { + Ok(_) => (), + Err(err) => { + println!("Failed to forward request from client to server {}", err); + return Ok(()); + } + } + } + + for handler in connections.values() { + handler.exit().await?; + } + + Ok(()) +}