205 lines
5.6 KiB
Rust
205 lines
5.6 KiB
Rust
#![allow(non_snake_case)]
|
|
|
|
use std::collections::HashMap;
|
|
|
|
use log::debug;
|
|
use log::info;
|
|
use log::trace;
|
|
use log::warn;
|
|
use reqwest::header::HeaderMap;
|
|
use serde::{Deserialize, Serialize};
|
|
|
|
use crate::config::Config;
|
|
use crate::config::ConfigProvider;
|
|
use crate::config::Target;
|
|
|
|
use crate::Result;
|
|
|
|
pub struct ConsulConfigProvider {
|
|
consul_config: ConsulConfig,
|
|
interval: tokio::time::Interval,
|
|
}
|
|
|
|
impl ConsulConfigProvider {
|
|
pub fn new(config: Option<&Config>) -> Self {
|
|
Self {
|
|
consul_config: if let Some(config) = config {
|
|
ConsulConfig::from_config_or_env(config)
|
|
} else {
|
|
ConsulConfig::from_env()
|
|
},
|
|
interval: tokio::time::interval(tokio::time::Duration::from_secs(10)),
|
|
}
|
|
}
|
|
}
|
|
|
|
#[async_trait::async_trait]
|
|
impl ConfigProvider for ConsulConfigProvider {
|
|
async fn get_targets(&self) -> Result<Vec<Target>> {
|
|
info!("Getting targets from consul");
|
|
let mut targets: Vec<Target> = Vec::new();
|
|
|
|
debug!("Calling consul_get_services");
|
|
let services = consul_get_services(&self.consul_config).await?;
|
|
|
|
// Find consul services and tags
|
|
// Format of tags: rustocat:udp:port
|
|
// rustocat:tcp:port
|
|
|
|
debug!("Iterating over services: {:?}", services);
|
|
for (name, tags) in services {
|
|
for tag in tags {
|
|
if tag.starts_with("rustocat") {
|
|
trace!("Found rustocat tag: {}", tag);
|
|
let parts = tag.split(":").collect::<Vec<&str>>();
|
|
if parts.len() != 3 {
|
|
warn!("Invalid tag: {} on service {}", tag, name);
|
|
continue;
|
|
}
|
|
|
|
let port = parts[2];
|
|
trace!("Getting nodes for service: {}", name);
|
|
let nodes = consul_get_service_nodes(&self.consul_config, &name).await?;
|
|
|
|
let mut t = vec![];
|
|
for node in nodes {
|
|
t.push(format!("{}:{}", node.ServiceAddress, node.ServicePort));
|
|
}
|
|
let target = Target {
|
|
udp: Some(parts[1] == "udp"),
|
|
source: format!("0.0.0.0:{}", port),
|
|
targets: t,
|
|
};
|
|
trace!("Adding target: {:?}", target);
|
|
targets.push(target);
|
|
}
|
|
}
|
|
}
|
|
|
|
Ok(targets)
|
|
}
|
|
|
|
async fn wait_for_change(&mut self) -> Result<()> {
|
|
info!("Waiting for consul config change");
|
|
self.interval.tick().await;
|
|
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
async fn consul_get_services(config: &ConsulConfig) -> Result<HashMap<String, Vec<String>>> {
|
|
let mut headers = HeaderMap::new();
|
|
if let Some(token) = config.token.clone() {
|
|
headers.insert("X-Consul-Token", token.parse()?);
|
|
}
|
|
|
|
return Ok(reqwest::Client::new()
|
|
.get(format!("{}/v1/catalog/services", config.baseurl))
|
|
.headers(headers)
|
|
.send()
|
|
.await?
|
|
.json::<HashMap<String, Vec<String>>>()
|
|
.await?);
|
|
}
|
|
|
|
async fn consul_get_service_nodes(config: &ConsulConfig, service: &str) -> Result<Vec<Node>> {
|
|
let mut headers = HeaderMap::new();
|
|
if let Some(token) = config.token.clone() {
|
|
headers.insert("X-Consul-Token", token.parse()?);
|
|
}
|
|
|
|
trace!(
|
|
"Calling consul_get_service_nodes: {}/v1/catalog/service/{service}",
|
|
config.baseurl
|
|
);
|
|
|
|
return Ok(reqwest::Client::new()
|
|
.get(format!("{}/v1/catalog/service/{service}", config.baseurl))
|
|
.headers(headers)
|
|
.send()
|
|
.await?
|
|
.json::<Vec<Node>>()
|
|
.await?);
|
|
}
|
|
|
|
#[derive(Eq, Default, PartialEq, Serialize, Deserialize, Debug)]
|
|
#[serde(default)]
|
|
struct AgentService {
|
|
ID: String,
|
|
Service: String,
|
|
Tags: Option<Vec<String>>,
|
|
Port: u16,
|
|
Address: String,
|
|
EnableTagOverride: bool,
|
|
CreateIndex: u64,
|
|
ModifyIndex: u64,
|
|
}
|
|
|
|
#[derive(Eq, Default, PartialEq, Serialize, Deserialize, Debug)]
|
|
#[serde(default)]
|
|
struct HealthCheck {
|
|
Node: String,
|
|
CheckID: String,
|
|
Name: String,
|
|
Status: String,
|
|
Notes: String,
|
|
Output: String,
|
|
ServiceID: String,
|
|
ServiceName: String,
|
|
ServiceTags: Option<Vec<String>>,
|
|
}
|
|
|
|
#[derive(Eq, Default, PartialEq, Serialize, Deserialize, Debug)]
|
|
#[serde(default)]
|
|
struct Node {
|
|
ID: String,
|
|
Node: String,
|
|
ServiceAddress: String,
|
|
ServicePort: u16,
|
|
Datacenter: Option<String>,
|
|
TaggedAddresses: Option<HashMap<String, String>>,
|
|
Meta: Option<HashMap<String, String>>,
|
|
CreateIndex: u64,
|
|
ModifyIndex: u64,
|
|
}
|
|
|
|
#[derive(Eq, Default, PartialEq, Serialize, Deserialize, Debug)]
|
|
#[serde(default)]
|
|
struct ServiceEntry {
|
|
Node: Node,
|
|
Service: AgentService,
|
|
Checks: Vec<HealthCheck>,
|
|
}
|
|
|
|
struct ConsulConfig {
|
|
baseurl: String,
|
|
token: Option<String>,
|
|
}
|
|
|
|
impl ConsulConfig {
|
|
fn from_env() -> Self {
|
|
Self {
|
|
baseurl: option_env!("CONSUL_HTTP_ADDR")
|
|
.expect("CONSUL_HTTP_ADDR not set")
|
|
.to_string(),
|
|
token: option_env!("CONSUL_HTTP_TOKEN").map(|s| s.to_string()),
|
|
}
|
|
}
|
|
|
|
fn from_config_or_env(config: &Config) -> Self {
|
|
let baseurl = match config.consul_http_addr {
|
|
Some(ref s) => s.clone(),
|
|
None => option_env!("CONSUL_HTTP_ADDR")
|
|
.expect("CONSUL_HTTP_ADDR not set")
|
|
.to_string(),
|
|
};
|
|
|
|
let token = match config.consul_http_token {
|
|
Some(ref s) => Some(s.clone()),
|
|
None => option_env!("CONSUL_HTTP_TOKEN").map(|s| s.to_string()),
|
|
};
|
|
|
|
Self { baseurl, token }
|
|
}
|
|
}
|