Path filtering and rate limit for kubernetes

This commit is contained in:
Ara Sadoyan
2025-10-15 13:42:05 +02:00
parent 5359c2e8e9
commit 25693a7058
4 changed files with 46 additions and 24 deletions

View File

@@ -86,14 +86,14 @@ async fn consul_request(url: String, whitelist: Option<Vec<ServiceMapping>>, tok
match whitelist {
Some(whitelist) => {
for k in whitelist.iter() {
let pref: String = ss.clone() + &k.real;
let pref: String = ss.clone() + &k.hostname;
let list = get_by_http(pref.clone(), token.clone()).await;
match list {
Some(list) => {
upstreams.insert(k.proxy.clone(), list);
upstreams.insert(k.upstream.clone(), list);
}
None => {
warn!("Whitelist not found for {}", k.proxy);
warn!("Whitelist not found for {}", k.upstream);
}
}
}

View File

@@ -1,5 +1,5 @@
// use crate::utils::dnsclient::DnsClientPool;
use crate::utils::structs::{Configuration, InnerMap, UpstreamsDashMap};
use crate::utils::structs::{Configuration, InnerMap, ServiceMapping, UpstreamsDashMap};
use crate::utils::tools::{clone_dashmap_into, compare_dashmaps, print_upstreams};
use dashmap::DashMap;
use futures::channel::mpsc::Sender;
@@ -40,7 +40,7 @@ struct Port {
}
pub async fn start(mut toreturn: Sender<Configuration>, config: Arc<Configuration>) {
// println!("{:?}", config);
println!("{:?}", config);
let upstreams = UpstreamsDashMap::new();
let prev_upstreams = UpstreamsDashMap::new();
loop {
@@ -57,31 +57,37 @@ pub async fn start(mut toreturn: Sender<Configuration>, config: Arc<Configuratio
if end > 0 {
num = rand::rng().random_range(0..end);
}
let server = servers.get(num).unwrap().to_string();
if let Some(svc) = kuber.services {
for i in svc {
let path = i.path.unwrap_or("/".to_string());
let url = format!("https://{}/api/v1/namespaces/staging/endpoints/{}", server, i.real);
let list = get_by_http(&*url, &*token, &*path).await;
// println!("{:?}", list);
let header_list = DashMap::new();
let mut hl = Vec::new();
if let Some(headers) = &i.headers {
for header in headers {
if let Some((key, val)) = header.split_once(':') {
hl.push((key.trim().to_string(), val.trim().to_string()));
}
}
}
header_list.insert(path.clone(), hl);
let url = format!("https://{}/api/v1/namespaces/staging/endpoints/{}", server, i.hostname);
let list = get_by_http(&*url, &*token, &i).await;
if let Some(list) = list {
match upstreams.get(&i.proxy.clone()) {
match upstreams.get(&i.upstream.clone()) {
Some(foo) => {
for (k, v) in list {
foo.value().insert(k, v);
}
}
None => {
upstreams.insert(i.proxy.clone(), list);
upstreams.insert(i.upstream.clone(), list);
}
};
}
}
}
}
// print_upstreams(&upstreams);
if !compare_dashmaps(&upstreams, &prev_upstreams) {
let tosend: Configuration = Configuration {
@@ -92,7 +98,6 @@ pub async fn start(mut toreturn: Sender<Configuration>, config: Arc<Configuratio
typecfg: config.typecfg.clone(),
extraparams: config.extraparams.clone(),
};
clone_dashmap_into(&upstreams, &prev_upstreams);
clone_dashmap_into(&upstreams, &tosend.upstreams);
print_upstreams(&tosend.upstreams);
@@ -102,7 +107,7 @@ pub async fn start(mut toreturn: Sender<Configuration>, config: Arc<Configuratio
}
}
pub async fn get_by_http(url: &str, token: &str, path: &str) -> Option<DashMap<String, (Vec<InnerMap>, AtomicUsize)>> {
pub async fn get_by_http(url: &str, token: &str, conf: &ServiceMapping) -> Option<DashMap<String, (Vec<InnerMap>, AtomicUsize)>> {
let client = Client::builder().timeout(Duration::from_secs(2)).danger_accept_invalid_certs(true).build().ok()?;
let resp = client.get(url).bearer_auth(token).send().await.ok()?;
@@ -114,7 +119,7 @@ pub async fn get_by_http(url: &str, token: &str, path: &str) -> Option<DashMap<S
let endpoints: Endpoints = resp.json().await.ok()?;
let upstreams: DashMap<String, (Vec<InnerMap>, AtomicUsize)> = DashMap::new();
// println!(" ===> {:?} : {:?}", conf.to_https.unwrap_or(false), conf.rate_limit);
if let Some(subsets) = endpoints.subsets {
for subset in subsets {
if let (Some(addresses), Some(ports)) = (subset.addresses, subset.ports) {
@@ -126,14 +131,21 @@ pub async fn get_by_http(url: &str, token: &str, path: &str) -> Option<DashMap<S
port: port.port.clone(),
is_ssl: false,
is_http2: false,
to_https: false,
rate_limit: None,
to_https: conf.to_https.unwrap_or(false),
rate_limit: conf.rate_limit,
healthcheck: None,
};
inner_vec.push(to_add);
}
}
upstreams.insert(path.to_string(), (inner_vec, AtomicUsize::new(0)));
match conf.path {
Some(ref p) => {
upstreams.insert(p.to_string(), (inner_vec, AtomicUsize::new(0)));
}
None => {
upstreams.insert("/".to_string(), (inner_vec, AtomicUsize::new(0)));
}
}
}
}
}

View File

@@ -10,11 +10,16 @@ pub type Headers = DashMap<String, DashMap<String, Vec<(String, String)>>>;
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct ServiceMapping {
pub proxy: String,
pub real: String,
pub upstream: String,
pub hostname: String,
pub path: Option<String>,
pub to_https: Option<bool>,
pub rate_limit: Option<isize>,
pub headers: Option<Vec<String>>,
}
// pub type Services = DashMap<String, Vec<(String, Option<String>)>>;
#[derive(Clone, Debug, Default)]
pub struct Extraparams {
pub sticky_sessions: bool,

View File

@@ -29,8 +29,13 @@ pub fn print_upstreams(upstreams: &UpstreamsDashMap) {
println!(" Path: {}", path);
for f in path_entry.value().0.clone() {
println!(
" IP: {}, Port: {}, SSL: {}, H2: {}, To HTTPS: {}",
f.address, f.port, f.is_ssl, f.is_http2, f.to_https
" IP: {}, Port: {}, SSL: {}, H2: {}, To HTTPS: {}, Rate Limit: {}",
f.address,
f.port,
f.is_ssl,
f.is_http2,
f.to_https,
f.rate_limit.unwrap_or(0)
);
}
}