Integrating consul catalog for autoconfig of listeners

This commit is contained in:
2023-02-06 14:21:05 +01:00
parent 05da7a97e0
commit db06b7c7d0
6 changed files with 880 additions and 69 deletions

89
src/config.rs Normal file
View File

@ -0,0 +1,89 @@
use std::{fs::File, path::Path};
use serde::Deserialize;
use simple_error::bail;
use tokio::signal::unix::Signal;
use crate::Result;
#[derive(Debug, Deserialize)]
pub struct Target {
pub udp: Option<bool>,
pub source: String,
pub targets: Vec<String>,
}
#[derive(Debug, Deserialize)]
pub struct Config {
pub consul: Option<bool>,
mappings: Vec<Target>,
}
#[async_trait::async_trait]
pub trait ConfigProvider {
async fn get_targets(&self) -> Result<Vec<Target>>;
async fn wait_for_change(&mut self) -> Result<()>;
}
pub struct FileConfigProvider {
sighup_stream: Signal,
}
impl FileConfigProvider {
pub fn new() -> Self {
Self {
sighup_stream: tokio::signal::unix::signal(tokio::signal::unix::SignalKind::hangup())
.expect("Failed to create sighup stream"),
}
}
fn load_yaml(&self, path: &Path) -> Result<Config> {
let file = File::open(path)?;
let config: Config = serde_yaml::from_reader(file).expect("Failed to parse!"); //TODO: Print path
return Ok(config);
}
fn load_json(&self, path: &Path) -> Result<Config> {
let file = File::open(path)?;
let config: Config = serde_json::from_reader(file).expect("Failed to parse!"); //TODO: Print path
return Ok(config);
}
pub fn load_config(&self) -> Result<Config> {
for path in [
"config.yaml",
"config.json",
"/etc/rustocat.yaml",
"/etc/rustocat.json",
]
.iter()
{
// if(p)
let config = if path.ends_with(".yaml") {
self.load_yaml(Path::new(path))
} else {
self.load_json(Path::new(path))
};
if config.is_ok() {
return config;
}
}
bail!("No config file found");
}
}
#[async_trait::async_trait]
impl ConfigProvider for FileConfigProvider {
async fn get_targets(&self) -> Result<Vec<Target>> {
let config = self.load_config()?;
return Ok(config.mappings);
}
async fn wait_for_change(&mut self) -> Result<()> {
self.sighup_stream.recv().await;
return Ok(());
}
}

169
src/consul.rs Normal file
View File

@ -0,0 +1,169 @@
#![allow(non_snake_case)]
use std::collections::HashMap;
use log::warn;
use reqwest::header::HeaderMap;
use serde::{Deserialize, Serialize};
use crate::config::ConfigProvider;
use crate::config::Target;
use crate::Result;
pub struct ConsulConfigProvider {
consul_config: ConsulConfig,
interval: tokio::time::Interval,
}
impl ConsulConfigProvider {
pub fn new() -> Self {
Self {
consul_config: ConsulConfig::from_env(),
interval: tokio::time::interval(tokio::time::Duration::from_secs(10)),
}
}
}
#[async_trait::async_trait]
impl ConfigProvider for ConsulConfigProvider {
async fn get_targets(&self) -> Result<Vec<Target>> {
let mut targets: Vec<Target> = Vec::new();
let services = consul_get_services(&self.consul_config).await?;
// Find consul services and tags
// Format of tags: rustocat:udp:port
// rustocat:tcp:port
for (name, tags) in services {
for tag in tags {
if tag.starts_with("rustocat") {
let parts = tag.split(":").collect::<Vec<&str>>();
if parts.len() != 3 {
warn!("Invalid tag: {} on service {}", tag, name);
continue;
}
let port = parts[2];
let nodes = consul_get_service_nodes(&self.consul_config, &name).await?;
let mut t = vec![];
for node in nodes {
t.push(format!("{}:{}", node.Service.Address, node.Service.Port));
}
let target = Target {
udp: Some(parts[1] == "udp"),
source: format!("0.0.0.0:{}", port),
targets: t,
};
targets.push(target);
}
}
}
Ok(targets)
}
async fn wait_for_change(&mut self) -> Result<()> {
self.interval.tick().await;
Ok(())
}
}
async fn consul_get_services(config: &ConsulConfig) -> Result<HashMap<String, Vec<String>>> {
let mut headers = HeaderMap::new();
if let Some(token) = config.token.clone() {
headers.insert("X-Consul-Token", token.parse().unwrap());
}
return Ok(reqwest::Client::new()
.get(format!("{}/v1/catalog/services", config.baseurl))
.headers(headers)
.send()
.await?
.json::<HashMap<String, Vec<String>>>()
.await?);
}
async fn consul_get_service_nodes(
config: &ConsulConfig,
service: &str,
) -> Result<Vec<ServiceEntry>> {
let mut headers = HeaderMap::new();
if let Some(token) = config.token.clone() {
headers.insert("X-Consul-Token", token.parse().unwrap());
}
return Ok(reqwest::Client::new()
.get(format!("{}/v1/catalog/services/{service}", config.baseurl))
.headers(headers)
.send()
.await?
.json::<Vec<ServiceEntry>>()
.await?);
}
#[derive(Eq, Default, PartialEq, Serialize, Deserialize, Debug)]
#[serde(default)]
struct AgentService {
ID: String,
Service: String,
Tags: Option<Vec<String>>,
Port: u16,
Address: String,
EnableTagOverride: bool,
CreateIndex: u64,
ModifyIndex: u64,
}
#[derive(Eq, Default, PartialEq, Serialize, Deserialize, Debug)]
#[serde(default)]
struct HealthCheck {
Node: String,
CheckID: String,
Name: String,
Status: String,
Notes: String,
Output: String,
ServiceID: String,
ServiceName: String,
ServiceTags: Option<Vec<String>>,
}
#[derive(Eq, Default, PartialEq, Serialize, Deserialize, Debug)]
#[serde(default)]
struct Node {
ID: String,
Node: String,
Address: String,
Datacenter: Option<String>,
TaggedAddresses: Option<HashMap<String, String>>,
Meta: Option<HashMap<String, String>>,
CreateIndex: u64,
ModifyIndex: u64,
}
#[derive(Eq, Default, PartialEq, Serialize, Deserialize, Debug)]
#[serde(default)]
struct ServiceEntry {
Node: Node,
Service: AgentService,
Checks: Vec<HealthCheck>,
}
struct ConsulConfig {
baseurl: String,
token: Option<String>,
}
impl ConsulConfig {
fn from_env() -> Self {
Self {
baseurl: option_env!("CONSUL_HTTP_ADDR")
.expect("CONSUL_HTTP_ADDR not set")
.to_string(),
token: option_env!("CONSUL_HTTP_TOKEN").map(|s| s.to_string()),
}
}
}

View File

@ -1,69 +1,20 @@
mod config;
mod listener;
mod shutdown;
mod tcp;
mod udp;
#[cfg(feature = "consul")]
mod consul;
use log::{debug, error, info, warn};
use serde::Deserialize;
use simple_error::bail;
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::fs::File;
use std::path::Path;
use std::sync::Arc;
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::{broadcast, RwLock};
pub type Result<T> = std::result::Result<T, Box<dyn Error>>;
#[derive(Debug, Deserialize)]
struct Target {
udp: Option<bool>,
source: String,
targets: Vec<String>,
}
#[derive(Debug, Deserialize)]
struct Config {
mappings: Vec<Target>,
}
fn load_yaml(path: &Path) -> Result<Config> {
let file = File::open(path)?;
let config: Config = serde_yaml::from_reader(file).expect("Failed to parse!"); //TODO: Print path
return Ok(config);
}
fn load_json(path: &Path) -> Result<Config> {
let file = File::open(path)?;
let config: Config = serde_json::from_reader(file).expect("Failed to parse!"); //TODO: Print path
return Ok(config);
}
fn load_config() -> Result<Config> {
for path in [
"config.yaml",
"config.json",
"/etc/rustocat.yaml",
"/etc/rustocat.json",
]
.iter()
{
// if(p)
let config = if path.ends_with(".yaml") {
load_yaml(Path::new(path))
} else {
load_json(Path::new(path))
};
if config.is_ok() {
return config;
}
}
bail!("No config file found");
}
// #[derive(Debug)]
struct ActiveListener {
notify_shutdown: broadcast::Sender<()>,
@ -84,18 +35,34 @@ async fn main() -> Result<()> {
})
// Add blanket level filter -
.level(log::LevelFilter::Info)
.level_for("rustocat", log::LevelFilter::Trace)
.level_for("rustocat", log::LevelFilter::Debug)
.chain(std::io::stdout())
.apply()?;
let mut listeners: HashMap<String, ActiveListener> = HashMap::new();
let mut sighup_stream = signal(SignalKind::hangup())?;
let text_config_provider = config::FileConfigProvider::new();
let mut target_provider: Box<dyn config::ConfigProvider> = {
#[cfg(feature = "consul")]
{
let cfg = text_config_provider.load_config()?;
if cfg.consul.is_some() && cfg.consul.unwrap() {
let consul_config_provider = consul::ConsulConfigProvider::new();
Box::new(consul_config_provider)
} else {
Box::new(text_config_provider)
}
}
#[cfg(not(feature = "consul"))]
Box::new(text_config_provider)
};
loop {
let config = load_config().expect("config not found");
let mappings = target_provider.get_targets().await?;
let mut required_listeners: HashSet<String> = HashSet::new();
for target in config.mappings {
for target in mappings {
let mut source_str = "".to_owned();
if target.udp == None || target.udp == Some(false) {
source_str.push_str("udp:");
@ -151,7 +118,7 @@ async fn main() -> Result<()> {
}
} else {
if let Err(err) = udp::start_udp_listener(l).await {
error!("udp listener error: {}", err);
error!("udp listener error {}: {}", target.source, err);
}
}
});
@ -183,7 +150,7 @@ async fn main() -> Result<()> {
}
}
sighup_stream.recv().await;
target_provider.wait_for_change().await?;
info!("Recevied SIGHUP, reloading config!");
}
}

View File

@ -12,7 +12,7 @@ use crate::listener::Listener;
use crate::shutdown::{Shutdown, ShutdownReceiver};
use crate::Result;
const CONNECTION_TIMEOUT: i32 = 5;
const CONNECTION_TIMEOUT: i32 = 30;
#[derive(Clone)]
struct UDPMultiSender {