9 Commits
0.1.10 ... main

Author SHA1 Message Date
01bc71ab86 Hide consul config behind a flag
Some checks failed
CI / build (push) Has been cancelled
2024-11-09 21:48:22 +01:00
cf98c93a89 Update dependencies
Some checks are pending
CI / build (push) Waiting to run
2024-11-09 21:45:59 +01:00
d57d0c2db9 Make dependencies feature dependant
Some checks are pending
CI / build (push) Waiting to run
2024-11-09 21:38:53 +01:00
f2ad24655c Bump version
Some checks failed
CI / build (push) Failing after 1m38s
2024-04-14 00:30:57 +02:00
9bc700e0f6 Handle the case, that the bind ip address is only available after start
Some checks failed
CI / build (push) Failing after 1m1s
2024-04-13 23:57:46 +02:00
5e6552dd6c Change docker repository
All checks were successful
CI / build (push) Successful in 23m2s
2023-07-13 14:58:40 +02:00
41d10c5c94 Add listener closing when the target set has changed.
All checks were successful
CI / build (push) Successful in 22m46s
2023-07-13 11:53:51 +02:00
c90a8fff4a Bumping version to 0.1.11 2023-06-07 08:08:25 +02:00
f290fa2eb1 Adjusting Earthfile 2023-06-07 08:07:40 +02:00
11 changed files with 806 additions and 716 deletions

1
.earthlyignore Normal file
View File

@ -0,0 +1 @@
target/

View File

@ -4,7 +4,6 @@ name: CI
on: on:
push: push:
branches: [main]
pull_request: pull_request:
branches: [main] branches: [main]
@ -12,11 +11,11 @@ jobs:
build: build:
runs-on: ubuntu-latest runs-on: ubuntu-latest
env: env:
DOCKER_USERNAME: ${{ secrets.DOCKER_USERNAME }} MY_DOCKER_USERNAME: ${{ secrets.MY_DOCKER_USERNAME }}
DOCKER_TOKEN: ${{ secrets.DOCKER_TOKEN }} MY_DOCKER_PASSWORD: ${{ secrets.MY_DOCKER_PASSWORD }}
FORCE_COLOR: 1 FORCE_COLOR: 1
steps: steps:
- uses: earthly/actions/setup-earthly@v1 - uses: https://github.com/earthly/actions-setup@v1
with: with:
version: v0.7.0 version: v0.7.0
- uses: actions/checkout@v2 - uses: actions/checkout@v2
@ -30,7 +29,7 @@ jobs:
fi fi
git checkout -b "$branch" || true git checkout -b "$branch" || true
- name: Docker Login - name: Docker Login
run: docker login docker.hibas123.de --username "$DOCKER_USERNAME" --password "$DOCKER_TOKEN" run: docker login git.hibas.dev --username "$MY_DOCKER_USERNAME" --password "$MY_DOCKER_PASSWORD"
- name: Earthly version - name: Earthly version
run: earthly --version run: earthly --version
- name: Run build - name: Run build

1339
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,6 @@
[package] [package]
name = "rustocat" name = "rustocat"
version = "0.1.10" version = "0.1.13"
edition = "2021" edition = "2021"
description = "Socat in rust with many less features and a configuration file" description = "Socat in rust with many less features and a configuration file"
license = "ISC" license = "ISC"
@ -8,7 +8,7 @@ license = "ISC"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features] [features]
default = ["consul"] default = ["consul"]
consul = [] consul = ["reqwest"]
[dependencies] [dependencies]
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }
@ -17,14 +17,17 @@ serde_json = "1"
serde_yaml = "0.9" serde_yaml = "0.9"
rand = "0.8" rand = "0.8"
log = "0.4" log = "0.4"
fern = "0.6" fern = "0.7"
chrono = "0.4" chrono = "0.4"
async-trait = "0.1" async-trait = "0.1"
reqwest = { version = "0.11", features = ["json", "rustls", "hyper-tls"], default-features = false } reqwest = { version = "0.12", optional = true, features = [
"json",
"default-tls",
], default-features = false }
[profile.release] [profile.release]
opt-level = 3 # Optimize for size. opt-level = 3 # Optimize for size.
lto = true # Enable Link Time Optimization lto = true # Enable Link Time Optimization
codegen-units = 1 # Reduce number of codegen units to increase optimizations. codegen-units = 1 # Reduce number of codegen units to increase optimizations.
panic = 'abort' # Abort on panic panic = 'abort' # Abort on panic
strip = true # Strip symbols from binary* strip = true # Strip symbols from binary*

View File

@ -1,26 +0,0 @@
FROM rust:alpine as builder
WORKDIR /app
RUN apk add --no-cache musl-dev libssl3 openssl-dev
# This should fetch the index and cache it. This should reduce the time required of subsequent builds
RUN cargo search test
COPY Cargo.toml Cargo.lock /app/
# COPY .cargo /app/.cargo
COPY src /app/src
RUN cargo build --release
FROM alpine
RUN apk add --no-cache libssl3
COPY --from=builder /app/target/release/rustocat /usr/bin/rustocat
ENTRYPOINT ["/bin/sh"]
CMD [ "-c", "/usr/bin/rustocat" ]

View File

@ -19,9 +19,7 @@ docker:
FROM alpine FROM alpine
RUN apk add --no-cache libssl3 RUN apk add --no-cache libssl3
COPY +build/rustocat rustocat COPY +build/rustocat rustocat
ENTRYPOINT ["/bin/sh"] ENTRYPOINT ["./rustocat"]
CMD ["-c", "/rustbuild/rustocat"]
ARG EARTHLY_TARGET_TAG ARG EARTHLY_TARGET_TAG
ARG TAG=$EARTHLY_TARGET_TAG ARG TAG=$EARTHLY_TARGET_TAG
SAVE IMAGE --push docker.hibas123.de/rustocat:latest SAVE IMAGE --push git.hibas.dev/ops/rustocat:$TAG
SAVE IMAGE --push docker.hibas123.de/rustocat:$TAG

View File

@ -15,8 +15,11 @@ pub struct Target {
#[derive(Debug, Deserialize)] #[derive(Debug, Deserialize)]
pub struct Config { pub struct Config {
#[cfg(feature = "consul")]
pub consul: Option<bool>, pub consul: Option<bool>,
#[cfg(feature = "consul")]
pub consul_http_addr: Option<String>, pub consul_http_addr: Option<String>,
#[cfg(feature = "consul")]
pub consul_http_token: Option<String>, pub consul_http_token: Option<String>,
mappings: Vec<Target>, mappings: Vec<Target>,
} }
@ -41,38 +44,54 @@ impl FileConfigProvider {
fn load_yaml(&self, path: &Path) -> Result<Config> { fn load_yaml(&self, path: &Path) -> Result<Config> {
let file = File::open(path)?; let file = File::open(path)?;
let config: Config = serde_yaml::from_reader(file).expect("Failed to parse!"); //TODO: Print path let config: Config = serde_yaml::from_reader(file)?; //TODO: Print path
return Ok(config); return Ok(config);
} }
fn load_json(&self, path: &Path) -> Result<Config> { fn load_json(&self, path: &Path) -> Result<Config> {
let file = File::open(path)?; let file = File::open(path)?;
let config: Config = serde_json::from_reader(file).expect("Failed to parse!"); //TODO: Print path let config: Config = serde_json::from_reader(file)?; //TODO: Print path
return Ok(config); return Ok(config);
} }
pub fn load_config(&self) -> Result<Config> { pub fn load_config(&self) -> Result<Config> {
for path in [ let configs = [
"config.yaml", "config.yaml",
"config.yml",
"config.json", "config.json",
"/etc/rustocat.yaml", "/etc/rustocat.yaml",
"/etc/rustocat.json", "/etc/rustocat.json",
] ]
.iter() .iter()
{ .map(|path| {
// if(p) if std::path::Path::new(path).exists() {
let config = if path.ends_with(".yaml") { return Some(path);
} else {
return None;
}
})
.filter(|p| p.is_some())
.map(|p| p.unwrap())
.map(|path| {
if path.ends_with(".yaml") {
self.load_yaml(Path::new(path)) self.load_yaml(Path::new(path))
} else { } else {
self.load_json(Path::new(path)) self.load_json(Path::new(path))
}; }
if config.is_ok() { });
return config;
for config in configs {
match config {
Ok(c) => return Ok(c),
Err(e) => {
info!("Error loading config: {}", e);
}
} }
} }
Err("No config file found".into())
Err("No (valid) config file found".into())
} }
} }

View File

@ -1,9 +1,11 @@
use crate::shutdown::ShutdownReceiver; use crate::shutdown::ShutdownReceiver;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::broadcast;
use tokio::sync::RwLock; use tokio::sync::RwLock;
pub(crate) struct Listener { pub(crate) struct Listener {
pub(crate) shutdown: ShutdownReceiver, pub(crate) shutdown: ShutdownReceiver,
pub(crate) source: String, pub(crate) source: String,
pub(crate) targets: Arc<RwLock<Vec<String>>>, pub(crate) targets: Arc<RwLock<Vec<String>>>,
pub(crate) targets_changed: broadcast::Receiver<()>,
} }

View File

@ -20,6 +20,7 @@ pub type Result<T> = std::result::Result<T, Box<dyn Error>>;
struct ActiveListener { struct ActiveListener {
notify_shutdown: broadcast::Sender<()>, notify_shutdown: broadcast::Sender<()>,
targets: Arc<RwLock<Vec<String>>>, targets: Arc<RwLock<Vec<String>>>,
notify_targets: broadcast::Sender<()>,
} }
#[tokio::main] #[tokio::main]
@ -119,19 +120,24 @@ async fn run_loop(
warn!("Found invalid targets! Adjusting!"); warn!("Found invalid targets! Adjusting!");
let mut w = listener.targets.write().await; let mut w = listener.targets.write().await;
*w = target.targets; *w = target.targets;
_ = listener.notify_targets.send(())?; //TODO: Maybe ignore this error
} }
} else { } else {
let (notify_shutdown, _) = broadcast::channel(1); let (notify_shutdown, _) = broadcast::channel(1);
let (notify_targets, _) = broadcast::channel(1);
let listener = ActiveListener { let listener = ActiveListener {
notify_shutdown: notify_shutdown, notify_shutdown,
targets: Arc::new(RwLock::new(target.targets)), targets: Arc::new(RwLock::new(target.targets)),
notify_targets,
}; };
let l = listener::Listener { let l = listener::Listener {
shutdown: shutdown::ShutdownReceiver::new(listener.notify_shutdown.subscribe()), shutdown: shutdown::ShutdownReceiver::new(listener.notify_shutdown.subscribe()),
source: target.source.clone(), source: target.source.clone(),
targets: listener.targets.clone(), targets: listener.targets.clone(),
targets_changed: listener.notify_targets.subscribe(),
}; };
tokio::spawn(async move { tokio::spawn(async move {
@ -168,6 +174,8 @@ async fn run_loop(
} }
} }
// TODO: Maybe Wait for the listener to shut down
debug!("Removing listener!"); debug!("Removing listener!");
listeners.remove(&del_key); listeners.remove(&del_key);
} }

View File

@ -1,5 +1,5 @@
use crate::listener::Listener; use crate::listener::Listener;
use log::{info, trace, warn}; use log::{error, info, trace, warn};
use rand::seq::SliceRandom; use rand::seq::SliceRandom;
use std::error::Error; use std::error::Error;
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
@ -24,11 +24,41 @@ pub(crate) async fn start_tcp_listener(
mut listener_config: Listener, mut listener_config: Listener,
) -> Result<(), Box<dyn Error>> { ) -> Result<(), Box<dyn Error>> {
info!("start listening on {}", &listener_config.source); info!("start listening on {}", &listener_config.source);
let listener = TcpListener::bind(&listener_config.source).await?; let listener = loop {
match TcpListener::bind(&listener_config.source).await {
Ok(listener) => break listener,
Err(e) => match e.kind() {
std::io::ErrorKind::AddrInUse => {
warn!("Address in use: {}", &listener_config.source);
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
std::io::ErrorKind::AddrNotAvailable => {
warn!("Address not available: {}", &listener_config.source);
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
_ => {
error!(
"Error binding to {}: {} ({})",
&listener_config.source,
e,
e.kind()
);
trace!("Error: {}", e);
return Err(Box::new(e));
}
},
}
};
info!("listening on {}", listener.local_addr()?);
loop { loop {
let (next_socket, _) = tokio::select! { let (next_socket, _) = tokio::select! {
res = listener.accept() => res?, res = listener.accept() => res?,
_ = listener_config.targets_changed.recv() => {
info!("Targets changed!");
continue;
}
_ = listener_config.shutdown.recv() => { _ = listener_config.shutdown.recv() => {
info!("Exiting listener!"); info!("Exiting listener!");
return Ok(()); return Ok(());

View File

@ -2,7 +2,7 @@ use std::collections::HashMap;
use std::sync::atomic::AtomicI32; use std::sync::atomic::AtomicI32;
use std::sync::Arc; use std::sync::Arc;
use log::{debug, error, info, trace}; use log::{debug, error, info, trace, warn};
use rand::seq::SliceRandom; use rand::seq::SliceRandom;
use tokio::net::UdpSocket; use tokio::net::UdpSocket;
@ -38,7 +38,27 @@ impl UDPMultiSender {
} }
async fn splitted_udp_socket(bind_addr: &str) -> Result<(UdpSocket, UDPMultiSender)> { async fn splitted_udp_socket(bind_addr: &str) -> Result<(UdpSocket, UDPMultiSender)> {
let listener_std = std::net::UdpSocket::bind(bind_addr)?; let listener_std = loop {
match std::net::UdpSocket::bind(bind_addr) {
Ok(listener) => break listener,
Err(e) => match e.kind() {
std::io::ErrorKind::AddrInUse => {
warn!("Address in use: {}", bind_addr);
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
std::io::ErrorKind::AddrNotAvailable => {
warn!("Address not available: {}", bind_addr);
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
_ => {
error!("Error binding to {}: {} ({})", bind_addr, e, e.kind());
trace!("Error: {}", e);
return Err(Box::new(e));
}
},
}
};
let responder_std = listener_std.try_clone()?; let responder_std = listener_std.try_clone()?;
listener_std.set_nonblocking(true)?; listener_std.set_nonblocking(true)?;
responder_std.set_nonblocking(true)?; responder_std.set_nonblocking(true)?;
@ -180,6 +200,15 @@ pub(crate) async fn start_udp_listener(mut listener_config: Listener) -> Result<
trace!("Waiting for packet or shutdown"); trace!("Waiting for packet or shutdown");
let (num_bytes, src_addr) = tokio::select! { let (num_bytes, src_addr) = tokio::select! {
res = listener.recv_from(&mut buf) => res?, res = listener.recv_from(&mut buf) => res?,
_ = listener_config.targets_changed.recv() => {
info!("Targets changed!");
info!("Closing all connections!");
for (_, handler) in connections.lock().await.iter_mut() {
handler.close().await;
}
info!("Closed all connections!");
continue;
}
_ = listener_config.shutdown.recv() => { _ = listener_config.shutdown.recv() => {
info!("Exiting listener!"); info!("Exiting listener!");
break; break;
@ -220,5 +249,7 @@ pub(crate) async fn start_udp_listener(mut listener_config: Listener) -> Result<
handler.close().await; handler.close().await;
} }
println!("Listener closed.");
return Ok(()); return Ok(());
} }