Merge Consul & Kubernetes discovery

This commit is contained in:
Ara Sadoyan
2025-10-26 15:26:09 +01:00
parent 012505b77e
commit 78c83b802f
5 changed files with 136 additions and 213 deletions

View File

@@ -1,98 +0,0 @@
use crate::utils::kuberconsul::*;
use crate::utils::parceyaml::build_headers;
use crate::utils::structs::{Configuration, InnerMap, ServiceMapping, UpstreamsDashMap};
use crate::utils::tools::{clone_dashmap_into, compare_dashmaps, print_upstreams};
use dashmap::DashMap;
use futures::channel::mpsc::Sender;
use futures::SinkExt;
use pingora::prelude::sleep;
use rand::Rng;
use reqwest::header::{HeaderMap, HeaderValue};
use reqwest::Client;
use std::env;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::time::Duration;
pub async fn start(mut toreturn: Sender<Configuration>, config: Arc<Configuration>) {
let prev_upstreams = UpstreamsDashMap::new();
loop {
let upstreams = UpstreamsDashMap::new();
if let Some(consul) = config.consul.clone() {
let servers = consul.servers.unwrap_or(vec![format!(
"{}:{}",
env::var("CONSUL_SERVICE_HOST").unwrap_or("0.0.0.0".to_string()),
env::var("CONSUL_SERVICE_PORT").unwrap_or("0".to_string())
)]);
let end = servers.len() - 1;
let mut num = 0;
if end > 0 {
num = rand::rng().random_range(0..end);
}
let consul_data = servers.get(num).unwrap().to_string();
let ss = consul_data + "/v1/catalog/service/";
if let Some(svc) = consul.services {
for i in svc {
let header_list = DashMap::new();
let mut hl = Vec::new();
build_headers(&i.headers, config.as_ref(), &mut hl);
if hl.len() > 0 {
header_list.insert(i.path.clone().unwrap_or("/".to_string()), hl);
config.headers.insert(i.hostname.clone(), header_list);
}
let pref: String = ss.clone() + &i.upstream;
let list = get_by_http(pref, consul.token.clone(), &i).await;
list_to_upstreams(list, &upstreams, &i);
}
}
if !compare_dashmaps(&upstreams, &prev_upstreams) {
let tosend: Configuration = Configuration {
upstreams: Default::default(),
headers: config.headers.clone(),
consul: config.consul.clone(),
kubernetes: config.kubernetes.clone(),
typecfg: config.typecfg.clone(),
extraparams: config.extraparams.clone(),
};
clone_dashmap_into(&upstreams, &prev_upstreams);
clone_dashmap_into(&upstreams, &tosend.upstreams);
print_upstreams(&tosend.upstreams);
toreturn.send(tosend).await.unwrap();
}
}
sleep(Duration::from_secs(5)).await;
}
}
async fn get_by_http(url: String, token: Option<String>, conf: &ServiceMapping) -> Option<DashMap<String, (Vec<InnerMap>, AtomicUsize)>> {
let client = Client::builder().timeout(Duration::from_secs(2)).danger_accept_invalid_certs(true).build().ok()?;
let mut headers = HeaderMap::new();
if let Some(token) = token {
headers.insert("X-Consul-Token", HeaderValue::from_str(&token).unwrap());
}
let to = Duration::from_secs(1);
let resp = client.get(url).timeout(to).send().await.ok()?;
if !resp.status().is_success() {
eprintln!("Consul API returned status: {}", resp.status());
return None;
}
let mut inner_vec = Vec::new();
let upstreams: DashMap<String, (Vec<InnerMap>, AtomicUsize)> = DashMap::new();
let endpoints: Vec<ConsulService> = resp.json().await.ok()?;
for subsets in endpoints {
let addr = subsets.tagged_addresses.get("lan_ipv4").unwrap().address.clone();
let prt = subsets.tagged_addresses.get("lan_ipv4").unwrap().port.clone();
let to_add = InnerMap {
address: addr,
port: prt,
is_ssl: false,
is_http2: false,
to_https: conf.to_https.unwrap_or(false),
rate_limit: conf.rate_limit,
healthcheck: None,
};
inner_vec.push(to_add);
}
match_path(&conf, &upstreams, inner_vec.clone());
Some(upstreams)
}

View File

@@ -1,6 +1,6 @@
use crate::utils::filewatch;
use crate::utils::kuberconsul::{ConsulDiscovery, KubernetesDiscovery, ServiceDiscovery};
use crate::utils::structs::Configuration;
use crate::utils::{consul, kuber};
use crate::web::webserver;
use async_trait::async_trait;
use futures::channel::mpsc::Sender;
@@ -51,13 +51,13 @@ impl Discovery for FromFileProvider {
#[async_trait]
impl Discovery for ConsulProvider {
async fn start(&self, tx: Sender<Configuration>) {
tokio::spawn(consul::start(tx.clone(), self.config.clone()));
tokio::spawn(ConsulDiscovery.fetch_upstreams(self.config.clone(), tx));
}
}
#[async_trait]
impl Discovery for KubernetesProvider {
async fn start(&self, tx: Sender<Configuration>) {
tokio::spawn(kuber::start(tx.clone(), self.config.clone()));
tokio::spawn(KubernetesDiscovery.fetch_upstreams(self.config.clone(), tx));
}
}

View File

@@ -1,109 +0,0 @@
use crate::utils::kuberconsul::*;
use crate::utils::parceyaml::build_headers;
use crate::utils::structs::{Configuration, InnerMap, ServiceMapping, UpstreamsDashMap};
use crate::utils::tools::{clone_dashmap_into, compare_dashmaps, print_upstreams};
use dashmap::DashMap;
use futures::channel::mpsc::Sender;
use futures::SinkExt;
use pingora::prelude::sleep;
use rand::Rng;
use reqwest::Client;
use std::env;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::time::Duration;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
async fn read_token(path: &str) -> String {
let mut file = File::open(path).await.unwrap();
let mut contents = String::new();
file.read_to_string(&mut contents).await.unwrap();
contents.trim().to_string()
}
pub async fn start(mut toreturn: Sender<Configuration>, config: Arc<Configuration>) {
let prev_upstreams = UpstreamsDashMap::new();
loop {
let upstreams = UpstreamsDashMap::new();
if let Some(kuber) = config.kubernetes.clone() {
let path = kuber.tokenpath.unwrap_or("/var/run/secrets/kubernetes.io/serviceaccount/token".to_string());
let token = read_token(path.as_str()).await;
let servers = kuber.servers.unwrap_or(vec![format!(
"{}:{}",
env::var("KUBERNETES_SERVICE_HOST").unwrap_or("0.0.0.0".to_string()),
env::var("KUBERNETES_SERVICE_PORT_HTTPS").unwrap_or("0".to_string())
)]);
let end = servers.len() - 1;
let mut num = 0;
if end > 0 {
num = rand::rng().random_range(0..end);
}
let server = servers.get(num).unwrap().to_string();
if let Some(svc) = kuber.services {
for i in svc {
let header_list = DashMap::new();
let mut hl = Vec::new();
build_headers(&i.headers, config.as_ref(), &mut hl);
if hl.len() > 0 {
header_list.insert(i.path.clone().unwrap_or("/".to_string()), hl);
config.headers.insert(i.hostname.clone(), header_list);
}
let url = format!("https://{}/api/v1/namespaces/staging/endpoints/{}", server, i.hostname);
let list = get_by_http(&*url, &*token, &i).await;
list_to_upstreams(list, &upstreams, &i);
}
}
if !compare_dashmaps(&upstreams, &prev_upstreams) {
let tosend: Configuration = Configuration {
upstreams: Default::default(),
headers: config.headers.clone(),
consul: config.consul.clone(),
kubernetes: config.kubernetes.clone(),
typecfg: config.typecfg.clone(),
extraparams: config.extraparams.clone(),
};
clone_dashmap_into(&upstreams, &prev_upstreams);
clone_dashmap_into(&upstreams, &tosend.upstreams);
print_upstreams(&tosend.upstreams);
toreturn.send(tosend).await.unwrap();
}
}
sleep(Duration::from_secs(5)).await;
}
}
pub async fn get_by_http(url: &str, token: &str, conf: &ServiceMapping) -> Option<DashMap<String, (Vec<InnerMap>, AtomicUsize)>> {
let to = Duration::from_secs(10);
let client = Client::builder().timeout(Duration::from_secs(10)).danger_accept_invalid_certs(true).build().ok()?;
let resp = client.get(url).timeout(to).bearer_auth(token).send().await.ok()?;
if !resp.status().is_success() {
eprintln!("Kubernetes API returned status: {}", resp.status());
return None;
}
let endpoints: KubeEndpoints = resp.json().await.ok()?;
let upstreams: DashMap<String, (Vec<InnerMap>, AtomicUsize)> = DashMap::new();
if let Some(subsets) = endpoints.subsets {
for subset in subsets {
if let (Some(addresses), Some(ports)) = (subset.addresses, subset.ports) {
let mut inner_vec = Vec::new();
for addr in addresses {
for port in &ports {
let to_add = InnerMap {
address: addr.ip.clone(),
port: port.port.clone(),
is_ssl: false,
is_http2: false,
to_https: conf.to_https.unwrap_or(false),
rate_limit: conf.rate_limit,
healthcheck: None,
};
inner_vec.push(to_add);
}
}
match_path(&conf, &upstreams, inner_vec.clone());
}
}
}
Some(upstreams)
}

View File

@@ -1,8 +1,21 @@
use crate::utils::structs::{InnerMap, ServiceMapping, UpstreamsDashMap};
use crate::utils::httpclient;
use crate::utils::parceyaml::build_headers;
use crate::utils::structs::{Configuration, InnerMap, ServiceMapping, UpstreamsDashMap};
use crate::utils::tools::{clone_dashmap_into, compare_dashmaps, print_upstreams};
use async_trait::async_trait;
use dashmap::DashMap;
use futures::channel::mpsc::Sender;
use futures::SinkExt;
use pingora::prelude::sleep;
use rand::Rng;
use serde::Deserialize;
use std::collections::HashMap;
use std::env;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::time::Duration;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
#[derive(Debug, serde::Deserialize)]
pub struct KubeEndpoints {
@@ -62,3 +75,121 @@ pub fn match_path(conf: &ServiceMapping, upstreams: &DashMap<String, (Vec<InnerM
}
}
}
async fn read_token(path: &str) -> String {
let mut file = File::open(path).await.unwrap();
let mut contents = String::new();
file.read_to_string(&mut contents).await.unwrap();
contents.trim().to_string()
}
#[async_trait]
pub trait ServiceDiscovery {
async fn fetch_upstreams(&self, config: Arc<Configuration>, toreturn: Sender<Configuration>);
}
pub struct KubernetesDiscovery;
pub struct ConsulDiscovery;
#[async_trait]
impl ServiceDiscovery for KubernetesDiscovery {
async fn fetch_upstreams(&self, config: Arc<Configuration>, mut toreturn: Sender<Configuration>) {
let prev_upstreams = UpstreamsDashMap::new();
loop {
let upstreams = UpstreamsDashMap::new();
if let Some(kuber) = config.kubernetes.clone() {
let path = kuber.tokenpath.unwrap_or("/var/run/secrets/kubernetes.io/serviceaccount/token".to_string());
let token = read_token(path.as_str()).await;
let servers = kuber.servers.unwrap_or(vec![format!(
"{}:{}",
env::var("KUBERNETES_SERVICE_HOST").unwrap_or("0.0.0.0".to_string()),
env::var("KUBERNETES_SERVICE_PORT_HTTPS").unwrap_or("0".to_string())
)]);
let end = servers.len().saturating_sub(1);
let num = if end > 0 { rand::rng().random_range(0..end) } else { 0 };
let server = servers.get(num).unwrap().to_string();
if let Some(svc) = kuber.services {
for i in svc {
let header_list = DashMap::new();
let mut hl = Vec::new();
build_headers(&i.headers, config.as_ref(), &mut hl);
if !hl.is_empty() {
header_list.insert(i.path.clone().unwrap_or("/".to_string()), hl);
config.headers.insert(i.hostname.clone(), header_list);
}
let url = format!("https://{}/api/v1/namespaces/staging/endpoints/{}", server, i.hostname);
let list = httpclient::for_kuber(&*url, &*token, &i).await;
list_to_upstreams(list, &upstreams, &i);
}
}
if let Some(lt) = clone_compare(&upstreams, &prev_upstreams, &config).await {
toreturn.send(lt).await.unwrap();
}
}
sleep(Duration::from_secs(5)).await;
}
}
}
#[async_trait]
impl ServiceDiscovery for ConsulDiscovery {
async fn fetch_upstreams(&self, config: Arc<Configuration>, mut toreturn: Sender<Configuration>) {
let prev_upstreams = UpstreamsDashMap::new();
loop {
let upstreams = UpstreamsDashMap::new();
if let Some(consul) = config.consul.clone() {
let servers = consul.servers.unwrap_or(vec![format!(
"{}:{}",
env::var("CONSUL_SERVICE_HOST").unwrap_or("0.0.0.0".to_string()),
env::var("CONSUL_SERVICE_PORT").unwrap_or("0".to_string())
)]);
let end = servers.len().saturating_sub(1);
let num = if end > 0 { rand::rng().random_range(0..end) } else { 0 };
let consul_data = servers.get(num).unwrap().to_string();
let ss = consul_data + "/v1/catalog/service/";
if let Some(svc) = consul.services {
for i in svc {
let header_list = DashMap::new();
let mut hl = Vec::new();
build_headers(&i.headers, config.as_ref(), &mut hl);
if !hl.is_empty() {
header_list.insert(i.path.clone().unwrap_or("/".to_string()), hl);
config.headers.insert(i.hostname.clone(), header_list);
}
let pref = ss.clone() + &i.upstream;
let list = httpclient::for_consul(pref, consul.token.clone(), &i).await;
list_to_upstreams(list, &upstreams, &i);
}
}
}
if let Some(lt) = clone_compare(&upstreams, &prev_upstreams, &config).await {
toreturn.send(lt).await.unwrap();
}
sleep(Duration::from_secs(5)).await;
}
}
}
async fn clone_compare(upstreams: &UpstreamsDashMap, prev_upstreams: &UpstreamsDashMap, config: &Arc<Configuration>) -> Option<Configuration> {
if !compare_dashmaps(&upstreams, &prev_upstreams) {
let tosend: Configuration = Configuration {
upstreams: Default::default(),
headers: config.headers.clone(),
consul: config.consul.clone(),
kubernetes: config.kubernetes.clone(),
typecfg: config.typecfg.clone(),
extraparams: config.extraparams.clone(),
};
clone_dashmap_into(&upstreams, &prev_upstreams);
clone_dashmap_into(&upstreams, &tosend.upstreams);
print_upstreams(&tosend.upstreams);
return Some(tosend);
};
None
}