Compare commits

..

15 Commits
udp ... main

Author SHA1 Message Date
Fabian Stamm
01bc71ab86 Hide consul config behind a flag
Some checks failed
CI / build (push) Has been cancelled
2024-11-09 21:48:22 +01:00
Fabian Stamm
cf98c93a89 Update dependencies
Some checks are pending
CI / build (push) Waiting to run
2024-11-09 21:45:59 +01:00
Fabian Stamm
d57d0c2db9 Make dependencies feature dependant
Some checks are pending
CI / build (push) Waiting to run
2024-11-09 21:38:53 +01:00
Fabian Stamm
f2ad24655c Bump version
Some checks failed
CI / build (push) Failing after 1m38s
2024-04-14 00:30:57 +02:00
Fabian Stamm
9bc700e0f6 Handle the case, that the bind ip address is only available after start
Some checks failed
CI / build (push) Failing after 1m1s
2024-04-13 23:57:46 +02:00
Fabian Stamm
5e6552dd6c Change docker repository
All checks were successful
CI / build (push) Successful in 23m2s
2023-07-13 14:58:40 +02:00
Fabian Stamm
41d10c5c94 Add listener closing when the target set has changed.
All checks were successful
CI / build (push) Successful in 22m46s
2023-07-13 11:53:51 +02:00
c90a8fff4a Bumping version to 0.1.11 2023-06-07 08:08:25 +02:00
f290fa2eb1 Adjusting Earthfile 2023-06-07 08:07:40 +02:00
635488ddeb Bumping version 2023-06-06 22:28:23 +02:00
a7e06ca5ce Adding Earthfile for CI 2023-06-06 22:27:49 +02:00
ef1a922933 Make it work in nomad 2023-04-14 04:13:41 +02:00
747c4ddf6f Remove unused dependency 2023-02-06 14:24:46 +01:00
db06b7c7d0 Integrating consul catalog for autoconfig of listeners 2023-02-06 14:21:05 +01:00
05da7a97e0 Adding Dockerfile 2023-02-05 15:11:45 +01:00
11 changed files with 1562 additions and 445 deletions

1
.earthlyignore Normal file
View File

@ -0,0 +1 @@
target/

36
.github/workflows/ci.yml vendored Normal file
View File

@ -0,0 +1,36 @@
# .github/workflows/ci.yml
name: CI
on:
push:
pull_request:
branches: [main]
jobs:
build:
runs-on: ubuntu-latest
env:
MY_DOCKER_USERNAME: ${{ secrets.MY_DOCKER_USERNAME }}
MY_DOCKER_PASSWORD: ${{ secrets.MY_DOCKER_PASSWORD }}
FORCE_COLOR: 1
steps:
- uses: https://github.com/earthly/actions-setup@v1
with:
version: v0.7.0
- uses: actions/checkout@v2
- name: Put back the git branch into git (Earthly uses it for tagging)
run: |
branch=""
if [ -n "$GITHUB_HEAD_REF" ]; then
branch="$GITHUB_HEAD_REF"
else
branch="${GITHUB_REF##*/}"
fi
git checkout -b "$branch" || true
- name: Docker Login
run: docker login git.hibas.dev --username "$MY_DOCKER_USERNAME" --password "$MY_DOCKER_PASSWORD"
- name: Earthly version
run: earthly --version
- name: Run build
run: earthly --push +docker-multi

1402
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,23 +1,29 @@
[package]
name = "rustocat"
version = "0.1.0"
version = "0.1.13"
edition = "2021"
description = "Socat in rust with many less features and a configuration file"
license = "ISC"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
default = ["consul"]
consul = ["reqwest"]
[dependencies]
tokio = { version = "1", features = ["full"] }
futures = "0.3.21"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
serde_yaml = "0.9.13"
simple-error = "0.2.3"
rand = "0.8.5"
log = "0.4.17"
fern = "0.6.1"
chrono = "0.4.23"
serde_yaml = "0.9"
rand = "0.8"
log = "0.4"
fern = "0.7"
chrono = "0.4"
async-trait = "0.1"
reqwest = { version = "0.12", optional = true, features = [
"json",
"default-tls",
], default-features = false }
[profile.release]
opt-level = 3 # Optimize for size.

25
Earthfile Normal file
View File

@ -0,0 +1,25 @@
VERSION 0.7
FROM rust:alpine
WORKDIR /rustbuild
prepare-env:
RUN apk add --no-cache musl-dev libssl3 openssl-dev
RUN cargo search test # Super simple way to cache the cargo index
docker-multi:
BUILD --platform linux/amd64 --platform linux/arm64 +docker
build:
FROM +prepare-env
COPY . .
RUN cargo build --release
SAVE ARTIFACT target/release/rustocat rustocat
docker:
FROM alpine
RUN apk add --no-cache libssl3
COPY +build/rustocat rustocat
ENTRYPOINT ["./rustocat"]
ARG EARTHLY_TARGET_TAG
ARG TAG=$EARTHLY_TARGET_TAG
SAVE IMAGE --push git.hibas.dev/ops/rustocat:$TAG

112
src/config.rs Normal file
View File

@ -0,0 +1,112 @@
use std::{fs::File, path::Path};
use log::info;
use serde::Deserialize;
use tokio::signal::unix::Signal;
use crate::Result;
#[derive(Debug, Deserialize)]
pub struct Target {
pub udp: Option<bool>,
pub source: String,
pub targets: Vec<String>,
}
#[derive(Debug, Deserialize)]
pub struct Config {
#[cfg(feature = "consul")]
pub consul: Option<bool>,
#[cfg(feature = "consul")]
pub consul_http_addr: Option<String>,
#[cfg(feature = "consul")]
pub consul_http_token: Option<String>,
mappings: Vec<Target>,
}
#[async_trait::async_trait]
pub trait ConfigProvider {
async fn get_targets(&self) -> Result<Vec<Target>>;
async fn wait_for_change(&mut self) -> Result<()>;
}
pub struct FileConfigProvider {
sighup_stream: Signal,
}
impl FileConfigProvider {
pub fn new() -> Self {
Self {
sighup_stream: tokio::signal::unix::signal(tokio::signal::unix::SignalKind::hangup())
.expect("Failed to create sighup stream"),
}
}
fn load_yaml(&self, path: &Path) -> Result<Config> {
let file = File::open(path)?;
let config: Config = serde_yaml::from_reader(file)?; //TODO: Print path
return Ok(config);
}
fn load_json(&self, path: &Path) -> Result<Config> {
let file = File::open(path)?;
let config: Config = serde_json::from_reader(file)?; //TODO: Print path
return Ok(config);
}
pub fn load_config(&self) -> Result<Config> {
let configs = [
"config.yaml",
"config.yml",
"config.json",
"/etc/rustocat.yaml",
"/etc/rustocat.json",
]
.iter()
.map(|path| {
if std::path::Path::new(path).exists() {
return Some(path);
} else {
return None;
}
})
.filter(|p| p.is_some())
.map(|p| p.unwrap())
.map(|path| {
if path.ends_with(".yaml") {
self.load_yaml(Path::new(path))
} else {
self.load_json(Path::new(path))
}
});
for config in configs {
match config {
Ok(c) => return Ok(c),
Err(e) => {
info!("Error loading config: {}", e);
}
}
}
Err("No (valid) config file found".into())
}
}
#[async_trait::async_trait]
impl ConfigProvider for FileConfigProvider {
async fn get_targets(&self) -> Result<Vec<Target>> {
info!("Getting targets from file");
let config = self.load_config()?;
return Ok(config.mappings);
}
async fn wait_for_change(&mut self) -> Result<()> {
info!("Waiting for file config change (SIGHUP)");
self.sighup_stream.recv().await;
return Ok(());
}
}

204
src/consul.rs Normal file
View File

@ -0,0 +1,204 @@
#![allow(non_snake_case)]
use std::collections::HashMap;
use log::debug;
use log::info;
use log::trace;
use log::warn;
use reqwest::header::HeaderMap;
use serde::{Deserialize, Serialize};
use crate::config::Config;
use crate::config::ConfigProvider;
use crate::config::Target;
use crate::Result;
pub struct ConsulConfigProvider {
consul_config: ConsulConfig,
interval: tokio::time::Interval,
}
impl ConsulConfigProvider {
pub fn new(config: Option<&Config>) -> Self {
Self {
consul_config: if let Some(config) = config {
ConsulConfig::from_config_or_env(config)
} else {
ConsulConfig::from_env()
},
interval: tokio::time::interval(tokio::time::Duration::from_secs(10)),
}
}
}
#[async_trait::async_trait]
impl ConfigProvider for ConsulConfigProvider {
async fn get_targets(&self) -> Result<Vec<Target>> {
info!("Getting targets from consul");
let mut targets: Vec<Target> = Vec::new();
debug!("Calling consul_get_services");
let services = consul_get_services(&self.consul_config).await?;
// Find consul services and tags
// Format of tags: rustocat:udp:port
// rustocat:tcp:port
debug!("Iterating over services: {:?}", services);
for (name, tags) in services {
for tag in tags {
if tag.starts_with("rustocat") {
trace!("Found rustocat tag: {}", tag);
let parts = tag.split(":").collect::<Vec<&str>>();
if parts.len() != 3 {
warn!("Invalid tag: {} on service {}", tag, name);
continue;
}
let port = parts[2];
trace!("Getting nodes for service: {}", name);
let nodes = consul_get_service_nodes(&self.consul_config, &name).await?;
let mut t = vec![];
for node in nodes {
t.push(format!("{}:{}", node.ServiceAddress, node.ServicePort));
}
let target = Target {
udp: Some(parts[1] == "udp"),
source: format!("0.0.0.0:{}", port),
targets: t,
};
trace!("Adding target: {:?}", target);
targets.push(target);
}
}
}
Ok(targets)
}
async fn wait_for_change(&mut self) -> Result<()> {
info!("Waiting for consul config change");
self.interval.tick().await;
Ok(())
}
}
async fn consul_get_services(config: &ConsulConfig) -> Result<HashMap<String, Vec<String>>> {
let mut headers = HeaderMap::new();
if let Some(token) = config.token.clone() {
headers.insert("X-Consul-Token", token.parse()?);
}
return Ok(reqwest::Client::new()
.get(format!("{}/v1/catalog/services", config.baseurl))
.headers(headers)
.send()
.await?
.json::<HashMap<String, Vec<String>>>()
.await?);
}
async fn consul_get_service_nodes(config: &ConsulConfig, service: &str) -> Result<Vec<Node>> {
let mut headers = HeaderMap::new();
if let Some(token) = config.token.clone() {
headers.insert("X-Consul-Token", token.parse()?);
}
trace!(
"Calling consul_get_service_nodes: {}/v1/catalog/service/{service}",
config.baseurl
);
return Ok(reqwest::Client::new()
.get(format!("{}/v1/catalog/service/{service}", config.baseurl))
.headers(headers)
.send()
.await?
.json::<Vec<Node>>()
.await?);
}
#[derive(Eq, Default, PartialEq, Serialize, Deserialize, Debug)]
#[serde(default)]
struct AgentService {
ID: String,
Service: String,
Tags: Option<Vec<String>>,
Port: u16,
Address: String,
EnableTagOverride: bool,
CreateIndex: u64,
ModifyIndex: u64,
}
#[derive(Eq, Default, PartialEq, Serialize, Deserialize, Debug)]
#[serde(default)]
struct HealthCheck {
Node: String,
CheckID: String,
Name: String,
Status: String,
Notes: String,
Output: String,
ServiceID: String,
ServiceName: String,
ServiceTags: Option<Vec<String>>,
}
#[derive(Eq, Default, PartialEq, Serialize, Deserialize, Debug)]
#[serde(default)]
struct Node {
ID: String,
Node: String,
ServiceAddress: String,
ServicePort: u16,
Datacenter: Option<String>,
TaggedAddresses: Option<HashMap<String, String>>,
Meta: Option<HashMap<String, String>>,
CreateIndex: u64,
ModifyIndex: u64,
}
#[derive(Eq, Default, PartialEq, Serialize, Deserialize, Debug)]
#[serde(default)]
struct ServiceEntry {
Node: Node,
Service: AgentService,
Checks: Vec<HealthCheck>,
}
struct ConsulConfig {
baseurl: String,
token: Option<String>,
}
impl ConsulConfig {
fn from_env() -> Self {
Self {
baseurl: option_env!("CONSUL_HTTP_ADDR")
.expect("CONSUL_HTTP_ADDR not set")
.to_string(),
token: option_env!("CONSUL_HTTP_TOKEN").map(|s| s.to_string()),
}
}
fn from_config_or_env(config: &Config) -> Self {
let baseurl = match config.consul_http_addr {
Some(ref s) => s.clone(),
None => option_env!("CONSUL_HTTP_ADDR")
.expect("CONSUL_HTTP_ADDR not set")
.to_string(),
};
let token = match config.consul_http_token {
Some(ref s) => Some(s.clone()),
None => option_env!("CONSUL_HTTP_TOKEN").map(|s| s.to_string()),
};
Self { baseurl, token }
}
}

View File

@ -1,9 +1,11 @@
use crate::shutdown::ShutdownReceiver;
use std::sync::Arc;
use tokio::sync::broadcast;
use tokio::sync::RwLock;
pub(crate) struct Listener {
pub(crate) shutdown: ShutdownReceiver,
pub(crate) source: String,
pub(crate) targets: Arc<RwLock<Vec<String>>>,
pub(crate) targets_changed: broadcast::Receiver<()>,
}

View File

@ -1,73 +1,26 @@
mod config;
mod listener;
mod shutdown;
mod tcp;
mod udp;
#[cfg(feature = "consul")]
mod consul;
use config::ConfigProvider;
use log::{debug, error, info, warn};
use serde::Deserialize;
use simple_error::bail;
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::fs::File;
use std::path::Path;
use std::sync::Arc;
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::{broadcast, RwLock};
pub type Result<T> = std::result::Result<T, Box<dyn Error>>;
#[derive(Debug, Deserialize)]
struct Target {
udp: Option<bool>,
source: String,
targets: Vec<String>,
}
#[derive(Debug, Deserialize)]
struct Config {
mappings: Vec<Target>,
}
fn load_yaml(path: &Path) -> Result<Config> {
let file = File::open(path)?;
let config: Config = serde_yaml::from_reader(file).expect("Failed to parse!"); //TODO: Print path
return Ok(config);
}
fn load_json(path: &Path) -> Result<Config> {
let file = File::open(path)?;
let config: Config = serde_json::from_reader(file).expect("Failed to parse!"); //TODO: Print path
return Ok(config);
}
fn load_config() -> Result<Config> {
for path in [
"config.yaml",
"config.json",
"/etc/rustocat.yaml",
"/etc/rustocat.json",
]
.iter()
{
// if(p)
let config = if path.ends_with(".yaml") {
load_yaml(Path::new(path))
} else {
load_json(Path::new(path))
};
if config.is_ok() {
return config;
}
}
bail!("No config file found");
}
// #[derive(Debug)]
struct ActiveListener {
notify_shutdown: broadcast::Sender<()>,
targets: Arc<RwLock<Vec<String>>>,
notify_targets: broadcast::Sender<()>,
}
#[tokio::main]
@ -84,18 +37,56 @@ async fn main() -> Result<()> {
})
// Add blanket level filter -
.level(log::LevelFilter::Info)
.level_for("rustocat", log::LevelFilter::Trace)
.level_for("rustocat", log::LevelFilter::Debug)
.chain(std::io::stdout())
.apply()?;
let mut listeners: HashMap<String, ActiveListener> = HashMap::new();
let mut sighup_stream = signal(SignalKind::hangup())?;
let listeners: HashMap<String, ActiveListener> = HashMap::new();
let text_config_provider = config::FileConfigProvider::new();
let target_provider: Box<dyn config::ConfigProvider> = {
#[cfg(feature = "consul")]
{
let cfg = text_config_provider.load_config()?;
info!("Loaded yaml config");
if cfg.consul.is_some() && cfg.consul.unwrap() {
info!("Using consul config provider");
let consul_config_provider = consul::ConsulConfigProvider::new(Some(&cfg));
info!("Loaded consul config provider");
Box::new(consul_config_provider)
} else {
Box::new(text_config_provider)
}
}
#[cfg(not(feature = "consul"))]
{
let cfg = Box::new(text_config_provider);
info!("Loaded yaml config");
cfg
}
};
match run_loop(target_provider, listeners).await {
Ok(_) => {}
Err(e) => {
error!("Error in run loop: {}", e);
info!("Exiting");
}
}
Ok(())
}
async fn run_loop(
mut target_provider: Box<dyn ConfigProvider>,
mut listeners: HashMap<String, ActiveListener>,
) -> Result<()> {
loop {
let config = load_config().expect("config not found");
let mappings = target_provider.get_targets().await?;
let mut required_listeners: HashSet<String> = HashSet::new();
for target in config.mappings {
for target in mappings {
let mut source_str = "".to_owned();
if target.udp == None || target.udp == Some(false) {
source_str.push_str("udp:");
@ -129,19 +120,24 @@ async fn main() -> Result<()> {
warn!("Found invalid targets! Adjusting!");
let mut w = listener.targets.write().await;
*w = target.targets;
_ = listener.notify_targets.send(())?; //TODO: Maybe ignore this error
}
} else {
let (notify_shutdown, _) = broadcast::channel(1);
let (notify_targets, _) = broadcast::channel(1);
let listener = ActiveListener {
notify_shutdown: notify_shutdown,
notify_shutdown,
targets: Arc::new(RwLock::new(target.targets)),
notify_targets,
};
let l = listener::Listener {
shutdown: shutdown::ShutdownReceiver::new(listener.notify_shutdown.subscribe()),
source: target.source.clone(),
targets: listener.targets.clone(),
targets_changed: listener.notify_targets.subscribe(),
};
tokio::spawn(async move {
@ -151,7 +147,7 @@ async fn main() -> Result<()> {
}
} else {
if let Err(err) = udp::start_udp_listener(l).await {
error!("udp listener error: {}", err);
error!("udp listener error {}: {}", target.source, err);
}
}
});
@ -178,12 +174,14 @@ async fn main() -> Result<()> {
}
}
// TODO: Maybe Wait for the listener to shut down
debug!("Removing listener!");
listeners.remove(&del_key);
}
}
sighup_stream.recv().await;
info!("Recevied SIGHUP, reloading config!");
target_provider.wait_for_change().await?;
info!("Reloading config!");
}
}

View File

@ -1,5 +1,5 @@
use crate::listener::Listener;
use log::{info, trace, warn};
use log::{error, info, trace, warn};
use rand::seq::SliceRandom;
use std::error::Error;
use tokio::net::{TcpListener, TcpStream};
@ -24,11 +24,41 @@ pub(crate) async fn start_tcp_listener(
mut listener_config: Listener,
) -> Result<(), Box<dyn Error>> {
info!("start listening on {}", &listener_config.source);
let listener = TcpListener::bind(&listener_config.source).await?;
let listener = loop {
match TcpListener::bind(&listener_config.source).await {
Ok(listener) => break listener,
Err(e) => match e.kind() {
std::io::ErrorKind::AddrInUse => {
warn!("Address in use: {}", &listener_config.source);
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
std::io::ErrorKind::AddrNotAvailable => {
warn!("Address not available: {}", &listener_config.source);
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
_ => {
error!(
"Error binding to {}: {} ({})",
&listener_config.source,
e,
e.kind()
);
trace!("Error: {}", e);
return Err(Box::new(e));
}
},
}
};
info!("listening on {}", listener.local_addr()?);
loop {
let (next_socket, _) = tokio::select! {
res = listener.accept() => res?,
_ = listener_config.targets_changed.recv() => {
info!("Targets changed!");
continue;
}
_ = listener_config.shutdown.recv() => {
info!("Exiting listener!");
return Ok(());

View File

@ -2,7 +2,7 @@ use std::collections::HashMap;
use std::sync::atomic::AtomicI32;
use std::sync::Arc;
use log::{debug, error, info, trace};
use log::{debug, error, info, trace, warn};
use rand::seq::SliceRandom;
use tokio::net::UdpSocket;
@ -12,7 +12,7 @@ use crate::listener::Listener;
use crate::shutdown::{Shutdown, ShutdownReceiver};
use crate::Result;
const CONNECTION_TIMEOUT: i32 = 5;
const CONNECTION_TIMEOUT: i32 = 30;
#[derive(Clone)]
struct UDPMultiSender {
@ -38,7 +38,27 @@ impl UDPMultiSender {
}
async fn splitted_udp_socket(bind_addr: &str) -> Result<(UdpSocket, UDPMultiSender)> {
let listener_std = std::net::UdpSocket::bind(bind_addr)?;
let listener_std = loop {
match std::net::UdpSocket::bind(bind_addr) {
Ok(listener) => break listener,
Err(e) => match e.kind() {
std::io::ErrorKind::AddrInUse => {
warn!("Address in use: {}", bind_addr);
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
std::io::ErrorKind::AddrNotAvailable => {
warn!("Address not available: {}", bind_addr);
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
_ => {
error!("Error binding to {}: {} ({})", bind_addr, e, e.kind());
trace!("Error: {}", e);
return Err(Box::new(e));
}
},
}
};
let responder_std = listener_std.try_clone()?;
listener_std.set_nonblocking(true)?;
responder_std.set_nonblocking(true)?;
@ -180,6 +200,15 @@ pub(crate) async fn start_udp_listener(mut listener_config: Listener) -> Result<
trace!("Waiting for packet or shutdown");
let (num_bytes, src_addr) = tokio::select! {
res = listener.recv_from(&mut buf) => res?,
_ = listener_config.targets_changed.recv() => {
info!("Targets changed!");
info!("Closing all connections!");
for (_, handler) in connections.lock().await.iter_mut() {
handler.close().await;
}
info!("Closed all connections!");
continue;
}
_ = listener_config.shutdown.recv() => {
info!("Exiting listener!");
break;
@ -220,5 +249,7 @@ pub(crate) async fn start_udp_listener(mut listener_config: Listener) -> Result<
handler.close().await;
}
println!("Listener closed.");
return Ok(());
}