From 25693a70589979be7d7e76dc4f60a3d507b44e6e Mon Sep 17 00:00:00 2001 From: Ara Sadoyan Date: Wed, 15 Oct 2025 13:42:05 +0200 Subject: [PATCH] Path filtering and rate limit for kubernetes --- src/utils/consul.rs | 6 +++--- src/utils/kuber.rs | 46 ++++++++++++++++++++++++++++---------------- src/utils/structs.rs | 9 +++++++-- src/utils/tools.rs | 9 +++++++-- 4 files changed, 46 insertions(+), 24 deletions(-) diff --git a/src/utils/consul.rs b/src/utils/consul.rs index 36b2a88..491ef57 100644 --- a/src/utils/consul.rs +++ b/src/utils/consul.rs @@ -86,14 +86,14 @@ async fn consul_request(url: String, whitelist: Option>, tok match whitelist { Some(whitelist) => { for k in whitelist.iter() { - let pref: String = ss.clone() + &k.real; + let pref: String = ss.clone() + &k.hostname; let list = get_by_http(pref.clone(), token.clone()).await; match list { Some(list) => { - upstreams.insert(k.proxy.clone(), list); + upstreams.insert(k.upstream.clone(), list); } None => { - warn!("Whitelist not found for {}", k.proxy); + warn!("Whitelist not found for {}", k.upstream); } } } diff --git a/src/utils/kuber.rs b/src/utils/kuber.rs index e29e1dc..62e3442 100644 --- a/src/utils/kuber.rs +++ b/src/utils/kuber.rs @@ -1,5 +1,5 @@ // use crate::utils::dnsclient::DnsClientPool; -use crate::utils::structs::{Configuration, InnerMap, UpstreamsDashMap}; +use crate::utils::structs::{Configuration, InnerMap, ServiceMapping, UpstreamsDashMap}; use crate::utils::tools::{clone_dashmap_into, compare_dashmaps, print_upstreams}; use dashmap::DashMap; use futures::channel::mpsc::Sender; @@ -40,7 +40,7 @@ struct Port { } pub async fn start(mut toreturn: Sender, config: Arc) { - // println!("{:?}", config); + println!("{:?}", config); let upstreams = UpstreamsDashMap::new(); let prev_upstreams = UpstreamsDashMap::new(); loop { @@ -57,31 +57,37 @@ pub async fn start(mut toreturn: Sender, config: Arc 0 { num = rand::rng().random_range(0..end); } + let server = servers.get(num).unwrap().to_string(); if let Some(svc) = kuber.services { for i in svc { - let path = i.path.unwrap_or("/".to_string()); - let url = format!("https://{}/api/v1/namespaces/staging/endpoints/{}", server, i.real); - let list = get_by_http(&*url, &*token, &*path).await; - - // println!("{:?}", list); - + let header_list = DashMap::new(); + let mut hl = Vec::new(); + if let Some(headers) = &i.headers { + for header in headers { + if let Some((key, val)) = header.split_once(':') { + hl.push((key.trim().to_string(), val.trim().to_string())); + } + } + } + header_list.insert(path.clone(), hl); + let url = format!("https://{}/api/v1/namespaces/staging/endpoints/{}", server, i.hostname); + let list = get_by_http(&*url, &*token, &i).await; if let Some(list) = list { - match upstreams.get(&i.proxy.clone()) { + match upstreams.get(&i.upstream.clone()) { Some(foo) => { for (k, v) in list { foo.value().insert(k, v); } } None => { - upstreams.insert(i.proxy.clone(), list); + upstreams.insert(i.upstream.clone(), list); } }; } } } } - // print_upstreams(&upstreams); if !compare_dashmaps(&upstreams, &prev_upstreams) { let tosend: Configuration = Configuration { @@ -92,7 +98,6 @@ pub async fn start(mut toreturn: Sender, config: Arc, config: Arc Option, AtomicUsize)>> { +pub async fn get_by_http(url: &str, token: &str, conf: &ServiceMapping) -> Option, AtomicUsize)>> { let client = Client::builder().timeout(Duration::from_secs(2)).danger_accept_invalid_certs(true).build().ok()?; let resp = client.get(url).bearer_auth(token).send().await.ok()?; @@ -114,7 +119,7 @@ pub async fn get_by_http(url: &str, token: &str, path: &str) -> Option, AtomicUsize)> = DashMap::new(); - + // println!(" ===> {:?} : {:?}", conf.to_https.unwrap_or(false), conf.rate_limit); if let Some(subsets) = endpoints.subsets { for subset in subsets { if let (Some(addresses), Some(ports)) = (subset.addresses, subset.ports) { @@ -126,14 +131,21 @@ pub async fn get_by_http(url: &str, token: &str, path: &str) -> Option { + upstreams.insert(p.to_string(), (inner_vec, AtomicUsize::new(0))); + } + None => { + upstreams.insert("/".to_string(), (inner_vec, AtomicUsize::new(0))); + } + } } } } diff --git a/src/utils/structs.rs b/src/utils/structs.rs index 3da0b15..8dceb84 100644 --- a/src/utils/structs.rs +++ b/src/utils/structs.rs @@ -10,11 +10,16 @@ pub type Headers = DashMap>>; #[derive(Debug, Default, Clone, Serialize, Deserialize)] pub struct ServiceMapping { - pub proxy: String, - pub real: String, + pub upstream: String, + pub hostname: String, pub path: Option, + pub to_https: Option, + pub rate_limit: Option, + pub headers: Option>, } +// pub type Services = DashMap)>>; + #[derive(Clone, Debug, Default)] pub struct Extraparams { pub sticky_sessions: bool, diff --git a/src/utils/tools.rs b/src/utils/tools.rs index 17bae50..b5efbd0 100644 --- a/src/utils/tools.rs +++ b/src/utils/tools.rs @@ -29,8 +29,13 @@ pub fn print_upstreams(upstreams: &UpstreamsDashMap) { println!(" Path: {}", path); for f in path_entry.value().0.clone() { println!( - " IP: {}, Port: {}, SSL: {}, H2: {}, To HTTPS: {}", - f.address, f.port, f.is_ssl, f.is_http2, f.to_https + " IP: {}, Port: {}, SSL: {}, H2: {}, To HTTPS: {}, Rate Limit: {}", + f.address, + f.port, + f.is_ssl, + f.is_http2, + f.to_https, + f.rate_limit.unwrap_or(0) ); } }