This commit is contained in:
Ara Sadoyan
2026-03-26 14:17:59 +01:00
parent 1f8efc6af7
commit 4706b281bc
4 changed files with 20 additions and 31 deletions

View File

@@ -1,5 +1,5 @@
use crate::utils::kuberconsul::{match_path, ConsulService, KubeEndpoints}; use crate::utils::kuberconsul::{match_path, ConsulService, KubeEndpoints};
use crate::utils::structs::{InnerMap, ServiceMapping}; use crate::utils::structs::{GlobalServiceMapping, InnerMap};
use axum::http::{HeaderMap, HeaderValue}; use axum::http::{HeaderMap, HeaderValue};
use dashmap::DashMap; use dashmap::DashMap;
use reqwest::Client; use reqwest::Client;
@@ -7,7 +7,7 @@ use std::sync::atomic::AtomicUsize;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
pub async fn for_consul(url: String, token: Option<String>, conf: &ServiceMapping) -> Option<DashMap<Arc<str>, (Vec<Arc<InnerMap>>, AtomicUsize)>> { pub async fn for_consul(url: String, token: Option<String>, conf: &GlobalServiceMapping) -> Option<DashMap<Arc<str>, (Vec<Arc<InnerMap>>, AtomicUsize)>> {
let client = Client::builder().timeout(Duration::from_secs(2)).danger_accept_invalid_certs(true).build().ok()?; let client = Client::builder().timeout(Duration::from_secs(2)).danger_accept_invalid_certs(true).build().ok()?;
let mut headers = HeaderMap::new(); let mut headers = HeaderMap::new();
if let Some(token) = token { if let Some(token) = token {
@@ -27,7 +27,7 @@ pub async fn for_consul(url: String, token: Option<String>, conf: &ServiceMappin
// let prt = subsets.tagged_addresses.get("lan_ipv4").unwrap().port.clone(); // let prt = subsets.tagged_addresses.get("lan_ipv4").unwrap().port.clone();
let addr = subsets.tagged_addresses.get("lan_ipv4").unwrap().address.clone(); let addr = subsets.tagged_addresses.get("lan_ipv4").unwrap().address.clone();
let prt = subsets.tagged_addresses.get("lan_ipv4").unwrap().port.clone(); let prt = subsets.tagged_addresses.get("lan_ipv4").unwrap().port.clone();
let redirect_link = conf.redirect_to.as_ref().map(|www| Arc::from(www.as_str())); // let redirect_link = conf.redirect_to.as_ref().map(|www| Arc::from(www.as_str()));
let to_add = Arc::from(InnerMap { let to_add = Arc::from(InnerMap {
address: Arc::from(&*addr), address: Arc::from(&*addr),
port: prt, port: prt,
@@ -35,7 +35,7 @@ pub async fn for_consul(url: String, token: Option<String>, conf: &ServiceMappin
is_http2: false, is_http2: false,
to_https: conf.to_https.unwrap_or(false), to_https: conf.to_https.unwrap_or(false),
rate_limit: conf.rate_limit, rate_limit: conf.rate_limit,
redirect_to: redirect_link, redirect_to: None,
healthcheck: None, healthcheck: None,
authorization: None, authorization: None,
}); });
@@ -45,7 +45,7 @@ pub async fn for_consul(url: String, token: Option<String>, conf: &ServiceMappin
Some(upstreams) Some(upstreams)
} }
pub async fn for_kuber(url: &str, token: &str, conf: &ServiceMapping) -> Option<DashMap<Arc<str>, (Vec<Arc<InnerMap>>, AtomicUsize)>> { pub async fn for_kuber(url: &str, token: &str, conf: &GlobalServiceMapping) -> Option<DashMap<Arc<str>, (Vec<Arc<InnerMap>>, AtomicUsize)>> {
let to = Duration::from_secs(10); let to = Duration::from_secs(10);
let client = Client::builder().timeout(Duration::from_secs(10)).danger_accept_invalid_certs(true).build().ok()?; let client = Client::builder().timeout(Duration::from_secs(10)).danger_accept_invalid_certs(true).build().ok()?;
let resp = client.get(url).timeout(to).bearer_auth(token).send().await.ok()?; let resp = client.get(url).timeout(to).bearer_auth(token).send().await.ok()?;
@@ -63,7 +63,7 @@ pub async fn for_kuber(url: &str, token: &str, conf: &ServiceMapping) -> Option<
let mut inner_vec = Vec::new(); let mut inner_vec = Vec::new();
for addr in addresses { for addr in addresses {
for port in &ports { for port in &ports {
let redirect_link = conf.redirect_to.as_ref().map(|www| Arc::from(www.as_str())); // let redirect_link = conf.redirect_to.as_ref().map(|www| Arc::from(www.as_str()));
let to_add = Arc::from(InnerMap { let to_add = Arc::from(InnerMap {
address: Arc::from(addr.ip.clone()), address: Arc::from(addr.ip.clone()),
port: port.port.clone(), port: port.port.clone(),
@@ -72,7 +72,7 @@ pub async fn for_kuber(url: &str, token: &str, conf: &ServiceMapping) -> Option<
to_https: conf.to_https.unwrap_or(false), to_https: conf.to_https.unwrap_or(false),
rate_limit: conf.rate_limit, rate_limit: conf.rate_limit,
healthcheck: None, healthcheck: None,
redirect_to: redirect_link, redirect_to: None,
authorization: None, authorization: None,
}); });
inner_vec.push(to_add); inner_vec.push(to_add);

View File

@@ -1,6 +1,6 @@
use crate::utils::httpclient; use crate::utils::httpclient;
use crate::utils::parceyaml::build_headers; use crate::utils::parceyaml::build_headers;
use crate::utils::structs::{Configuration, InnerMap, ServiceMapping, UpstreamsDashMap}; use crate::utils::structs::{Configuration, GlobalServiceMapping, InnerMap, UpstreamsDashMap};
use crate::utils::tools::{clone_dashmap_into, compare_dashmaps, print_upstreams}; use crate::utils::tools::{clone_dashmap_into, compare_dashmaps, print_upstreams};
use async_trait::async_trait; use async_trait::async_trait;
use dashmap::DashMap; use dashmap::DashMap;
@@ -52,7 +52,7 @@ pub struct ConsulTaggedAddress {
#[serde(rename = "Port")] #[serde(rename = "Port")]
pub port: u16, pub port: u16,
} }
pub fn list_to_upstreams(lt: Option<DashMap<Arc<str>, (Vec<Arc<InnerMap>>, AtomicUsize)>>, upstreams: &UpstreamsDashMap, i: &ServiceMapping) { pub fn list_to_upstreams(lt: Option<DashMap<Arc<str>, (Vec<Arc<InnerMap>>, AtomicUsize)>>, upstreams: &UpstreamsDashMap, i: &GlobalServiceMapping) {
if let Some(list) = lt { if let Some(list) = lt {
match upstreams.get(&*i.hostname.clone()) { match upstreams.get(&*i.hostname.clone()) {
Some(upstr) => { Some(upstr) => {
@@ -67,7 +67,7 @@ pub fn list_to_upstreams(lt: Option<DashMap<Arc<str>, (Vec<Arc<InnerMap>>, Atomi
} }
} }
pub fn match_path(conf: &ServiceMapping, upstreams: &DashMap<Arc<str>, (Vec<Arc<InnerMap>>, AtomicUsize)>, values: Vec<Arc<InnerMap>>) { pub fn match_path(conf: &GlobalServiceMapping, upstreams: &DashMap<Arc<str>, (Vec<Arc<InnerMap>>, AtomicUsize)>, values: Vec<Arc<InnerMap>>) {
match conf.path { match conf.path {
Some(ref p) => { Some(ref p) => {
upstreams.insert(Arc::from(p.clone()), (values, AtomicUsize::new(0))); upstreams.insert(Arc::from(p.clone()), (values, AtomicUsize::new(0)));

View File

@@ -18,14 +18,13 @@ pub struct Extraparams {
} }
#[derive(Debug, Default, Clone, Serialize, Deserialize)] #[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct ServiceMapping { pub struct GlobalServiceMapping {
pub upstream: String, pub upstream: String,
pub hostname: String, pub hostname: String,
pub path: Option<String>, pub path: Option<String>,
pub to_https: Option<bool>, pub to_https: Option<bool>,
pub sticky_sessions: Option<bool>, pub sticky_sessions: Option<bool>,
pub rate_limit: Option<isize>, pub rate_limit: Option<isize>,
pub redirect_to: Option<String>,
pub client_headers: Option<Vec<String>>, pub client_headers: Option<Vec<String>>,
pub server_headers: Option<Vec<String>>, pub server_headers: Option<Vec<String>>,
} }
@@ -33,14 +32,14 @@ pub struct ServiceMapping {
#[derive(Clone, Default, Debug, Serialize, Deserialize)] #[derive(Clone, Default, Debug, Serialize, Deserialize)]
pub struct Kubernetes { pub struct Kubernetes {
pub servers: Option<Vec<String>>, pub servers: Option<Vec<String>>,
pub services: Option<Vec<ServiceMapping>>, pub services: Option<Vec<GlobalServiceMapping>>,
pub tokenpath: Option<String>, pub tokenpath: Option<String>,
} }
#[derive(Clone, Default, Debug, Serialize, Deserialize)] #[derive(Clone, Default, Debug, Serialize, Deserialize)]
pub struct Consul { pub struct Consul {
pub servers: Option<Vec<String>>, pub servers: Option<Vec<String>>,
pub services: Option<Vec<ServiceMapping>>, pub services: Option<Vec<GlobalServiceMapping>>,
pub token: Option<String>, pub token: Option<String>,
} }
#[derive(Debug, Default, Serialize, Deserialize)] #[derive(Debug, Default, Serialize, Deserialize)]

View File

@@ -43,7 +43,6 @@ pub struct LB {
pub struct Context { pub struct Context {
backend_id: Option<String>, backend_id: Option<String>,
// to_https: bool,
sticky_sessions: bool, sticky_sessions: bool,
redirect_to: Option<String>, redirect_to: Option<String>,
start_time: Instant, start_time: Instant,
@@ -59,7 +58,6 @@ impl ProxyHttp for LB {
fn new_ctx(&self) -> Self::CTX { fn new_ctx(&self) -> Self::CTX {
Context { Context {
backend_id: None, backend_id: None,
// to_https: false,
sticky_sessions: false, sticky_sessions: false,
redirect_to: None, redirect_to: None,
start_time: Instant::now(), start_time: Instant::now(),
@@ -163,13 +161,13 @@ impl ProxyHttp for LB {
peer.options.tcp_recv_buf = Some(128 * 1024); peer.options.tcp_recv_buf = Some(128 * 1024);
End of experimental options End of experimental options
*/ */
if let Some(redirect_to) = &innermap.redirect_to { if let Some(redirect_to) = &innermap.redirect_to {
let uri = session.req_header().uri.path(); let uri = session.req_header().uri.path();
let capacity = redirect_to.len() + uri.len(); let capacity = redirect_to.len() + uri.len();
let mut s = String::with_capacity(capacity); let mut s = String::with_capacity(capacity);
s.push_str(redirect_to); s.push_str(redirect_to);
s.push_str(uri); s.push_str(uri);
// ctx.to_https = true;
ctx.redirect_to = Some(s); ctx.redirect_to = Some(s);
} }
@@ -185,7 +183,6 @@ impl ProxyHttp for LB {
s.push_str(host); s.push_str(host);
s.push_str(port.as_str()); s.push_str(port.as_str());
s.push_str(uri); s.push_str(uri);
// ctx.to_https = true;
ctx.redirect_to = Some(s); ctx.redirect_to = Some(s);
} }
} }
@@ -271,7 +268,7 @@ impl ProxyHttp for LB {
} }
Ok(()) Ok(())
} }
async fn response_filter(&self, session: &mut Session, _upstream_response: &mut ResponseHeader, ctx: &mut Self::CTX) -> Result<()> { async fn response_filter(&self, _session: &mut Session, _upstream_response: &mut ResponseHeader, ctx: &mut Self::CTX) -> Result<()> {
if ctx.sticky_sessions { if ctx.sticky_sessions {
if let Some(bid) = &ctx.backend_id { if let Some(bid) = &ctx.backend_id {
let tt = if let Some(existing) = REVERSE_STORE.get(bid) { let tt = if let Some(existing) = REVERSE_STORE.get(bid) {
@@ -290,20 +287,13 @@ impl ProxyHttp for LB {
} }
} }
if let Some(_) = &ctx.redirect_to { if let Some(redirect_to) = &ctx.redirect_to {
let mut redirect_response = ResponseHeader::build(StatusCode::MOVED_PERMANENTLY, None)?; *_upstream_response = ResponseHeader::build(StatusCode::MOVED_PERMANENTLY, None)?;
redirect_response.insert_header("Location", ctx.redirect_to.clone().unwrap_or(String::from("/")))?; _upstream_response.insert_header("Location", redirect_to)?;
redirect_response.insert_header("Content-Length", "0")?; _upstream_response.insert_header("Content-Length", "0")?;
session.write_response_header(Box::new(redirect_response), false).await?; return Ok(());
} }
// if ctx.to_https {
// let mut redirect_response = ResponseHeader::build(StatusCode::MOVED_PERMANENTLY, None)?;
// redirect_response.insert_header("Location", ctx.redirect_to.clone().unwrap_or(String::from("/")))?;
// redirect_response.insert_header("Content-Length", "0")?;
// session.write_response_header(Box::new(redirect_response), false).await?;
// }
// ALLOCATIONS ! // ALLOCATIONS !
if let Some(client_headers) = &ctx.client_headers { if let Some(client_headers) = &ctx.client_headers {
for (k, v) in client_headers.iter() { for (k, v) in client_headers.iter() {