Compare commits
7 Commits
Author | SHA1 | Date | |
---|---|---|---|
635488ddeb | |||
a7e06ca5ce | |||
ef1a922933 | |||
747c4ddf6f | |||
db06b7c7d0 | |||
05da7a97e0 | |||
f74409c9eb |
37
.github/workflows/ci.yml
vendored
Normal file
37
.github/workflows/ci.yml
vendored
Normal file
@ -0,0 +1,37 @@
|
||||
# .github/workflows/ci.yml
|
||||
|
||||
name: CI
|
||||
|
||||
on:
|
||||
push:
|
||||
branches: [main]
|
||||
pull_request:
|
||||
branches: [main]
|
||||
|
||||
jobs:
|
||||
build:
|
||||
runs-on: ubuntu-latest
|
||||
env:
|
||||
DOCKER_USERNAME: ${{ secrets.DOCKER_USERNAME }}
|
||||
DOCKER_TOKEN: ${{ secrets.DOCKER_TOKEN }}
|
||||
FORCE_COLOR: 1
|
||||
steps:
|
||||
- uses: earthly/actions/setup-earthly@v1
|
||||
with:
|
||||
version: v0.7.0
|
||||
- uses: actions/checkout@v2
|
||||
- name: Put back the git branch into git (Earthly uses it for tagging)
|
||||
run: |
|
||||
branch=""
|
||||
if [ -n "$GITHUB_HEAD_REF" ]; then
|
||||
branch="$GITHUB_HEAD_REF"
|
||||
else
|
||||
branch="${GITHUB_REF##*/}"
|
||||
fi
|
||||
git checkout -b "$branch" || true
|
||||
- name: Docker Login
|
||||
run: docker login docker.hibas123.de --username "$DOCKER_USERNAME" --password "$DOCKER_TOKEN"
|
||||
- name: Earthly version
|
||||
run: earthly --version
|
||||
- name: Run build
|
||||
run: earthly --push +docker-multi
|
1001
Cargo.lock
generated
1001
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
19
Cargo.toml
19
Cargo.toml
@ -1,23 +1,26 @@
|
||||
[package]
|
||||
name = "rustocat"
|
||||
version = "0.1.0"
|
||||
version = "0.1.10"
|
||||
edition = "2021"
|
||||
description = "Socat in rust with many less features and a configuration file"
|
||||
license = "ISC"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
[features]
|
||||
default = ["consul"]
|
||||
consul = []
|
||||
|
||||
[dependencies]
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
futures = "0.3.21"
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
serde_yaml = "0.9.13"
|
||||
simple-error = "0.2.3"
|
||||
rand = "0.8.5"
|
||||
log = "0.4.17"
|
||||
fern = "0.6.1"
|
||||
chrono = "0.4.23"
|
||||
serde_yaml = "0.9"
|
||||
rand = "0.8"
|
||||
log = "0.4"
|
||||
fern = "0.6"
|
||||
chrono = "0.4"
|
||||
async-trait = "0.1"
|
||||
reqwest = { version = "0.11", features = ["json", "rustls", "hyper-tls"], default-features = false }
|
||||
|
||||
[profile.release]
|
||||
opt-level = 3 # Optimize for size.
|
||||
|
26
Dockerfile
Normal file
26
Dockerfile
Normal file
@ -0,0 +1,26 @@
|
||||
FROM rust:alpine as builder
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
RUN apk add --no-cache musl-dev libssl3 openssl-dev
|
||||
|
||||
# This should fetch the index and cache it. This should reduce the time required of subsequent builds
|
||||
RUN cargo search test
|
||||
|
||||
COPY Cargo.toml Cargo.lock /app/
|
||||
# COPY .cargo /app/.cargo
|
||||
|
||||
COPY src /app/src
|
||||
|
||||
|
||||
RUN cargo build --release
|
||||
|
||||
FROM alpine
|
||||
|
||||
RUN apk add --no-cache libssl3
|
||||
COPY --from=builder /app/target/release/rustocat /usr/bin/rustocat
|
||||
|
||||
ENTRYPOINT ["/bin/sh"]
|
||||
CMD [ "-c", "/usr/bin/rustocat" ]
|
||||
|
||||
|
27
Earthfile
Normal file
27
Earthfile
Normal file
@ -0,0 +1,27 @@
|
||||
VERSION 0.7
|
||||
FROM rust:alpine
|
||||
WORKDIR /rustbuild
|
||||
|
||||
prepare-env:
|
||||
RUN apk add --no-cache musl-dev libssl3 openssl-dev
|
||||
RUN cargo search test # Super simple way to cache the cargo index
|
||||
|
||||
docker-multi:
|
||||
BUILD --platform linux/amd64 --platform linux/arm64 +docker
|
||||
|
||||
build:
|
||||
FROM +prepare-env
|
||||
COPY . .
|
||||
RUN cargo build --release
|
||||
SAVE ARTIFACT target/release/rustocat rustocat
|
||||
|
||||
docker:
|
||||
FROM alpine
|
||||
RUN apk add --no-cache libssl3
|
||||
COPY +build/rustocat rustocat
|
||||
ENTRYPOINT ["/bin/sh"]
|
||||
CMD ["-c", "/rustbuild/rustocat"]
|
||||
ARG EARTHLY_TARGET_TAG
|
||||
ARG TAG=$EARTHLY_TARGET_TAG
|
||||
SAVE IMAGE --push docker.hibas123.de/rustocat:latest
|
||||
SAVE IMAGE --push docker.hibas123.de/rustocat:$TAG
|
93
src/config.rs
Normal file
93
src/config.rs
Normal file
@ -0,0 +1,93 @@
|
||||
use std::{fs::File, path::Path};
|
||||
|
||||
use log::info;
|
||||
use serde::Deserialize;
|
||||
use tokio::signal::unix::Signal;
|
||||
|
||||
use crate::Result;
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct Target {
|
||||
pub udp: Option<bool>,
|
||||
pub source: String,
|
||||
pub targets: Vec<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct Config {
|
||||
pub consul: Option<bool>,
|
||||
pub consul_http_addr: Option<String>,
|
||||
pub consul_http_token: Option<String>,
|
||||
mappings: Vec<Target>,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait ConfigProvider {
|
||||
async fn get_targets(&self) -> Result<Vec<Target>>;
|
||||
async fn wait_for_change(&mut self) -> Result<()>;
|
||||
}
|
||||
|
||||
pub struct FileConfigProvider {
|
||||
sighup_stream: Signal,
|
||||
}
|
||||
|
||||
impl FileConfigProvider {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
sighup_stream: tokio::signal::unix::signal(tokio::signal::unix::SignalKind::hangup())
|
||||
.expect("Failed to create sighup stream"),
|
||||
}
|
||||
}
|
||||
|
||||
fn load_yaml(&self, path: &Path) -> Result<Config> {
|
||||
let file = File::open(path)?;
|
||||
let config: Config = serde_yaml::from_reader(file).expect("Failed to parse!"); //TODO: Print path
|
||||
|
||||
return Ok(config);
|
||||
}
|
||||
|
||||
fn load_json(&self, path: &Path) -> Result<Config> {
|
||||
let file = File::open(path)?;
|
||||
let config: Config = serde_json::from_reader(file).expect("Failed to parse!"); //TODO: Print path
|
||||
|
||||
return Ok(config);
|
||||
}
|
||||
|
||||
pub fn load_config(&self) -> Result<Config> {
|
||||
for path in [
|
||||
"config.yaml",
|
||||
"config.json",
|
||||
"/etc/rustocat.yaml",
|
||||
"/etc/rustocat.json",
|
||||
]
|
||||
.iter()
|
||||
{
|
||||
// if(p)
|
||||
let config = if path.ends_with(".yaml") {
|
||||
self.load_yaml(Path::new(path))
|
||||
} else {
|
||||
self.load_json(Path::new(path))
|
||||
};
|
||||
if config.is_ok() {
|
||||
return config;
|
||||
}
|
||||
}
|
||||
Err("No config file found".into())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ConfigProvider for FileConfigProvider {
|
||||
async fn get_targets(&self) -> Result<Vec<Target>> {
|
||||
info!("Getting targets from file");
|
||||
let config = self.load_config()?;
|
||||
return Ok(config.mappings);
|
||||
}
|
||||
|
||||
async fn wait_for_change(&mut self) -> Result<()> {
|
||||
info!("Waiting for file config change (SIGHUP)");
|
||||
self.sighup_stream.recv().await;
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
}
|
204
src/consul.rs
Normal file
204
src/consul.rs
Normal file
@ -0,0 +1,204 @@
|
||||
#![allow(non_snake_case)]
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use log::debug;
|
||||
use log::info;
|
||||
use log::trace;
|
||||
use log::warn;
|
||||
use reqwest::header::HeaderMap;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::config::Config;
|
||||
use crate::config::ConfigProvider;
|
||||
use crate::config::Target;
|
||||
|
||||
use crate::Result;
|
||||
|
||||
pub struct ConsulConfigProvider {
|
||||
consul_config: ConsulConfig,
|
||||
interval: tokio::time::Interval,
|
||||
}
|
||||
|
||||
impl ConsulConfigProvider {
|
||||
pub fn new(config: Option<&Config>) -> Self {
|
||||
Self {
|
||||
consul_config: if let Some(config) = config {
|
||||
ConsulConfig::from_config_or_env(config)
|
||||
} else {
|
||||
ConsulConfig::from_env()
|
||||
},
|
||||
interval: tokio::time::interval(tokio::time::Duration::from_secs(10)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ConfigProvider for ConsulConfigProvider {
|
||||
async fn get_targets(&self) -> Result<Vec<Target>> {
|
||||
info!("Getting targets from consul");
|
||||
let mut targets: Vec<Target> = Vec::new();
|
||||
|
||||
debug!("Calling consul_get_services");
|
||||
let services = consul_get_services(&self.consul_config).await?;
|
||||
|
||||
// Find consul services and tags
|
||||
// Format of tags: rustocat:udp:port
|
||||
// rustocat:tcp:port
|
||||
|
||||
debug!("Iterating over services: {:?}", services);
|
||||
for (name, tags) in services {
|
||||
for tag in tags {
|
||||
if tag.starts_with("rustocat") {
|
||||
trace!("Found rustocat tag: {}", tag);
|
||||
let parts = tag.split(":").collect::<Vec<&str>>();
|
||||
if parts.len() != 3 {
|
||||
warn!("Invalid tag: {} on service {}", tag, name);
|
||||
continue;
|
||||
}
|
||||
|
||||
let port = parts[2];
|
||||
trace!("Getting nodes for service: {}", name);
|
||||
let nodes = consul_get_service_nodes(&self.consul_config, &name).await?;
|
||||
|
||||
let mut t = vec![];
|
||||
for node in nodes {
|
||||
t.push(format!("{}:{}", node.ServiceAddress, node.ServicePort));
|
||||
}
|
||||
let target = Target {
|
||||
udp: Some(parts[1] == "udp"),
|
||||
source: format!("0.0.0.0:{}", port),
|
||||
targets: t,
|
||||
};
|
||||
trace!("Adding target: {:?}", target);
|
||||
targets.push(target);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(targets)
|
||||
}
|
||||
|
||||
async fn wait_for_change(&mut self) -> Result<()> {
|
||||
info!("Waiting for consul config change");
|
||||
self.interval.tick().await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
async fn consul_get_services(config: &ConsulConfig) -> Result<HashMap<String, Vec<String>>> {
|
||||
let mut headers = HeaderMap::new();
|
||||
if let Some(token) = config.token.clone() {
|
||||
headers.insert("X-Consul-Token", token.parse()?);
|
||||
}
|
||||
|
||||
return Ok(reqwest::Client::new()
|
||||
.get(format!("{}/v1/catalog/services", config.baseurl))
|
||||
.headers(headers)
|
||||
.send()
|
||||
.await?
|
||||
.json::<HashMap<String, Vec<String>>>()
|
||||
.await?);
|
||||
}
|
||||
|
||||
async fn consul_get_service_nodes(config: &ConsulConfig, service: &str) -> Result<Vec<Node>> {
|
||||
let mut headers = HeaderMap::new();
|
||||
if let Some(token) = config.token.clone() {
|
||||
headers.insert("X-Consul-Token", token.parse()?);
|
||||
}
|
||||
|
||||
trace!(
|
||||
"Calling consul_get_service_nodes: {}/v1/catalog/service/{service}",
|
||||
config.baseurl
|
||||
);
|
||||
|
||||
return Ok(reqwest::Client::new()
|
||||
.get(format!("{}/v1/catalog/service/{service}", config.baseurl))
|
||||
.headers(headers)
|
||||
.send()
|
||||
.await?
|
||||
.json::<Vec<Node>>()
|
||||
.await?);
|
||||
}
|
||||
|
||||
#[derive(Eq, Default, PartialEq, Serialize, Deserialize, Debug)]
|
||||
#[serde(default)]
|
||||
struct AgentService {
|
||||
ID: String,
|
||||
Service: String,
|
||||
Tags: Option<Vec<String>>,
|
||||
Port: u16,
|
||||
Address: String,
|
||||
EnableTagOverride: bool,
|
||||
CreateIndex: u64,
|
||||
ModifyIndex: u64,
|
||||
}
|
||||
|
||||
#[derive(Eq, Default, PartialEq, Serialize, Deserialize, Debug)]
|
||||
#[serde(default)]
|
||||
struct HealthCheck {
|
||||
Node: String,
|
||||
CheckID: String,
|
||||
Name: String,
|
||||
Status: String,
|
||||
Notes: String,
|
||||
Output: String,
|
||||
ServiceID: String,
|
||||
ServiceName: String,
|
||||
ServiceTags: Option<Vec<String>>,
|
||||
}
|
||||
|
||||
#[derive(Eq, Default, PartialEq, Serialize, Deserialize, Debug)]
|
||||
#[serde(default)]
|
||||
struct Node {
|
||||
ID: String,
|
||||
Node: String,
|
||||
ServiceAddress: String,
|
||||
ServicePort: u16,
|
||||
Datacenter: Option<String>,
|
||||
TaggedAddresses: Option<HashMap<String, String>>,
|
||||
Meta: Option<HashMap<String, String>>,
|
||||
CreateIndex: u64,
|
||||
ModifyIndex: u64,
|
||||
}
|
||||
|
||||
#[derive(Eq, Default, PartialEq, Serialize, Deserialize, Debug)]
|
||||
#[serde(default)]
|
||||
struct ServiceEntry {
|
||||
Node: Node,
|
||||
Service: AgentService,
|
||||
Checks: Vec<HealthCheck>,
|
||||
}
|
||||
|
||||
struct ConsulConfig {
|
||||
baseurl: String,
|
||||
token: Option<String>,
|
||||
}
|
||||
|
||||
impl ConsulConfig {
|
||||
fn from_env() -> Self {
|
||||
Self {
|
||||
baseurl: option_env!("CONSUL_HTTP_ADDR")
|
||||
.expect("CONSUL_HTTP_ADDR not set")
|
||||
.to_string(),
|
||||
token: option_env!("CONSUL_HTTP_TOKEN").map(|s| s.to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
fn from_config_or_env(config: &Config) -> Self {
|
||||
let baseurl = match config.consul_http_addr {
|
||||
Some(ref s) => s.clone(),
|
||||
None => option_env!("CONSUL_HTTP_ADDR")
|
||||
.expect("CONSUL_HTTP_ADDR not set")
|
||||
.to_string(),
|
||||
};
|
||||
|
||||
let token = match config.consul_http_token {
|
||||
Some(ref s) => Some(s.clone()),
|
||||
None => option_env!("CONSUL_HTTP_TOKEN").map(|s| s.to_string()),
|
||||
};
|
||||
|
||||
Self { baseurl, token }
|
||||
}
|
||||
}
|
112
src/main.rs
112
src/main.rs
@ -1,69 +1,21 @@
|
||||
mod config;
|
||||
mod listener;
|
||||
mod shutdown;
|
||||
mod tcp;
|
||||
mod udp;
|
||||
|
||||
#[cfg(feature = "consul")]
|
||||
mod consul;
|
||||
|
||||
use config::ConfigProvider;
|
||||
use log::{debug, error, info, warn};
|
||||
use serde::Deserialize;
|
||||
use simple_error::bail;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::error::Error;
|
||||
use std::fs::File;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use tokio::signal::unix::{signal, SignalKind};
|
||||
use tokio::sync::{broadcast, RwLock};
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Box<dyn Error>>;
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct Target {
|
||||
udp: Option<bool>,
|
||||
source: String,
|
||||
targets: Vec<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct Config {
|
||||
mappings: Vec<Target>,
|
||||
}
|
||||
|
||||
fn load_yaml(path: &Path) -> Result<Config> {
|
||||
let file = File::open(path)?;
|
||||
let config: Config = serde_yaml::from_reader(file).expect("Failed to parse!"); //TODO: Print path
|
||||
|
||||
return Ok(config);
|
||||
}
|
||||
|
||||
fn load_json(path: &Path) -> Result<Config> {
|
||||
let file = File::open(path)?;
|
||||
let config: Config = serde_json::from_reader(file).expect("Failed to parse!"); //TODO: Print path
|
||||
|
||||
return Ok(config);
|
||||
}
|
||||
|
||||
fn load_config() -> Result<Config> {
|
||||
for path in [
|
||||
"config.yaml",
|
||||
"config.json",
|
||||
"/etc/rustocat.yaml",
|
||||
"/etc/rustocat.json",
|
||||
]
|
||||
.iter()
|
||||
{
|
||||
// if(p)
|
||||
let config = if path.ends_with(".yaml") {
|
||||
load_yaml(Path::new(path))
|
||||
} else {
|
||||
load_json(Path::new(path))
|
||||
};
|
||||
if config.is_ok() {
|
||||
return config;
|
||||
}
|
||||
}
|
||||
bail!("No config file found");
|
||||
}
|
||||
|
||||
// #[derive(Debug)]
|
||||
struct ActiveListener {
|
||||
notify_shutdown: broadcast::Sender<()>,
|
||||
@ -84,18 +36,56 @@ async fn main() -> Result<()> {
|
||||
})
|
||||
// Add blanket level filter -
|
||||
.level(log::LevelFilter::Info)
|
||||
.level_for("rustocat", log::LevelFilter::Trace)
|
||||
.level_for("rustocat", log::LevelFilter::Debug)
|
||||
.chain(std::io::stdout())
|
||||
.apply()?;
|
||||
|
||||
let mut listeners: HashMap<String, ActiveListener> = HashMap::new();
|
||||
let mut sighup_stream = signal(SignalKind::hangup())?;
|
||||
let listeners: HashMap<String, ActiveListener> = HashMap::new();
|
||||
let text_config_provider = config::FileConfigProvider::new();
|
||||
|
||||
let target_provider: Box<dyn config::ConfigProvider> = {
|
||||
#[cfg(feature = "consul")]
|
||||
{
|
||||
let cfg = text_config_provider.load_config()?;
|
||||
info!("Loaded yaml config");
|
||||
if cfg.consul.is_some() && cfg.consul.unwrap() {
|
||||
info!("Using consul config provider");
|
||||
let consul_config_provider = consul::ConsulConfigProvider::new(Some(&cfg));
|
||||
info!("Loaded consul config provider");
|
||||
Box::new(consul_config_provider)
|
||||
} else {
|
||||
Box::new(text_config_provider)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "consul"))]
|
||||
{
|
||||
let cfg = Box::new(text_config_provider);
|
||||
info!("Loaded yaml config");
|
||||
cfg
|
||||
}
|
||||
};
|
||||
|
||||
match run_loop(target_provider, listeners).await {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
error!("Error in run loop: {}", e);
|
||||
info!("Exiting");
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn run_loop(
|
||||
mut target_provider: Box<dyn ConfigProvider>,
|
||||
mut listeners: HashMap<String, ActiveListener>,
|
||||
) -> Result<()> {
|
||||
loop {
|
||||
let config = load_config().expect("config not found");
|
||||
let mappings = target_provider.get_targets().await?;
|
||||
let mut required_listeners: HashSet<String> = HashSet::new();
|
||||
|
||||
for target in config.mappings {
|
||||
for target in mappings {
|
||||
let mut source_str = "".to_owned();
|
||||
if target.udp == None || target.udp == Some(false) {
|
||||
source_str.push_str("udp:");
|
||||
@ -151,7 +141,7 @@ async fn main() -> Result<()> {
|
||||
}
|
||||
} else {
|
||||
if let Err(err) = udp::start_udp_listener(l).await {
|
||||
error!("udp listener error: {}", err);
|
||||
error!("udp listener error {}: {}", target.source, err);
|
||||
}
|
||||
}
|
||||
});
|
||||
@ -183,7 +173,7 @@ async fn main() -> Result<()> {
|
||||
}
|
||||
}
|
||||
|
||||
sighup_stream.recv().await;
|
||||
info!("Recevied SIGHUP, reloading config!");
|
||||
target_provider.wait_for_change().await?;
|
||||
info!("Reloading config!");
|
||||
}
|
||||
}
|
||||
|
@ -1,5 +1,5 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::atomic::{AtomicI32, AtomicU64};
|
||||
use std::sync::atomic::AtomicI32;
|
||||
use std::sync::Arc;
|
||||
|
||||
use log::{debug, error, info, trace};
|
||||
@ -7,13 +7,12 @@ use rand::seq::SliceRandom;
|
||||
|
||||
use tokio::net::UdpSocket;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::time::Instant;
|
||||
|
||||
use crate::listener::Listener;
|
||||
use crate::shutdown::{Shutdown, ShutdownReceiver};
|
||||
use crate::Result;
|
||||
|
||||
const CONNECTION_TIMEOUT: i32 = 5;
|
||||
const CONNECTION_TIMEOUT: i32 = 30;
|
||||
|
||||
#[derive(Clone)]
|
||||
struct UDPMultiSender {
|
||||
|
Reference in New Issue
Block a user