From ed44516015a8c5ff22fd4e13bfd3a1d7336c9fd5 Mon Sep 17 00:00:00 2001 From: Ara Sadoyan Date: Tue, 24 Mar 2026 16:08:14 +0100 Subject: [PATCH] added redirect_to directive for upstreams --- src/main.rs | 1 + src/utils/healthcheck.rs | 1 + src/utils/httpclient.rs | 4 ++ src/utils/parceyaml.rs | 20 ++++++++-- src/utils/structs.rs | 7 +++- src/utils/tools.rs | 16 +++++++- src/web/proxyhttp.rs | 86 ++++++++++++++++++++++------------------ 7 files changed, 90 insertions(+), 45 deletions(-) diff --git a/src/main.rs b/src/main.rs index fa8fea4..42c16d7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,6 +3,7 @@ mod web; #[global_allocator] static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; +// pub static A: CountingAllocator = CountingAllocator; fn main() { web::start::run(); diff --git a/src/utils/healthcheck.rs b/src/utils/healthcheck.rs index 2761730..16cdcdf 100644 --- a/src/utils/healthcheck.rs +++ b/src/utils/healthcheck.rs @@ -70,6 +70,7 @@ async fn build_upstreams(fullist: &UpstreamsDashMap, method: &str, client: &Clie to_https: upstream.to_https, rate_limit: upstream.rate_limit, healthcheck: upstream.healthcheck, + redirect_to: upstream.redirect_to.clone(), authorization: upstream.authorization.clone(), }; diff --git a/src/utils/httpclient.rs b/src/utils/httpclient.rs index 519d998..9b00f06 100644 --- a/src/utils/httpclient.rs +++ b/src/utils/httpclient.rs @@ -27,6 +27,7 @@ pub async fn for_consul(url: String, token: Option, conf: &ServiceMappin // let prt = subsets.tagged_addresses.get("lan_ipv4").unwrap().port.clone(); let addr = subsets.tagged_addresses.get("lan_ipv4").unwrap().address.clone(); let prt = subsets.tagged_addresses.get("lan_ipv4").unwrap().port.clone(); + let redirect_link = conf.redirect_to.as_ref().map(|www| Arc::from(www.as_str())); let to_add = Arc::from(InnerMap { address: Arc::from(&*addr), port: prt, @@ -34,6 +35,7 @@ pub async fn for_consul(url: String, token: Option, conf: &ServiceMappin is_http2: false, to_https: conf.to_https.unwrap_or(false), rate_limit: conf.rate_limit, + redirect_to: redirect_link, healthcheck: None, authorization: None, }); @@ -61,6 +63,7 @@ pub async fn for_kuber(url: &str, token: &str, conf: &ServiceMapping) -> Option< let mut inner_vec = Vec::new(); for addr in addresses { for port in &ports { + let redirect_link = conf.redirect_to.as_ref().map(|www| Arc::from(www.as_str())); let to_add = Arc::from(InnerMap { address: Arc::from(addr.ip.clone()), port: port.port.clone(), @@ -69,6 +72,7 @@ pub async fn for_kuber(url: &str, token: &str, conf: &ServiceMapping) -> Option< to_https: conf.to_https.unwrap_or(false), rate_limit: conf.rate_limit, healthcheck: None, + redirect_to: redirect_link, authorization: None, }); inner_vec.push(to_add); diff --git a/src/utils/parceyaml.rs b/src/utils/parceyaml.rs index 4a5b650..1a6cd29 100644 --- a/src/utils/parceyaml.rs +++ b/src/utils/parceyaml.rs @@ -135,7 +135,7 @@ async fn populate_file_upstreams(config: &mut Configuration, parsed: &Config) { }; path_auth = Some(Arc::from(y)); } - + let redirect_link = path_config.redirect_to.as_ref().map(|www| Arc::from(www.as_str())); if let Some((ip, port_str)) = server.split_once(':') { if let Ok(port) = port_str.parse::() { server_list.push(Arc::from(InnerMap { @@ -146,6 +146,7 @@ async fn populate_file_upstreams(config: &mut Configuration, parsed: &Config) { to_https: path_config.to_https.unwrap_or(false), rate_limit: path_config.rate_limit, healthcheck: path_config.healthcheck, + redirect_to: redirect_link, authorization: path_auth, })); } @@ -185,13 +186,24 @@ pub fn parce_main_config(path: &str) -> AppConfig { cfo.local_server = Option::from((ip.to_string(), port)); } } + // if let Some(tlsport_cfg) = cfo.proxy_address_tls.clone() { + // if let Some((_, port_str)) = tlsport_cfg.split_once(':') { + // if let Ok(port) = port_str.parse::() { + // cfo.proxy_port_tls = Some(port); + // } + // } + // }; + if let Some(tlsport_cfg) = cfo.proxy_address_tls.clone() { if let Some((_, port_str)) = tlsport_cfg.split_once(':') { - if let Ok(port) = port_str.parse::() { - cfo.proxy_port_tls = Some(port); - } + cfo.proxy_port_tls = Some(port_str.to_string()); } }; + + if let Some((_, port_str)) = cfo.proxy_address_http.split_once(':') { + cfo.proxy_port = Some(port_str.to_string()); + } + cfo.proxy_tls_grade = parce_tls_grades(cfo.proxy_tls_grade.clone()); cfo } diff --git a/src/utils/structs.rs b/src/utils/structs.rs index 73bbf7e..67bf329 100644 --- a/src/utils/structs.rs +++ b/src/utils/structs.rs @@ -25,6 +25,7 @@ pub struct ServiceMapping { pub to_https: Option, pub sticky_sessions: Option, pub rate_limit: Option, + pub redirect_to: Option, pub client_headers: Option>, pub server_headers: Option>, } @@ -86,6 +87,7 @@ pub struct PathConfig { pub server_headers: Option>, pub rate_limit: Option, pub healthcheck: Option, + pub redirect_to: Option, pub authorization: Option, } #[derive(Debug, Default)] @@ -113,7 +115,8 @@ pub struct AppConfig { pub config_tls_certificate: Option, pub config_tls_key_file: Option, pub proxy_address_tls: Option, - pub proxy_port_tls: Option, + pub proxy_port_tls: Option, + pub proxy_port: Option, pub local_server: Option<(String, u16)>, pub proxy_certificates: Option, pub proxy_tls_grade: Option, @@ -138,6 +141,7 @@ pub struct InnerMap { pub to_https: bool, pub rate_limit: Option, pub healthcheck: Option, + pub redirect_to: Option>, pub authorization: Option>, } @@ -152,6 +156,7 @@ impl InnerMap { to_https: Default::default(), rate_limit: Default::default(), healthcheck: Default::default(), + redirect_to: Default::default(), authorization: Default::default(), } } diff --git a/src/utils/tools.rs b/src/utils/tools.rs index dbc7a7a..d43669a 100644 --- a/src/utils/tools.rs +++ b/src/utils/tools.rs @@ -194,6 +194,7 @@ pub fn clone_idmap_into(original: &UpstreamsDashMap, cloned: &UpstreamsIdMap) { to_https: false, rate_limit: None, healthcheck: None, + redirect_to: None, authorization: None, }; cloned.insert(id, Arc::from(to_add)); @@ -203,7 +204,7 @@ pub fn clone_idmap_into(original: &UpstreamsDashMap, cloned: &UpstreamsIdMap) { new_inner_map.insert(path.clone(), new_vec); } } - info!("Upstreams are fully populated, ready to server requests"); + info!("Upstreams are fully populated. Ready to server requests"); } pub fn listdir(dir: String) -> Vec { @@ -382,3 +383,16 @@ pub fn upstreams_liveness_json(configured: &UpstreamsDashMap, current: &Upstream } Value::Object(result) } + +#[allow(dead_code)] +pub fn prepend(prefix: &str, val: &Option>, uri: &str, port: &str) -> Option { + val.as_ref().map(|s| { + let mut buf = String::with_capacity(32); + buf.push_str(prefix); + buf.push_str(s); + buf.push_str(":"); + buf.push_str(port); + buf.push_str(uri); + buf + }) +} diff --git a/src/web/proxyhttp.rs b/src/web/proxyhttp.rs index b69067a..49e06fb 100644 --- a/src/web/proxyhttp.rs +++ b/src/web/proxyhttp.rs @@ -1,6 +1,7 @@ use crate::utils::auth::authenticate; use crate::utils::metrics::*; use crate::utils::structs::{AppConfig, Extraparams, Headers, InnerMap, UpstreamsDashMap, UpstreamsIdMap}; +// use std::collections::BTreeMap; use crate::web::gethosts::{GetHost, GetHostsReturHeaders}; use arc_swap::ArcSwap; use async_trait::async_trait; @@ -20,7 +21,6 @@ use pingora_limits::rate::Rate; use pingora_proxy::{ProxyHttp, Session}; use sha2::{Digest, Sha256}; use std::cell::RefCell; -// use std::collections::BTreeMap; use std::fmt::Write; use std::sync::Arc; use std::time::Duration; @@ -43,7 +43,7 @@ pub struct LB { pub struct Context { backend_id: Option, - to_https: bool, + // to_https: bool, sticky_sessions: bool, redirect_to: Option, start_time: Instant, @@ -59,7 +59,7 @@ impl ProxyHttp for LB { fn new_ctx(&self) -> Self::CTX { Context { backend_id: None, - to_https: false, + // to_https: false, sticky_sessions: false, redirect_to: None, start_time: Instant::now(), @@ -70,7 +70,6 @@ impl ProxyHttp for LB { } } async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result { - // let ep = _ctx.extraparams.as_ref(); if let Some(auth) = &_ctx.extraparams.authentication { let authenticated = authenticate(&auth.auth_type, &auth.auth_cred, &session); if !authenticated { @@ -150,29 +149,43 @@ impl ProxyHttp for LB { peer.options.verify_cert = false; peer.options.verify_hostname = false; } + /* + Experimental optionsv + The following TCP optimizations were tested but caused performance degrade under heavy load: + peer.options.tcp_keepalive = Some(TcpKeepalive { + idle: Duration::from_secs(60), + interval: Duration::from_secs(10), + count: 5, + user_timeout: Duration::from_secs(30), + }); - // Experimental optionsv - // The following TCP optimizations were tested but caused performance degrade under heavy load: - // peer.options.tcp_keepalive = Some(TcpKeepalive { - // idle: Duration::from_secs(60), - // interval: Duration::from_secs(10), - // count: 5, - // user_timeout: Duration::from_secs(30), - // }); - // - // peer.options.idle_timeout = Some(Duration::from_secs(300)); - // peer.options.tcp_recv_buf = Some(128 * 1024); - // End of experimental options + peer.options.idle_timeout = Some(Duration::from_secs(300)); + peer.options.tcp_recv_buf = Some(128 * 1024); + End of experimental options + */ + if let Some(redirect_to) = &innermap.redirect_to { + let uri = session.req_header().uri.path(); + let capacity = redirect_to.len() + uri.len(); + let mut s = String::with_capacity(capacity); + s.push_str(redirect_to); + s.push_str(uri); + // ctx.to_https = true; + ctx.redirect_to = Some(s); + } if ctx.extraparams.to_https.unwrap_or(false) || innermap.to_https { if let Some(stream) = session.stream() { if stream.get_ssl().is_none() { if let Some(host) = ctx.hostname.as_ref() { - let uri = session.req_header().uri.path_and_query().map_or("/", |pq| pq.as_str()); - let port = self.config.proxy_port_tls.unwrap_or(443); - ctx.to_https = true; - let mut s = String::with_capacity(64); - write!(&mut s, "https://{}:{}{}", host, port, uri).unwrap_or_default(); + let port = self.config.proxy_port_tls.clone().unwrap_or_else(|| "443".to_string()); + let uri = session.req_header().uri.path(); + let capacity = host.len() + uri.len() + 8; + let mut s = String::with_capacity(capacity); + s.push_str("https://"); + s.push_str(host); + s.push_str(port.as_str()); + s.push_str(uri); + // ctx.to_https = true; ctx.redirect_to = Some(s); } } @@ -181,7 +194,6 @@ impl ProxyHttp for LB { if ctx.extraparams.sticky_sessions { let mut s = String::with_capacity(64); - // write!(&mut s, "{}:{}:{}:{}", hostname, innermap.address, innermap.port, innermap.is_ssl).unwrap(); write!( &mut s, "{}:{}:{}:{}:{}:{}:{}:{:?}", @@ -195,18 +207,6 @@ impl ProxyHttp for LB { innermap.authorization ) .unwrap_or(()); - // println!( - // "{}:{}:{}:{}:{}:{}:{}:{:?}", - // hostname, - // innermap.address, - // innermap.port, - // innermap.is_http2, - // innermap.to_https, - // innermap.rate_limit.unwrap_or_default(), - // innermap.healthcheck.unwrap_or_default(), - // innermap.authorization - // ); - ctx.backend_id = Some(s); ctx.sticky_sessions = true; } @@ -241,18 +241,19 @@ impl ProxyHttp for LB { } async fn upstream_request_filter(&self, session: &mut Session, upstream_request: &mut RequestHeader, ctx: &mut Self::CTX) -> Result<()> { - if let Some(hostname) = ctx.hostname.as_deref() { - upstream_request.insert_header("Host", hostname)?; - } + // if let Some(hostname) = ctx.hostname.as_deref() { + // upstream_request.insert_header("Host", hostname)?; + // } if let Some(client_ip) = session.client_addr() { IP_BUFFER.with(|buffer| { let mut buf = buffer.borrow_mut(); buf.clear(); write!(buf, "{}", client_ip).unwrap_or(()); - upstream_request.append_header("x-forward-for", buf.as_str()).unwrap_or(false); + upstream_request.append_header("X-Forward-For", buf.as_str()).unwrap_or(false); }); } + let hostname = ctx.hostname.as_deref().unwrap_or("localhost"); let path = session.req_header().uri.path(); let GetHostsReturHeaders { server_headers, client_headers } = match self.get_header(hostname, path) { @@ -289,13 +290,20 @@ impl ProxyHttp for LB { } } - if ctx.to_https { + if let Some(_) = &ctx.redirect_to { let mut redirect_response = ResponseHeader::build(StatusCode::MOVED_PERMANENTLY, None)?; redirect_response.insert_header("Location", ctx.redirect_to.clone().unwrap_or(String::from("/")))?; redirect_response.insert_header("Content-Length", "0")?; session.write_response_header(Box::new(redirect_response), false).await?; } + // if ctx.to_https { + // let mut redirect_response = ResponseHeader::build(StatusCode::MOVED_PERMANENTLY, None)?; + // redirect_response.insert_header("Location", ctx.redirect_to.clone().unwrap_or(String::from("/")))?; + // redirect_response.insert_header("Content-Length", "0")?; + // session.write_response_header(Box::new(redirect_response), false).await?; + // } + // ALLOCATIONS ! if let Some(client_headers) = &ctx.client_headers { for (k, v) in client_headers.iter() {