Make it work in nomad
This commit is contained in:
@ -1,5 +1,6 @@
|
||||
use std::{fs::File, path::Path};
|
||||
|
||||
use log::info;
|
||||
use serde::Deserialize;
|
||||
use tokio::signal::unix::Signal;
|
||||
|
||||
@ -15,6 +16,8 @@ pub struct Target {
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct Config {
|
||||
pub consul: Option<bool>,
|
||||
pub consul_http_addr: Option<String>,
|
||||
pub consul_http_token: Option<String>,
|
||||
mappings: Vec<Target>,
|
||||
}
|
||||
|
||||
@ -76,11 +79,13 @@ impl FileConfigProvider {
|
||||
#[async_trait::async_trait]
|
||||
impl ConfigProvider for FileConfigProvider {
|
||||
async fn get_targets(&self) -> Result<Vec<Target>> {
|
||||
info!("Getting targets from file");
|
||||
let config = self.load_config()?;
|
||||
return Ok(config.mappings);
|
||||
}
|
||||
|
||||
async fn wait_for_change(&mut self) -> Result<()> {
|
||||
info!("Waiting for file config change (SIGHUP)");
|
||||
self.sighup_stream.recv().await;
|
||||
|
||||
return Ok(());
|
||||
|
@ -2,10 +2,14 @@
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use log::debug;
|
||||
use log::info;
|
||||
use log::trace;
|
||||
use log::warn;
|
||||
use reqwest::header::HeaderMap;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::config::Config;
|
||||
use crate::config::ConfigProvider;
|
||||
use crate::config::Target;
|
||||
|
||||
@ -17,9 +21,13 @@ pub struct ConsulConfigProvider {
|
||||
}
|
||||
|
||||
impl ConsulConfigProvider {
|
||||
pub fn new() -> Self {
|
||||
pub fn new(config: Option<&Config>) -> Self {
|
||||
Self {
|
||||
consul_config: ConsulConfig::from_env(),
|
||||
consul_config: if let Some(config) = config {
|
||||
ConsulConfig::from_config_or_env(config)
|
||||
} else {
|
||||
ConsulConfig::from_env()
|
||||
},
|
||||
interval: tokio::time::interval(tokio::time::Duration::from_secs(10)),
|
||||
}
|
||||
}
|
||||
@ -28,16 +36,21 @@ impl ConsulConfigProvider {
|
||||
#[async_trait::async_trait]
|
||||
impl ConfigProvider for ConsulConfigProvider {
|
||||
async fn get_targets(&self) -> Result<Vec<Target>> {
|
||||
info!("Getting targets from consul");
|
||||
let mut targets: Vec<Target> = Vec::new();
|
||||
|
||||
debug!("Calling consul_get_services");
|
||||
let services = consul_get_services(&self.consul_config).await?;
|
||||
|
||||
// Find consul services and tags
|
||||
// Format of tags: rustocat:udp:port
|
||||
// rustocat:tcp:port
|
||||
|
||||
debug!("Iterating over services: {:?}", services);
|
||||
for (name, tags) in services {
|
||||
for tag in tags {
|
||||
if tag.starts_with("rustocat") {
|
||||
trace!("Found rustocat tag: {}", tag);
|
||||
let parts = tag.split(":").collect::<Vec<&str>>();
|
||||
if parts.len() != 3 {
|
||||
warn!("Invalid tag: {} on service {}", tag, name);
|
||||
@ -45,17 +58,19 @@ impl ConfigProvider for ConsulConfigProvider {
|
||||
}
|
||||
|
||||
let port = parts[2];
|
||||
trace!("Getting nodes for service: {}", name);
|
||||
let nodes = consul_get_service_nodes(&self.consul_config, &name).await?;
|
||||
|
||||
let mut t = vec![];
|
||||
for node in nodes {
|
||||
t.push(format!("{}:{}", node.Service.Address, node.Service.Port));
|
||||
t.push(format!("{}:{}", node.ServiceAddress, node.ServicePort));
|
||||
}
|
||||
let target = Target {
|
||||
udp: Some(parts[1] == "udp"),
|
||||
source: format!("0.0.0.0:{}", port),
|
||||
targets: t,
|
||||
};
|
||||
trace!("Adding target: {:?}", target);
|
||||
targets.push(target);
|
||||
}
|
||||
}
|
||||
@ -65,6 +80,7 @@ impl ConfigProvider for ConsulConfigProvider {
|
||||
}
|
||||
|
||||
async fn wait_for_change(&mut self) -> Result<()> {
|
||||
info!("Waiting for consul config change");
|
||||
self.interval.tick().await;
|
||||
|
||||
Ok(())
|
||||
@ -74,7 +90,7 @@ impl ConfigProvider for ConsulConfigProvider {
|
||||
async fn consul_get_services(config: &ConsulConfig) -> Result<HashMap<String, Vec<String>>> {
|
||||
let mut headers = HeaderMap::new();
|
||||
if let Some(token) = config.token.clone() {
|
||||
headers.insert("X-Consul-Token", token.parse().unwrap());
|
||||
headers.insert("X-Consul-Token", token.parse()?);
|
||||
}
|
||||
|
||||
return Ok(reqwest::Client::new()
|
||||
@ -86,21 +102,23 @@ async fn consul_get_services(config: &ConsulConfig) -> Result<HashMap<String, Ve
|
||||
.await?);
|
||||
}
|
||||
|
||||
async fn consul_get_service_nodes(
|
||||
config: &ConsulConfig,
|
||||
service: &str,
|
||||
) -> Result<Vec<ServiceEntry>> {
|
||||
async fn consul_get_service_nodes(config: &ConsulConfig, service: &str) -> Result<Vec<Node>> {
|
||||
let mut headers = HeaderMap::new();
|
||||
if let Some(token) = config.token.clone() {
|
||||
headers.insert("X-Consul-Token", token.parse().unwrap());
|
||||
headers.insert("X-Consul-Token", token.parse()?);
|
||||
}
|
||||
|
||||
trace!(
|
||||
"Calling consul_get_service_nodes: {}/v1/catalog/service/{service}",
|
||||
config.baseurl
|
||||
);
|
||||
|
||||
return Ok(reqwest::Client::new()
|
||||
.get(format!("{}/v1/catalog/services/{service}", config.baseurl))
|
||||
.get(format!("{}/v1/catalog/service/{service}", config.baseurl))
|
||||
.headers(headers)
|
||||
.send()
|
||||
.await?
|
||||
.json::<Vec<ServiceEntry>>()
|
||||
.json::<Vec<Node>>()
|
||||
.await?);
|
||||
}
|
||||
|
||||
@ -136,7 +154,8 @@ struct HealthCheck {
|
||||
struct Node {
|
||||
ID: String,
|
||||
Node: String,
|
||||
Address: String,
|
||||
ServiceAddress: String,
|
||||
ServicePort: u16,
|
||||
Datacenter: Option<String>,
|
||||
TaggedAddresses: Option<HashMap<String, String>>,
|
||||
Meta: Option<HashMap<String, String>>,
|
||||
@ -166,4 +185,20 @@ impl ConsulConfig {
|
||||
token: option_env!("CONSUL_HTTP_TOKEN").map(|s| s.to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
fn from_config_or_env(config: &Config) -> Self {
|
||||
let baseurl = match config.consul_http_addr {
|
||||
Some(ref s) => s.clone(),
|
||||
None => option_env!("CONSUL_HTTP_ADDR")
|
||||
.expect("CONSUL_HTTP_ADDR not set")
|
||||
.to_string(),
|
||||
};
|
||||
|
||||
let token = match config.consul_http_token {
|
||||
Some(ref s) => Some(s.clone()),
|
||||
None => option_env!("CONSUL_HTTP_TOKEN").map(|s| s.to_string()),
|
||||
};
|
||||
|
||||
Self { baseurl, token }
|
||||
}
|
||||
}
|
||||
|
33
src/main.rs
33
src/main.rs
@ -7,6 +7,7 @@ mod udp;
|
||||
#[cfg(feature = "consul")]
|
||||
mod consul;
|
||||
|
||||
use config::ConfigProvider;
|
||||
use log::{debug, error, info, warn};
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::error::Error;
|
||||
@ -39,15 +40,18 @@ async fn main() -> Result<()> {
|
||||
.chain(std::io::stdout())
|
||||
.apply()?;
|
||||
|
||||
let mut listeners: HashMap<String, ActiveListener> = HashMap::new();
|
||||
let listeners: HashMap<String, ActiveListener> = HashMap::new();
|
||||
let text_config_provider = config::FileConfigProvider::new();
|
||||
|
||||
let mut target_provider: Box<dyn config::ConfigProvider> = {
|
||||
let target_provider: Box<dyn config::ConfigProvider> = {
|
||||
#[cfg(feature = "consul")]
|
||||
{
|
||||
let cfg = text_config_provider.load_config()?;
|
||||
info!("Loaded yaml config");
|
||||
if cfg.consul.is_some() && cfg.consul.unwrap() {
|
||||
let consul_config_provider = consul::ConsulConfigProvider::new();
|
||||
info!("Using consul config provider");
|
||||
let consul_config_provider = consul::ConsulConfigProvider::new(Some(&cfg));
|
||||
info!("Loaded consul config provider");
|
||||
Box::new(consul_config_provider)
|
||||
} else {
|
||||
Box::new(text_config_provider)
|
||||
@ -55,9 +59,28 @@ async fn main() -> Result<()> {
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "consul"))]
|
||||
Box::new(text_config_provider)
|
||||
{
|
||||
let cfg = Box::new(text_config_provider);
|
||||
info!("Loaded yaml config");
|
||||
cfg
|
||||
}
|
||||
};
|
||||
|
||||
match run_loop(target_provider, listeners).await {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
error!("Error in run loop: {}", e);
|
||||
info!("Exiting");
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn run_loop(
|
||||
mut target_provider: Box<dyn ConfigProvider>,
|
||||
mut listeners: HashMap<String, ActiveListener>,
|
||||
) -> Result<()> {
|
||||
loop {
|
||||
let mappings = target_provider.get_targets().await?;
|
||||
let mut required_listeners: HashSet<String> = HashSet::new();
|
||||
@ -151,6 +174,6 @@ async fn main() -> Result<()> {
|
||||
}
|
||||
|
||||
target_provider.wait_for_change().await?;
|
||||
info!("Recevied SIGHUP, reloading config!");
|
||||
info!("Reloading config!");
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user