From 78c83b802ffa1acddcd4974d77066da33ea1cad6 Mon Sep 17 00:00:00 2001 From: Ara Sadoyan Date: Sun, 26 Oct 2025 15:26:09 +0100 Subject: [PATCH] Merge Consul & Kubernetes discovery --- src/utils.rs | 3 +- src/utils/consul.rs | 98 ----------------------------- src/utils/discovery.rs | 6 +- src/utils/kuber.rs | 109 -------------------------------- src/utils/kuberconsul.rs | 133 ++++++++++++++++++++++++++++++++++++++- 5 files changed, 136 insertions(+), 213 deletions(-) delete mode 100644 src/utils/consul.rs delete mode 100644 src/utils/kuber.rs diff --git a/src/utils.rs b/src/utils.rs index 61fd5b7..3a2ee68 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,11 +1,10 @@ pub mod auth; -pub mod consul; pub mod discovery; pub mod dnsclient; mod filewatch; pub mod healthcheck; +pub mod httpclient; pub mod jwt; -pub mod kuber; pub mod kuberconsul; pub mod metrics; pub mod parceyaml; diff --git a/src/utils/consul.rs b/src/utils/consul.rs deleted file mode 100644 index 53cee6e..0000000 --- a/src/utils/consul.rs +++ /dev/null @@ -1,98 +0,0 @@ -use crate::utils::kuberconsul::*; -use crate::utils::parceyaml::build_headers; -use crate::utils::structs::{Configuration, InnerMap, ServiceMapping, UpstreamsDashMap}; -use crate::utils::tools::{clone_dashmap_into, compare_dashmaps, print_upstreams}; -use dashmap::DashMap; -use futures::channel::mpsc::Sender; -use futures::SinkExt; -use pingora::prelude::sleep; -use rand::Rng; -use reqwest::header::{HeaderMap, HeaderValue}; -use reqwest::Client; -use std::env; -use std::sync::atomic::AtomicUsize; -use std::sync::Arc; -use std::time::Duration; - -pub async fn start(mut toreturn: Sender, config: Arc) { - let prev_upstreams = UpstreamsDashMap::new(); - loop { - let upstreams = UpstreamsDashMap::new(); - if let Some(consul) = config.consul.clone() { - let servers = consul.servers.unwrap_or(vec![format!( - "{}:{}", - env::var("CONSUL_SERVICE_HOST").unwrap_or("0.0.0.0".to_string()), - env::var("CONSUL_SERVICE_PORT").unwrap_or("0".to_string()) - )]); - let end = servers.len() - 1; - let mut num = 0; - if end > 0 { - num = rand::rng().random_range(0..end); - } - let consul_data = servers.get(num).unwrap().to_string(); - let ss = consul_data + "/v1/catalog/service/"; - if let Some(svc) = consul.services { - for i in svc { - let header_list = DashMap::new(); - let mut hl = Vec::new(); - build_headers(&i.headers, config.as_ref(), &mut hl); - if hl.len() > 0 { - header_list.insert(i.path.clone().unwrap_or("/".to_string()), hl); - config.headers.insert(i.hostname.clone(), header_list); - } - let pref: String = ss.clone() + &i.upstream; - let list = get_by_http(pref, consul.token.clone(), &i).await; - list_to_upstreams(list, &upstreams, &i); - } - } - if !compare_dashmaps(&upstreams, &prev_upstreams) { - let tosend: Configuration = Configuration { - upstreams: Default::default(), - headers: config.headers.clone(), - consul: config.consul.clone(), - kubernetes: config.kubernetes.clone(), - typecfg: config.typecfg.clone(), - extraparams: config.extraparams.clone(), - }; - clone_dashmap_into(&upstreams, &prev_upstreams); - clone_dashmap_into(&upstreams, &tosend.upstreams); - print_upstreams(&tosend.upstreams); - toreturn.send(tosend).await.unwrap(); - } - } - sleep(Duration::from_secs(5)).await; - } -} - -async fn get_by_http(url: String, token: Option, conf: &ServiceMapping) -> Option, AtomicUsize)>> { - let client = Client::builder().timeout(Duration::from_secs(2)).danger_accept_invalid_certs(true).build().ok()?; - let mut headers = HeaderMap::new(); - if let Some(token) = token { - headers.insert("X-Consul-Token", HeaderValue::from_str(&token).unwrap()); - } - let to = Duration::from_secs(1); - let resp = client.get(url).timeout(to).send().await.ok()?; - if !resp.status().is_success() { - eprintln!("Consul API returned status: {}", resp.status()); - return None; - } - let mut inner_vec = Vec::new(); - let upstreams: DashMap, AtomicUsize)> = DashMap::new(); - let endpoints: Vec = resp.json().await.ok()?; - for subsets in endpoints { - let addr = subsets.tagged_addresses.get("lan_ipv4").unwrap().address.clone(); - let prt = subsets.tagged_addresses.get("lan_ipv4").unwrap().port.clone(); - let to_add = InnerMap { - address: addr, - port: prt, - is_ssl: false, - is_http2: false, - to_https: conf.to_https.unwrap_or(false), - rate_limit: conf.rate_limit, - healthcheck: None, - }; - inner_vec.push(to_add); - } - match_path(&conf, &upstreams, inner_vec.clone()); - Some(upstreams) -} diff --git a/src/utils/discovery.rs b/src/utils/discovery.rs index 826f87a..405c25e 100644 --- a/src/utils/discovery.rs +++ b/src/utils/discovery.rs @@ -1,6 +1,6 @@ use crate::utils::filewatch; +use crate::utils::kuberconsul::{ConsulDiscovery, KubernetesDiscovery, ServiceDiscovery}; use crate::utils::structs::Configuration; -use crate::utils::{consul, kuber}; use crate::web::webserver; use async_trait::async_trait; use futures::channel::mpsc::Sender; @@ -51,13 +51,13 @@ impl Discovery for FromFileProvider { #[async_trait] impl Discovery for ConsulProvider { async fn start(&self, tx: Sender) { - tokio::spawn(consul::start(tx.clone(), self.config.clone())); + tokio::spawn(ConsulDiscovery.fetch_upstreams(self.config.clone(), tx)); } } #[async_trait] impl Discovery for KubernetesProvider { async fn start(&self, tx: Sender) { - tokio::spawn(kuber::start(tx.clone(), self.config.clone())); + tokio::spawn(KubernetesDiscovery.fetch_upstreams(self.config.clone(), tx)); } } diff --git a/src/utils/kuber.rs b/src/utils/kuber.rs deleted file mode 100644 index 5e37dcc..0000000 --- a/src/utils/kuber.rs +++ /dev/null @@ -1,109 +0,0 @@ -use crate::utils::kuberconsul::*; -use crate::utils::parceyaml::build_headers; -use crate::utils::structs::{Configuration, InnerMap, ServiceMapping, UpstreamsDashMap}; -use crate::utils::tools::{clone_dashmap_into, compare_dashmaps, print_upstreams}; -use dashmap::DashMap; -use futures::channel::mpsc::Sender; -use futures::SinkExt; -use pingora::prelude::sleep; -use rand::Rng; -use reqwest::Client; -use std::env; -use std::sync::atomic::AtomicUsize; -use std::sync::Arc; -use std::time::Duration; -use tokio::fs::File; -use tokio::io::AsyncReadExt; - -async fn read_token(path: &str) -> String { - let mut file = File::open(path).await.unwrap(); - let mut contents = String::new(); - file.read_to_string(&mut contents).await.unwrap(); - contents.trim().to_string() -} -pub async fn start(mut toreturn: Sender, config: Arc) { - let prev_upstreams = UpstreamsDashMap::new(); - loop { - let upstreams = UpstreamsDashMap::new(); - if let Some(kuber) = config.kubernetes.clone() { - let path = kuber.tokenpath.unwrap_or("/var/run/secrets/kubernetes.io/serviceaccount/token".to_string()); - let token = read_token(path.as_str()).await; - let servers = kuber.servers.unwrap_or(vec![format!( - "{}:{}", - env::var("KUBERNETES_SERVICE_HOST").unwrap_or("0.0.0.0".to_string()), - env::var("KUBERNETES_SERVICE_PORT_HTTPS").unwrap_or("0".to_string()) - )]); - let end = servers.len() - 1; - let mut num = 0; - if end > 0 { - num = rand::rng().random_range(0..end); - } - - let server = servers.get(num).unwrap().to_string(); - if let Some(svc) = kuber.services { - for i in svc { - let header_list = DashMap::new(); - let mut hl = Vec::new(); - build_headers(&i.headers, config.as_ref(), &mut hl); - if hl.len() > 0 { - header_list.insert(i.path.clone().unwrap_or("/".to_string()), hl); - config.headers.insert(i.hostname.clone(), header_list); - } - let url = format!("https://{}/api/v1/namespaces/staging/endpoints/{}", server, i.hostname); - let list = get_by_http(&*url, &*token, &i).await; - list_to_upstreams(list, &upstreams, &i); - } - } - if !compare_dashmaps(&upstreams, &prev_upstreams) { - let tosend: Configuration = Configuration { - upstreams: Default::default(), - headers: config.headers.clone(), - consul: config.consul.clone(), - kubernetes: config.kubernetes.clone(), - typecfg: config.typecfg.clone(), - extraparams: config.extraparams.clone(), - }; - clone_dashmap_into(&upstreams, &prev_upstreams); - clone_dashmap_into(&upstreams, &tosend.upstreams); - print_upstreams(&tosend.upstreams); - toreturn.send(tosend).await.unwrap(); - } - } - sleep(Duration::from_secs(5)).await; - } -} - -pub async fn get_by_http(url: &str, token: &str, conf: &ServiceMapping) -> Option, AtomicUsize)>> { - let to = Duration::from_secs(10); - let client = Client::builder().timeout(Duration::from_secs(10)).danger_accept_invalid_certs(true).build().ok()?; - let resp = client.get(url).timeout(to).bearer_auth(token).send().await.ok()?; - if !resp.status().is_success() { - eprintln!("Kubernetes API returned status: {}", resp.status()); - return None; - } - let endpoints: KubeEndpoints = resp.json().await.ok()?; - let upstreams: DashMap, AtomicUsize)> = DashMap::new(); - if let Some(subsets) = endpoints.subsets { - for subset in subsets { - if let (Some(addresses), Some(ports)) = (subset.addresses, subset.ports) { - let mut inner_vec = Vec::new(); - for addr in addresses { - for port in &ports { - let to_add = InnerMap { - address: addr.ip.clone(), - port: port.port.clone(), - is_ssl: false, - is_http2: false, - to_https: conf.to_https.unwrap_or(false), - rate_limit: conf.rate_limit, - healthcheck: None, - }; - inner_vec.push(to_add); - } - } - match_path(&conf, &upstreams, inner_vec.clone()); - } - } - } - Some(upstreams) -} diff --git a/src/utils/kuberconsul.rs b/src/utils/kuberconsul.rs index 626ce07..0d8b4a7 100644 --- a/src/utils/kuberconsul.rs +++ b/src/utils/kuberconsul.rs @@ -1,8 +1,21 @@ -use crate::utils::structs::{InnerMap, ServiceMapping, UpstreamsDashMap}; +use crate::utils::httpclient; +use crate::utils::parceyaml::build_headers; +use crate::utils::structs::{Configuration, InnerMap, ServiceMapping, UpstreamsDashMap}; +use crate::utils::tools::{clone_dashmap_into, compare_dashmaps, print_upstreams}; +use async_trait::async_trait; use dashmap::DashMap; +use futures::channel::mpsc::Sender; +use futures::SinkExt; +use pingora::prelude::sleep; +use rand::Rng; use serde::Deserialize; use std::collections::HashMap; +use std::env; use std::sync::atomic::AtomicUsize; +use std::sync::Arc; +use std::time::Duration; +use tokio::fs::File; +use tokio::io::AsyncReadExt; #[derive(Debug, serde::Deserialize)] pub struct KubeEndpoints { @@ -62,3 +75,121 @@ pub fn match_path(conf: &ServiceMapping, upstreams: &DashMap String { + let mut file = File::open(path).await.unwrap(); + let mut contents = String::new(); + file.read_to_string(&mut contents).await.unwrap(); + contents.trim().to_string() +} +#[async_trait] +pub trait ServiceDiscovery { + async fn fetch_upstreams(&self, config: Arc, toreturn: Sender); +} + +pub struct KubernetesDiscovery; +pub struct ConsulDiscovery; + +#[async_trait] +impl ServiceDiscovery for KubernetesDiscovery { + async fn fetch_upstreams(&self, config: Arc, mut toreturn: Sender) { + let prev_upstreams = UpstreamsDashMap::new(); + loop { + let upstreams = UpstreamsDashMap::new(); + + if let Some(kuber) = config.kubernetes.clone() { + let path = kuber.tokenpath.unwrap_or("/var/run/secrets/kubernetes.io/serviceaccount/token".to_string()); + let token = read_token(path.as_str()).await; + + let servers = kuber.servers.unwrap_or(vec![format!( + "{}:{}", + env::var("KUBERNETES_SERVICE_HOST").unwrap_or("0.0.0.0".to_string()), + env::var("KUBERNETES_SERVICE_PORT_HTTPS").unwrap_or("0".to_string()) + )]); + + let end = servers.len().saturating_sub(1); + let num = if end > 0 { rand::rng().random_range(0..end) } else { 0 }; + let server = servers.get(num).unwrap().to_string(); + + if let Some(svc) = kuber.services { + for i in svc { + let header_list = DashMap::new(); + let mut hl = Vec::new(); + build_headers(&i.headers, config.as_ref(), &mut hl); + if !hl.is_empty() { + header_list.insert(i.path.clone().unwrap_or("/".to_string()), hl); + config.headers.insert(i.hostname.clone(), header_list); + } + + let url = format!("https://{}/api/v1/namespaces/staging/endpoints/{}", server, i.hostname); + let list = httpclient::for_kuber(&*url, &*token, &i).await; + list_to_upstreams(list, &upstreams, &i); + } + } + if let Some(lt) = clone_compare(&upstreams, &prev_upstreams, &config).await { + toreturn.send(lt).await.unwrap(); + } + } + sleep(Duration::from_secs(5)).await; + } + } +} + +#[async_trait] +impl ServiceDiscovery for ConsulDiscovery { + async fn fetch_upstreams(&self, config: Arc, mut toreturn: Sender) { + let prev_upstreams = UpstreamsDashMap::new(); + loop { + let upstreams = UpstreamsDashMap::new(); + + if let Some(consul) = config.consul.clone() { + let servers = consul.servers.unwrap_or(vec![format!( + "{}:{}", + env::var("CONSUL_SERVICE_HOST").unwrap_or("0.0.0.0".to_string()), + env::var("CONSUL_SERVICE_PORT").unwrap_or("0".to_string()) + )]); + let end = servers.len().saturating_sub(1); + let num = if end > 0 { rand::rng().random_range(0..end) } else { 0 }; + let consul_data = servers.get(num).unwrap().to_string(); + let ss = consul_data + "/v1/catalog/service/"; + + if let Some(svc) = consul.services { + for i in svc { + let header_list = DashMap::new(); + let mut hl = Vec::new(); + build_headers(&i.headers, config.as_ref(), &mut hl); + if !hl.is_empty() { + header_list.insert(i.path.clone().unwrap_or("/".to_string()), hl); + config.headers.insert(i.hostname.clone(), header_list); + } + + let pref = ss.clone() + &i.upstream; + let list = httpclient::for_consul(pref, consul.token.clone(), &i).await; + list_to_upstreams(list, &upstreams, &i); + } + } + } + if let Some(lt) = clone_compare(&upstreams, &prev_upstreams, &config).await { + toreturn.send(lt).await.unwrap(); + } + sleep(Duration::from_secs(5)).await; + } + } +} +async fn clone_compare(upstreams: &UpstreamsDashMap, prev_upstreams: &UpstreamsDashMap, config: &Arc) -> Option { + if !compare_dashmaps(&upstreams, &prev_upstreams) { + let tosend: Configuration = Configuration { + upstreams: Default::default(), + headers: config.headers.clone(), + consul: config.consul.clone(), + kubernetes: config.kubernetes.clone(), + typecfg: config.typecfg.clone(), + extraparams: config.extraparams.clone(), + }; + clone_dashmap_into(&upstreams, &prev_upstreams); + clone_dashmap_into(&upstreams, &tosend.upstreams); + print_upstreams(&tosend.upstreams); + return Some(tosend); + }; + None +}