Compare commits
No commits in common. "main" and "udp" have entirely different histories.
@ -1 +0,0 @@
|
|||||||
target/
|
|
36
.github/workflows/ci.yml
vendored
36
.github/workflows/ci.yml
vendored
@ -1,36 +0,0 @@
|
|||||||
# .github/workflows/ci.yml
|
|
||||||
|
|
||||||
name: CI
|
|
||||||
|
|
||||||
on:
|
|
||||||
push:
|
|
||||||
pull_request:
|
|
||||||
branches: [main]
|
|
||||||
|
|
||||||
jobs:
|
|
||||||
build:
|
|
||||||
runs-on: ubuntu-latest
|
|
||||||
env:
|
|
||||||
MY_DOCKER_USERNAME: ${{ secrets.MY_DOCKER_USERNAME }}
|
|
||||||
MY_DOCKER_PASSWORD: ${{ secrets.MY_DOCKER_PASSWORD }}
|
|
||||||
FORCE_COLOR: 1
|
|
||||||
steps:
|
|
||||||
- uses: https://github.com/earthly/actions-setup@v1
|
|
||||||
with:
|
|
||||||
version: v0.7.0
|
|
||||||
- uses: actions/checkout@v2
|
|
||||||
- name: Put back the git branch into git (Earthly uses it for tagging)
|
|
||||||
run: |
|
|
||||||
branch=""
|
|
||||||
if [ -n "$GITHUB_HEAD_REF" ]; then
|
|
||||||
branch="$GITHUB_HEAD_REF"
|
|
||||||
else
|
|
||||||
branch="${GITHUB_REF##*/}"
|
|
||||||
fi
|
|
||||||
git checkout -b "$branch" || true
|
|
||||||
- name: Docker Login
|
|
||||||
run: docker login git.hibas.dev --username "$MY_DOCKER_USERNAME" --password "$MY_DOCKER_PASSWORD"
|
|
||||||
- name: Earthly version
|
|
||||||
run: earthly --version
|
|
||||||
- name: Run build
|
|
||||||
run: earthly --push +docker-multi
|
|
1368
Cargo.lock
generated
1368
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
22
Cargo.toml
22
Cargo.toml
@ -1,29 +1,23 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "rustocat"
|
name = "rustocat"
|
||||||
version = "0.1.13"
|
version = "0.1.0"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
description = "Socat in rust with many less features and a configuration file"
|
description = "Socat in rust with many less features and a configuration file"
|
||||||
license = "ISC"
|
license = "ISC"
|
||||||
|
|
||||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
[features]
|
|
||||||
default = ["consul"]
|
|
||||||
consul = ["reqwest"]
|
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
tokio = { version = "1", features = ["full"] }
|
tokio = { version = "1", features = ["full"] }
|
||||||
|
futures = "0.3.21"
|
||||||
serde = { version = "1", features = ["derive"] }
|
serde = { version = "1", features = ["derive"] }
|
||||||
serde_json = "1"
|
serde_json = "1"
|
||||||
serde_yaml = "0.9"
|
serde_yaml = "0.9.13"
|
||||||
rand = "0.8"
|
simple-error = "0.2.3"
|
||||||
log = "0.4"
|
rand = "0.8.5"
|
||||||
fern = "0.7"
|
log = "0.4.17"
|
||||||
chrono = "0.4"
|
fern = "0.6.1"
|
||||||
async-trait = "0.1"
|
chrono = "0.4.23"
|
||||||
reqwest = { version = "0.12", optional = true, features = [
|
|
||||||
"json",
|
|
||||||
"default-tls",
|
|
||||||
], default-features = false }
|
|
||||||
|
|
||||||
[profile.release]
|
[profile.release]
|
||||||
opt-level = 3 # Optimize for size.
|
opt-level = 3 # Optimize for size.
|
||||||
|
25
Earthfile
25
Earthfile
@ -1,25 +0,0 @@
|
|||||||
VERSION 0.7
|
|
||||||
FROM rust:alpine
|
|
||||||
WORKDIR /rustbuild
|
|
||||||
|
|
||||||
prepare-env:
|
|
||||||
RUN apk add --no-cache musl-dev libssl3 openssl-dev
|
|
||||||
RUN cargo search test # Super simple way to cache the cargo index
|
|
||||||
|
|
||||||
docker-multi:
|
|
||||||
BUILD --platform linux/amd64 --platform linux/arm64 +docker
|
|
||||||
|
|
||||||
build:
|
|
||||||
FROM +prepare-env
|
|
||||||
COPY . .
|
|
||||||
RUN cargo build --release
|
|
||||||
SAVE ARTIFACT target/release/rustocat rustocat
|
|
||||||
|
|
||||||
docker:
|
|
||||||
FROM alpine
|
|
||||||
RUN apk add --no-cache libssl3
|
|
||||||
COPY +build/rustocat rustocat
|
|
||||||
ENTRYPOINT ["./rustocat"]
|
|
||||||
ARG EARTHLY_TARGET_TAG
|
|
||||||
ARG TAG=$EARTHLY_TARGET_TAG
|
|
||||||
SAVE IMAGE --push git.hibas.dev/ops/rustocat:$TAG
|
|
112
src/config.rs
112
src/config.rs
@ -1,112 +0,0 @@
|
|||||||
use std::{fs::File, path::Path};
|
|
||||||
|
|
||||||
use log::info;
|
|
||||||
use serde::Deserialize;
|
|
||||||
use tokio::signal::unix::Signal;
|
|
||||||
|
|
||||||
use crate::Result;
|
|
||||||
|
|
||||||
#[derive(Debug, Deserialize)]
|
|
||||||
pub struct Target {
|
|
||||||
pub udp: Option<bool>,
|
|
||||||
pub source: String,
|
|
||||||
pub targets: Vec<String>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Deserialize)]
|
|
||||||
pub struct Config {
|
|
||||||
#[cfg(feature = "consul")]
|
|
||||||
pub consul: Option<bool>,
|
|
||||||
#[cfg(feature = "consul")]
|
|
||||||
pub consul_http_addr: Option<String>,
|
|
||||||
#[cfg(feature = "consul")]
|
|
||||||
pub consul_http_token: Option<String>,
|
|
||||||
mappings: Vec<Target>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
|
||||||
pub trait ConfigProvider {
|
|
||||||
async fn get_targets(&self) -> Result<Vec<Target>>;
|
|
||||||
async fn wait_for_change(&mut self) -> Result<()>;
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct FileConfigProvider {
|
|
||||||
sighup_stream: Signal,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl FileConfigProvider {
|
|
||||||
pub fn new() -> Self {
|
|
||||||
Self {
|
|
||||||
sighup_stream: tokio::signal::unix::signal(tokio::signal::unix::SignalKind::hangup())
|
|
||||||
.expect("Failed to create sighup stream"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn load_yaml(&self, path: &Path) -> Result<Config> {
|
|
||||||
let file = File::open(path)?;
|
|
||||||
let config: Config = serde_yaml::from_reader(file)?; //TODO: Print path
|
|
||||||
|
|
||||||
return Ok(config);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn load_json(&self, path: &Path) -> Result<Config> {
|
|
||||||
let file = File::open(path)?;
|
|
||||||
let config: Config = serde_json::from_reader(file)?; //TODO: Print path
|
|
||||||
|
|
||||||
return Ok(config);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn load_config(&self) -> Result<Config> {
|
|
||||||
let configs = [
|
|
||||||
"config.yaml",
|
|
||||||
"config.yml",
|
|
||||||
"config.json",
|
|
||||||
"/etc/rustocat.yaml",
|
|
||||||
"/etc/rustocat.json",
|
|
||||||
]
|
|
||||||
.iter()
|
|
||||||
.map(|path| {
|
|
||||||
if std::path::Path::new(path).exists() {
|
|
||||||
return Some(path);
|
|
||||||
} else {
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.filter(|p| p.is_some())
|
|
||||||
.map(|p| p.unwrap())
|
|
||||||
.map(|path| {
|
|
||||||
if path.ends_with(".yaml") {
|
|
||||||
self.load_yaml(Path::new(path))
|
|
||||||
} else {
|
|
||||||
self.load_json(Path::new(path))
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
for config in configs {
|
|
||||||
match config {
|
|
||||||
Ok(c) => return Ok(c),
|
|
||||||
Err(e) => {
|
|
||||||
info!("Error loading config: {}", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Err("No (valid) config file found".into())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
|
||||||
impl ConfigProvider for FileConfigProvider {
|
|
||||||
async fn get_targets(&self) -> Result<Vec<Target>> {
|
|
||||||
info!("Getting targets from file");
|
|
||||||
let config = self.load_config()?;
|
|
||||||
return Ok(config.mappings);
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn wait_for_change(&mut self) -> Result<()> {
|
|
||||||
info!("Waiting for file config change (SIGHUP)");
|
|
||||||
self.sighup_stream.recv().await;
|
|
||||||
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
}
|
|
204
src/consul.rs
204
src/consul.rs
@ -1,204 +0,0 @@
|
|||||||
#![allow(non_snake_case)]
|
|
||||||
|
|
||||||
use std::collections::HashMap;
|
|
||||||
|
|
||||||
use log::debug;
|
|
||||||
use log::info;
|
|
||||||
use log::trace;
|
|
||||||
use log::warn;
|
|
||||||
use reqwest::header::HeaderMap;
|
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
|
|
||||||
use crate::config::Config;
|
|
||||||
use crate::config::ConfigProvider;
|
|
||||||
use crate::config::Target;
|
|
||||||
|
|
||||||
use crate::Result;
|
|
||||||
|
|
||||||
pub struct ConsulConfigProvider {
|
|
||||||
consul_config: ConsulConfig,
|
|
||||||
interval: tokio::time::Interval,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ConsulConfigProvider {
|
|
||||||
pub fn new(config: Option<&Config>) -> Self {
|
|
||||||
Self {
|
|
||||||
consul_config: if let Some(config) = config {
|
|
||||||
ConsulConfig::from_config_or_env(config)
|
|
||||||
} else {
|
|
||||||
ConsulConfig::from_env()
|
|
||||||
},
|
|
||||||
interval: tokio::time::interval(tokio::time::Duration::from_secs(10)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
|
||||||
impl ConfigProvider for ConsulConfigProvider {
|
|
||||||
async fn get_targets(&self) -> Result<Vec<Target>> {
|
|
||||||
info!("Getting targets from consul");
|
|
||||||
let mut targets: Vec<Target> = Vec::new();
|
|
||||||
|
|
||||||
debug!("Calling consul_get_services");
|
|
||||||
let services = consul_get_services(&self.consul_config).await?;
|
|
||||||
|
|
||||||
// Find consul services and tags
|
|
||||||
// Format of tags: rustocat:udp:port
|
|
||||||
// rustocat:tcp:port
|
|
||||||
|
|
||||||
debug!("Iterating over services: {:?}", services);
|
|
||||||
for (name, tags) in services {
|
|
||||||
for tag in tags {
|
|
||||||
if tag.starts_with("rustocat") {
|
|
||||||
trace!("Found rustocat tag: {}", tag);
|
|
||||||
let parts = tag.split(":").collect::<Vec<&str>>();
|
|
||||||
if parts.len() != 3 {
|
|
||||||
warn!("Invalid tag: {} on service {}", tag, name);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
let port = parts[2];
|
|
||||||
trace!("Getting nodes for service: {}", name);
|
|
||||||
let nodes = consul_get_service_nodes(&self.consul_config, &name).await?;
|
|
||||||
|
|
||||||
let mut t = vec![];
|
|
||||||
for node in nodes {
|
|
||||||
t.push(format!("{}:{}", node.ServiceAddress, node.ServicePort));
|
|
||||||
}
|
|
||||||
let target = Target {
|
|
||||||
udp: Some(parts[1] == "udp"),
|
|
||||||
source: format!("0.0.0.0:{}", port),
|
|
||||||
targets: t,
|
|
||||||
};
|
|
||||||
trace!("Adding target: {:?}", target);
|
|
||||||
targets.push(target);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(targets)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn wait_for_change(&mut self) -> Result<()> {
|
|
||||||
info!("Waiting for consul config change");
|
|
||||||
self.interval.tick().await;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn consul_get_services(config: &ConsulConfig) -> Result<HashMap<String, Vec<String>>> {
|
|
||||||
let mut headers = HeaderMap::new();
|
|
||||||
if let Some(token) = config.token.clone() {
|
|
||||||
headers.insert("X-Consul-Token", token.parse()?);
|
|
||||||
}
|
|
||||||
|
|
||||||
return Ok(reqwest::Client::new()
|
|
||||||
.get(format!("{}/v1/catalog/services", config.baseurl))
|
|
||||||
.headers(headers)
|
|
||||||
.send()
|
|
||||||
.await?
|
|
||||||
.json::<HashMap<String, Vec<String>>>()
|
|
||||||
.await?);
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn consul_get_service_nodes(config: &ConsulConfig, service: &str) -> Result<Vec<Node>> {
|
|
||||||
let mut headers = HeaderMap::new();
|
|
||||||
if let Some(token) = config.token.clone() {
|
|
||||||
headers.insert("X-Consul-Token", token.parse()?);
|
|
||||||
}
|
|
||||||
|
|
||||||
trace!(
|
|
||||||
"Calling consul_get_service_nodes: {}/v1/catalog/service/{service}",
|
|
||||||
config.baseurl
|
|
||||||
);
|
|
||||||
|
|
||||||
return Ok(reqwest::Client::new()
|
|
||||||
.get(format!("{}/v1/catalog/service/{service}", config.baseurl))
|
|
||||||
.headers(headers)
|
|
||||||
.send()
|
|
||||||
.await?
|
|
||||||
.json::<Vec<Node>>()
|
|
||||||
.await?);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Eq, Default, PartialEq, Serialize, Deserialize, Debug)]
|
|
||||||
#[serde(default)]
|
|
||||||
struct AgentService {
|
|
||||||
ID: String,
|
|
||||||
Service: String,
|
|
||||||
Tags: Option<Vec<String>>,
|
|
||||||
Port: u16,
|
|
||||||
Address: String,
|
|
||||||
EnableTagOverride: bool,
|
|
||||||
CreateIndex: u64,
|
|
||||||
ModifyIndex: u64,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Eq, Default, PartialEq, Serialize, Deserialize, Debug)]
|
|
||||||
#[serde(default)]
|
|
||||||
struct HealthCheck {
|
|
||||||
Node: String,
|
|
||||||
CheckID: String,
|
|
||||||
Name: String,
|
|
||||||
Status: String,
|
|
||||||
Notes: String,
|
|
||||||
Output: String,
|
|
||||||
ServiceID: String,
|
|
||||||
ServiceName: String,
|
|
||||||
ServiceTags: Option<Vec<String>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Eq, Default, PartialEq, Serialize, Deserialize, Debug)]
|
|
||||||
#[serde(default)]
|
|
||||||
struct Node {
|
|
||||||
ID: String,
|
|
||||||
Node: String,
|
|
||||||
ServiceAddress: String,
|
|
||||||
ServicePort: u16,
|
|
||||||
Datacenter: Option<String>,
|
|
||||||
TaggedAddresses: Option<HashMap<String, String>>,
|
|
||||||
Meta: Option<HashMap<String, String>>,
|
|
||||||
CreateIndex: u64,
|
|
||||||
ModifyIndex: u64,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Eq, Default, PartialEq, Serialize, Deserialize, Debug)]
|
|
||||||
#[serde(default)]
|
|
||||||
struct ServiceEntry {
|
|
||||||
Node: Node,
|
|
||||||
Service: AgentService,
|
|
||||||
Checks: Vec<HealthCheck>,
|
|
||||||
}
|
|
||||||
|
|
||||||
struct ConsulConfig {
|
|
||||||
baseurl: String,
|
|
||||||
token: Option<String>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ConsulConfig {
|
|
||||||
fn from_env() -> Self {
|
|
||||||
Self {
|
|
||||||
baseurl: option_env!("CONSUL_HTTP_ADDR")
|
|
||||||
.expect("CONSUL_HTTP_ADDR not set")
|
|
||||||
.to_string(),
|
|
||||||
token: option_env!("CONSUL_HTTP_TOKEN").map(|s| s.to_string()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn from_config_or_env(config: &Config) -> Self {
|
|
||||||
let baseurl = match config.consul_http_addr {
|
|
||||||
Some(ref s) => s.clone(),
|
|
||||||
None => option_env!("CONSUL_HTTP_ADDR")
|
|
||||||
.expect("CONSUL_HTTP_ADDR not set")
|
|
||||||
.to_string(),
|
|
||||||
};
|
|
||||||
|
|
||||||
let token = match config.consul_http_token {
|
|
||||||
Some(ref s) => Some(s.clone()),
|
|
||||||
None => option_env!("CONSUL_HTTP_TOKEN").map(|s| s.to_string()),
|
|
||||||
};
|
|
||||||
|
|
||||||
Self { baseurl, token }
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,11 +1,9 @@
|
|||||||
use crate::shutdown::ShutdownReceiver;
|
use crate::shutdown::ShutdownReceiver;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::sync::broadcast;
|
|
||||||
use tokio::sync::RwLock;
|
use tokio::sync::RwLock;
|
||||||
|
|
||||||
pub(crate) struct Listener {
|
pub(crate) struct Listener {
|
||||||
pub(crate) shutdown: ShutdownReceiver,
|
pub(crate) shutdown: ShutdownReceiver,
|
||||||
pub(crate) source: String,
|
pub(crate) source: String,
|
||||||
pub(crate) targets: Arc<RwLock<Vec<String>>>,
|
pub(crate) targets: Arc<RwLock<Vec<String>>>,
|
||||||
pub(crate) targets_changed: broadcast::Receiver<()>,
|
|
||||||
}
|
}
|
||||||
|
122
src/main.rs
122
src/main.rs
@ -1,26 +1,73 @@
|
|||||||
mod config;
|
|
||||||
mod listener;
|
mod listener;
|
||||||
mod shutdown;
|
mod shutdown;
|
||||||
mod tcp;
|
mod tcp;
|
||||||
mod udp;
|
mod udp;
|
||||||
|
|
||||||
#[cfg(feature = "consul")]
|
|
||||||
mod consul;
|
|
||||||
|
|
||||||
use config::ConfigProvider;
|
|
||||||
use log::{debug, error, info, warn};
|
use log::{debug, error, info, warn};
|
||||||
|
use serde::Deserialize;
|
||||||
|
use simple_error::bail;
|
||||||
use std::collections::{HashMap, HashSet};
|
use std::collections::{HashMap, HashSet};
|
||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
|
use std::fs::File;
|
||||||
|
use std::path::Path;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use tokio::signal::unix::{signal, SignalKind};
|
||||||
use tokio::sync::{broadcast, RwLock};
|
use tokio::sync::{broadcast, RwLock};
|
||||||
|
|
||||||
pub type Result<T> = std::result::Result<T, Box<dyn Error>>;
|
pub type Result<T> = std::result::Result<T, Box<dyn Error>>;
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize)]
|
||||||
|
struct Target {
|
||||||
|
udp: Option<bool>,
|
||||||
|
source: String,
|
||||||
|
targets: Vec<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize)]
|
||||||
|
struct Config {
|
||||||
|
mappings: Vec<Target>,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn load_yaml(path: &Path) -> Result<Config> {
|
||||||
|
let file = File::open(path)?;
|
||||||
|
let config: Config = serde_yaml::from_reader(file).expect("Failed to parse!"); //TODO: Print path
|
||||||
|
|
||||||
|
return Ok(config);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn load_json(path: &Path) -> Result<Config> {
|
||||||
|
let file = File::open(path)?;
|
||||||
|
let config: Config = serde_json::from_reader(file).expect("Failed to parse!"); //TODO: Print path
|
||||||
|
|
||||||
|
return Ok(config);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn load_config() -> Result<Config> {
|
||||||
|
for path in [
|
||||||
|
"config.yaml",
|
||||||
|
"config.json",
|
||||||
|
"/etc/rustocat.yaml",
|
||||||
|
"/etc/rustocat.json",
|
||||||
|
]
|
||||||
|
.iter()
|
||||||
|
{
|
||||||
|
// if(p)
|
||||||
|
let config = if path.ends_with(".yaml") {
|
||||||
|
load_yaml(Path::new(path))
|
||||||
|
} else {
|
||||||
|
load_json(Path::new(path))
|
||||||
|
};
|
||||||
|
if config.is_ok() {
|
||||||
|
return config;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
bail!("No config file found");
|
||||||
|
}
|
||||||
|
|
||||||
// #[derive(Debug)]
|
// #[derive(Debug)]
|
||||||
struct ActiveListener {
|
struct ActiveListener {
|
||||||
notify_shutdown: broadcast::Sender<()>,
|
notify_shutdown: broadcast::Sender<()>,
|
||||||
targets: Arc<RwLock<Vec<String>>>,
|
targets: Arc<RwLock<Vec<String>>>,
|
||||||
notify_targets: broadcast::Sender<()>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
@ -37,56 +84,18 @@ async fn main() -> Result<()> {
|
|||||||
})
|
})
|
||||||
// Add blanket level filter -
|
// Add blanket level filter -
|
||||||
.level(log::LevelFilter::Info)
|
.level(log::LevelFilter::Info)
|
||||||
.level_for("rustocat", log::LevelFilter::Debug)
|
.level_for("rustocat", log::LevelFilter::Trace)
|
||||||
.chain(std::io::stdout())
|
.chain(std::io::stdout())
|
||||||
.apply()?;
|
.apply()?;
|
||||||
|
|
||||||
let listeners: HashMap<String, ActiveListener> = HashMap::new();
|
let mut listeners: HashMap<String, ActiveListener> = HashMap::new();
|
||||||
let text_config_provider = config::FileConfigProvider::new();
|
let mut sighup_stream = signal(SignalKind::hangup())?;
|
||||||
|
|
||||||
let target_provider: Box<dyn config::ConfigProvider> = {
|
|
||||||
#[cfg(feature = "consul")]
|
|
||||||
{
|
|
||||||
let cfg = text_config_provider.load_config()?;
|
|
||||||
info!("Loaded yaml config");
|
|
||||||
if cfg.consul.is_some() && cfg.consul.unwrap() {
|
|
||||||
info!("Using consul config provider");
|
|
||||||
let consul_config_provider = consul::ConsulConfigProvider::new(Some(&cfg));
|
|
||||||
info!("Loaded consul config provider");
|
|
||||||
Box::new(consul_config_provider)
|
|
||||||
} else {
|
|
||||||
Box::new(text_config_provider)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(not(feature = "consul"))]
|
|
||||||
{
|
|
||||||
let cfg = Box::new(text_config_provider);
|
|
||||||
info!("Loaded yaml config");
|
|
||||||
cfg
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
match run_loop(target_provider, listeners).await {
|
|
||||||
Ok(_) => {}
|
|
||||||
Err(e) => {
|
|
||||||
error!("Error in run loop: {}", e);
|
|
||||||
info!("Exiting");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn run_loop(
|
|
||||||
mut target_provider: Box<dyn ConfigProvider>,
|
|
||||||
mut listeners: HashMap<String, ActiveListener>,
|
|
||||||
) -> Result<()> {
|
|
||||||
loop {
|
loop {
|
||||||
let mappings = target_provider.get_targets().await?;
|
let config = load_config().expect("config not found");
|
||||||
let mut required_listeners: HashSet<String> = HashSet::new();
|
let mut required_listeners: HashSet<String> = HashSet::new();
|
||||||
|
|
||||||
for target in mappings {
|
for target in config.mappings {
|
||||||
let mut source_str = "".to_owned();
|
let mut source_str = "".to_owned();
|
||||||
if target.udp == None || target.udp == Some(false) {
|
if target.udp == None || target.udp == Some(false) {
|
||||||
source_str.push_str("udp:");
|
source_str.push_str("udp:");
|
||||||
@ -120,24 +129,19 @@ async fn run_loop(
|
|||||||
warn!("Found invalid targets! Adjusting!");
|
warn!("Found invalid targets! Adjusting!");
|
||||||
let mut w = listener.targets.write().await;
|
let mut w = listener.targets.write().await;
|
||||||
*w = target.targets;
|
*w = target.targets;
|
||||||
|
|
||||||
_ = listener.notify_targets.send(())?; //TODO: Maybe ignore this error
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
let (notify_shutdown, _) = broadcast::channel(1);
|
let (notify_shutdown, _) = broadcast::channel(1);
|
||||||
let (notify_targets, _) = broadcast::channel(1);
|
|
||||||
|
|
||||||
let listener = ActiveListener {
|
let listener = ActiveListener {
|
||||||
notify_shutdown,
|
notify_shutdown: notify_shutdown,
|
||||||
targets: Arc::new(RwLock::new(target.targets)),
|
targets: Arc::new(RwLock::new(target.targets)),
|
||||||
notify_targets,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let l = listener::Listener {
|
let l = listener::Listener {
|
||||||
shutdown: shutdown::ShutdownReceiver::new(listener.notify_shutdown.subscribe()),
|
shutdown: shutdown::ShutdownReceiver::new(listener.notify_shutdown.subscribe()),
|
||||||
source: target.source.clone(),
|
source: target.source.clone(),
|
||||||
targets: listener.targets.clone(),
|
targets: listener.targets.clone(),
|
||||||
targets_changed: listener.notify_targets.subscribe(),
|
|
||||||
};
|
};
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
@ -147,7 +151,7 @@ async fn run_loop(
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if let Err(err) = udp::start_udp_listener(l).await {
|
if let Err(err) = udp::start_udp_listener(l).await {
|
||||||
error!("udp listener error {}: {}", target.source, err);
|
error!("udp listener error: {}", err);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@ -174,14 +178,12 @@ async fn run_loop(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Maybe Wait for the listener to shut down
|
|
||||||
|
|
||||||
debug!("Removing listener!");
|
debug!("Removing listener!");
|
||||||
listeners.remove(&del_key);
|
listeners.remove(&del_key);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
target_provider.wait_for_change().await?;
|
sighup_stream.recv().await;
|
||||||
info!("Reloading config!");
|
info!("Recevied SIGHUP, reloading config!");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
34
src/tcp.rs
34
src/tcp.rs
@ -1,5 +1,5 @@
|
|||||||
use crate::listener::Listener;
|
use crate::listener::Listener;
|
||||||
use log::{error, info, trace, warn};
|
use log::{info, trace, warn};
|
||||||
use rand::seq::SliceRandom;
|
use rand::seq::SliceRandom;
|
||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
use tokio::net::{TcpListener, TcpStream};
|
use tokio::net::{TcpListener, TcpStream};
|
||||||
@ -24,41 +24,11 @@ pub(crate) async fn start_tcp_listener(
|
|||||||
mut listener_config: Listener,
|
mut listener_config: Listener,
|
||||||
) -> Result<(), Box<dyn Error>> {
|
) -> Result<(), Box<dyn Error>> {
|
||||||
info!("start listening on {}", &listener_config.source);
|
info!("start listening on {}", &listener_config.source);
|
||||||
let listener = loop {
|
let listener = TcpListener::bind(&listener_config.source).await?;
|
||||||
match TcpListener::bind(&listener_config.source).await {
|
|
||||||
Ok(listener) => break listener,
|
|
||||||
Err(e) => match e.kind() {
|
|
||||||
std::io::ErrorKind::AddrInUse => {
|
|
||||||
warn!("Address in use: {}", &listener_config.source);
|
|
||||||
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
|
|
||||||
}
|
|
||||||
std::io::ErrorKind::AddrNotAvailable => {
|
|
||||||
warn!("Address not available: {}", &listener_config.source);
|
|
||||||
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
|
|
||||||
}
|
|
||||||
_ => {
|
|
||||||
error!(
|
|
||||||
"Error binding to {}: {} ({})",
|
|
||||||
&listener_config.source,
|
|
||||||
e,
|
|
||||||
e.kind()
|
|
||||||
);
|
|
||||||
trace!("Error: {}", e);
|
|
||||||
return Err(Box::new(e));
|
|
||||||
}
|
|
||||||
},
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
info!("listening on {}", listener.local_addr()?);
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let (next_socket, _) = tokio::select! {
|
let (next_socket, _) = tokio::select! {
|
||||||
res = listener.accept() => res?,
|
res = listener.accept() => res?,
|
||||||
_ = listener_config.targets_changed.recv() => {
|
|
||||||
info!("Targets changed!");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
_ = listener_config.shutdown.recv() => {
|
_ = listener_config.shutdown.recv() => {
|
||||||
info!("Exiting listener!");
|
info!("Exiting listener!");
|
||||||
return Ok(());
|
return Ok(());
|
||||||
|
37
src/udp.rs
37
src/udp.rs
@ -2,7 +2,7 @@ use std::collections::HashMap;
|
|||||||
use std::sync::atomic::AtomicI32;
|
use std::sync::atomic::AtomicI32;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use log::{debug, error, info, trace, warn};
|
use log::{debug, error, info, trace};
|
||||||
use rand::seq::SliceRandom;
|
use rand::seq::SliceRandom;
|
||||||
|
|
||||||
use tokio::net::UdpSocket;
|
use tokio::net::UdpSocket;
|
||||||
@ -12,7 +12,7 @@ use crate::listener::Listener;
|
|||||||
use crate::shutdown::{Shutdown, ShutdownReceiver};
|
use crate::shutdown::{Shutdown, ShutdownReceiver};
|
||||||
use crate::Result;
|
use crate::Result;
|
||||||
|
|
||||||
const CONNECTION_TIMEOUT: i32 = 30;
|
const CONNECTION_TIMEOUT: i32 = 5;
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
struct UDPMultiSender {
|
struct UDPMultiSender {
|
||||||
@ -38,27 +38,7 @@ impl UDPMultiSender {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn splitted_udp_socket(bind_addr: &str) -> Result<(UdpSocket, UDPMultiSender)> {
|
async fn splitted_udp_socket(bind_addr: &str) -> Result<(UdpSocket, UDPMultiSender)> {
|
||||||
let listener_std = loop {
|
let listener_std = std::net::UdpSocket::bind(bind_addr)?;
|
||||||
match std::net::UdpSocket::bind(bind_addr) {
|
|
||||||
Ok(listener) => break listener,
|
|
||||||
Err(e) => match e.kind() {
|
|
||||||
std::io::ErrorKind::AddrInUse => {
|
|
||||||
warn!("Address in use: {}", bind_addr);
|
|
||||||
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
|
|
||||||
}
|
|
||||||
std::io::ErrorKind::AddrNotAvailable => {
|
|
||||||
warn!("Address not available: {}", bind_addr);
|
|
||||||
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
|
|
||||||
}
|
|
||||||
_ => {
|
|
||||||
error!("Error binding to {}: {} ({})", bind_addr, e, e.kind());
|
|
||||||
trace!("Error: {}", e);
|
|
||||||
return Err(Box::new(e));
|
|
||||||
}
|
|
||||||
},
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let responder_std = listener_std.try_clone()?;
|
let responder_std = listener_std.try_clone()?;
|
||||||
listener_std.set_nonblocking(true)?;
|
listener_std.set_nonblocking(true)?;
|
||||||
responder_std.set_nonblocking(true)?;
|
responder_std.set_nonblocking(true)?;
|
||||||
@ -200,15 +180,6 @@ pub(crate) async fn start_udp_listener(mut listener_config: Listener) -> Result<
|
|||||||
trace!("Waiting for packet or shutdown");
|
trace!("Waiting for packet or shutdown");
|
||||||
let (num_bytes, src_addr) = tokio::select! {
|
let (num_bytes, src_addr) = tokio::select! {
|
||||||
res = listener.recv_from(&mut buf) => res?,
|
res = listener.recv_from(&mut buf) => res?,
|
||||||
_ = listener_config.targets_changed.recv() => {
|
|
||||||
info!("Targets changed!");
|
|
||||||
info!("Closing all connections!");
|
|
||||||
for (_, handler) in connections.lock().await.iter_mut() {
|
|
||||||
handler.close().await;
|
|
||||||
}
|
|
||||||
info!("Closed all connections!");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
_ = listener_config.shutdown.recv() => {
|
_ = listener_config.shutdown.recv() => {
|
||||||
info!("Exiting listener!");
|
info!("Exiting listener!");
|
||||||
break;
|
break;
|
||||||
@ -249,7 +220,5 @@ pub(crate) async fn start_udp_listener(mut listener_config: Listener) -> Result<
|
|||||||
handler.close().await;
|
handler.close().await;
|
||||||
}
|
}
|
||||||
|
|
||||||
println!("Listener closed.");
|
|
||||||
|
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user