6 Commits

Author SHA1 Message Date
Ara Sadoyan
0fc79c022f perf: optimize header handling and concurrent access patterns 2025-12-10 19:09:04 +01:00
Ara Sadoyan
a43bccdfb8 minor, performance improvements 2025-11-28 13:13:15 +01:00
Ara Sadoyan
5b87391fbb some more type changes, performance improvements 2025-11-27 18:47:04 +01:00
Ara Sadoyan
c68a4ad83d Type changes, performance improvements 2025-11-27 18:03:34 +01:00
Ara Sadoyan
8ba8d32df1 Performance improvements, type changes 2025-11-26 12:12:41 +01:00
Ara Sadoyan
7a839065e6 update on kubernetes web client 2025-11-24 17:57:44 +01:00
8 changed files with 157 additions and 172 deletions

View File

@@ -46,7 +46,7 @@ async fn build_upstreams(fullist: &UpstreamsDashMap, method: &str, client: &Clie
let mut innervec = Vec::new(); let mut innervec = Vec::new();
for (_, upstream) in path_entry.value().0.iter().enumerate() { for (_, upstream) in path_entry.value().0.iter().enumerate() {
let tls = detect_tls(upstream.address.as_str(), &upstream.port, &client).await; let tls = detect_tls(&upstream.address.to_string(), &upstream.port, &client).await;
let is_h2 = matches!(tls.1, Some(Version::HTTP_2)); let is_h2 = matches!(tls.1, Some(Version::HTTP_2));
let link = if tls.0 { let link = if tls.0 {

View File

@@ -22,7 +22,9 @@ pub async fn for_consul(url: String, token: Option<String>, conf: &ServiceMappin
let upstreams: DashMap<String, (Vec<InnerMap>, AtomicUsize)> = DashMap::new(); let upstreams: DashMap<String, (Vec<InnerMap>, AtomicUsize)> = DashMap::new();
let endpoints: Vec<ConsulService> = resp.json().await.ok()?; let endpoints: Vec<ConsulService> = resp.json().await.ok()?;
for subsets in endpoints { for subsets in endpoints {
let addr = subsets.tagged_addresses.get("lan_ipv4").unwrap().address.clone(); // let addr = subsets.tagged_addresses.get("lan_ipv4").unwrap().address.clone();
// let prt = subsets.tagged_addresses.get("lan_ipv4").unwrap().port.clone();
let addr = subsets.tagged_addresses.get("lan_ipv4").unwrap().address.clone().parse().unwrap();
let prt = subsets.tagged_addresses.get("lan_ipv4").unwrap().port.clone(); let prt = subsets.tagged_addresses.get("lan_ipv4").unwrap().port.clone();
let to_add = InnerMap { let to_add = InnerMap {
address: addr, address: addr,
@@ -56,7 +58,7 @@ pub async fn for_kuber(url: &str, token: &str, conf: &ServiceMapping) -> Option<
for addr in addresses { for addr in addresses {
for port in &ports { for port in &ports {
let to_add = InnerMap { let to_add = InnerMap {
address: addr.ip.clone(), address: addr.ip.parse().unwrap(),
port: port.port.clone(), port: port.port.clone(),
is_ssl: false, is_ssl: false,
is_http2: false, is_http2: false,

View File

@@ -11,6 +11,8 @@ use rand::Rng;
use serde::Deserialize; use serde::Deserialize;
use std::collections::HashMap; use std::collections::HashMap;
use std::env; use std::env;
use std::fs;
use std::path::Path;
use std::sync::atomic::AtomicUsize; use std::sync::atomic::AtomicUsize;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
@@ -106,23 +108,31 @@ impl ServiceDiscovery for KubernetesDiscovery {
let num = if end > 0 { rand::rng().random_range(0..end) } else { 0 }; let num = if end > 0 { rand::rng().random_range(0..end) } else { 0 };
let server = servers.get(num).unwrap().to_string(); let server = servers.get(num).unwrap().to_string();
let path = kuber.tokenpath.unwrap_or("/var/run/secrets/kubernetes.io/serviceaccount/token".to_string()); let path = kuber.tokenpath.unwrap_or("/var/run/secrets/kubernetes.io/serviceaccount/token".to_string());
let namespace = get_current_namespace().unwrap_or_else(|| "default".to_string());
let token = read_token(path.as_str()).await; let token = read_token(path.as_str()).await;
// let mut oldcrt: HashMap<String, String> = HashMap::new();
loop { loop {
// crate::utils::watchksecret::watch_secret("ar-tls", "staging", server.clone(), token.clone(), &mut oldcrt).await;
let upstreams = UpstreamsDashMap::new(); let upstreams = UpstreamsDashMap::new();
if let Some(kuber) = config.kubernetes.clone() { if let Some(kuber) = config.kubernetes.clone() {
if let Some(svc) = kuber.services { if let Some(svc) = kuber.services {
for i in svc { for i in svc {
let header_list = DashMap::new(); let header_list: DashMap<Arc<str>, Vec<(Arc<str>, Arc<str>)>> = DashMap::new();
let mut hl = Vec::new(); let mut hl = Vec::new();
build_headers(&i.client_headers, config.as_ref(), &mut hl); build_headers(&i.client_headers, config.as_ref(), &mut hl);
if !hl.is_empty() { if !hl.is_empty() {
header_list.insert(i.path.clone().unwrap_or("/".to_string()), hl); match i.path.clone() {
Some(path) => {
header_list.insert(Arc::from(path.as_str()), hl);
}
None => {
header_list.insert(Arc::from("/"), hl);
}
}
// header_list.insert(Arc::from(path.as_str()), hl);
// header_list.insert(Arc::from(i.path).unwrap_or(Arc::from("/")).as_str(), hl);
config.client_headers.insert(i.hostname.clone(), header_list); config.client_headers.insert(i.hostname.clone(), header_list);
} }
let url = format!("https://{}/api/v1/namespaces/staging/endpoints/{}", server, i.hostname); let url = format!("https://{}/api/v1/namespaces/{}/endpoints/{}", server, namespace, i.hostname);
let list = httpclient::for_kuber(&*url, &*token, &i).await; let list = httpclient::for_kuber(&*url, &*token, &i).await;
list_to_upstreams(list, &upstreams, &i); list_to_upstreams(list, &upstreams, &i);
} }
@@ -137,6 +147,16 @@ impl ServiceDiscovery for KubernetesDiscovery {
} }
} }
fn get_current_namespace() -> Option<String> {
let ns_path = "/var/run/secrets/kubernetes.io/serviceaccount/namespace";
if Path::new(ns_path).exists() {
if let Ok(contents) = fs::read_to_string(ns_path) {
return Some(contents.trim().to_string());
}
}
std::env::var("POD_NAMESPACE").ok()
}
#[async_trait] #[async_trait]
impl ServiceDiscovery for ConsulDiscovery { impl ServiceDiscovery for ConsulDiscovery {
async fn fetch_upstreams(&self, config: Arc<Configuration>, mut toreturn: Sender<Configuration>) { async fn fetch_upstreams(&self, config: Arc<Configuration>, mut toreturn: Sender<Configuration>) {
@@ -161,7 +181,15 @@ impl ServiceDiscovery for ConsulDiscovery {
let mut hl = Vec::new(); let mut hl = Vec::new();
build_headers(&i.client_headers, config.as_ref(), &mut hl); build_headers(&i.client_headers, config.as_ref(), &mut hl);
if !hl.is_empty() { if !hl.is_empty() {
header_list.insert(i.path.clone().unwrap_or("/".to_string()), hl); match i.path.clone() {
Some(path) => {
header_list.insert(Arc::from(path.as_str()), hl);
}
None => {
header_list.insert(Arc::from("/"), hl);
}
}
// header_list.insert(i.path.clone().unwrap_or("/".to_string()), hl);
config.client_headers.insert(i.hostname.clone(), header_list); config.client_headers.insert(i.hostname.clone(), header_list);
} }

View File

@@ -6,7 +6,7 @@ use dashmap::DashMap;
use log::{error, info, warn}; use log::{error, info, warn};
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::atomic::AtomicUsize; use std::sync::atomic::AtomicUsize;
// use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::Arc;
use std::{env, fs}; use std::{env, fs};
// use tokio::sync::oneshot::{Receiver, Sender}; // use tokio::sync::oneshot::{Receiver, Sender};
@@ -67,32 +67,31 @@ pub async fn load_configuration(d: &str, kind: &str) -> Option<Configuration> {
} }
async fn populate_headers_and_auth(config: &mut Configuration, parsed: &Config) { async fn populate_headers_and_auth(config: &mut Configuration, parsed: &Config) {
let mut ch = Vec::new(); let mut ch: Vec<(Arc<str>, Arc<str>)> = Vec::new();
ch.push(("Server".to_string(), "Aralez".to_string())); ch.push((Arc::from("Server"), Arc::from("Aralez")));
// println!("{:?}", &parsed.client_headers); // println!("{:?}", &parsed.client_headers);
if let Some(headers) = &parsed.client_headers { if let Some(headers) = &parsed.client_headers {
for header in headers { for header in headers {
if let Some((key, val)) = header.split_once(':') { if let Some((key, val)) = header.split_once(':') {
println!("{}:{}", key.trim().to_string(), val.trim().to_string()); ch.push((Arc::from(key), Arc::from(val)));
ch.push((key.trim().to_string(), val.trim().to_string()));
} }
} }
} }
let global_headers = DashMap::new(); let global_headers: DashMap<Arc<str>, Vec<(Arc<str>, Arc<str>)>> = DashMap::new();
global_headers.insert("/".to_string(), ch); global_headers.insert(Arc::from("/"), ch);
config.client_headers.insert("GLOBAL_CLIENT_HEADERS".to_string(), global_headers); config.client_headers.insert("GLOBAL_CLIENT_HEADERS".to_string(), global_headers);
let mut sh = Vec::new(); let mut sh: Vec<(Arc<str>, Arc<str>)> = Vec::new();
sh.push(("X-Proxy-Server".to_string(), "Aralez".to_string())); sh.push((Arc::from("X-Proxy-Server"), Arc::from("Aralez")));
if let Some(headers) = &parsed.server_headers { if let Some(headers) = &parsed.server_headers {
for header in headers { for header in headers {
if let Some((key, val)) = header.split_once(':') { if let Some((key, val)) = header.split_once(':') {
sh.push((key.trim().to_string(), val.trim().to_string())); sh.push((Arc::from(key.trim()), Arc::from(val.trim())));
} }
} }
} }
let server_global_headers = DashMap::new(); let server_global_headers: DashMap<Arc<str>, Vec<(Arc<str>, Arc<str>)>> = DashMap::new();
server_global_headers.insert("/".to_string(), sh); server_global_headers.insert(Arc::from("/"), sh);
config.server_headers.insert("GLOBAL_SERVER_HEADERS".to_string(), server_global_headers); config.server_headers.insert("GLOBAL_SERVER_HEADERS".to_string(), server_global_headers);
config.extraparams.sticky_sessions = parsed.sticky_sessions; config.extraparams.sticky_sessions = parsed.sticky_sessions;
@@ -124,19 +123,19 @@ async fn populate_file_upstreams(config: &mut Configuration, parsed: &Config) {
info!("Applied Rate Limit for {} : {} request per second", hostname, rate); info!("Applied Rate Limit for {} : {} request per second", hostname, rate);
} }
let mut hl: Vec<(String, String)> = Vec::new(); let mut hl: Vec<(Arc<str>, Arc<str>)> = Vec::new();
let mut sl: Vec<(String, String)> = Vec::new(); let mut sl: Vec<(Arc<str>, Arc<str>)> = Vec::new();
build_headers(&path_config.client_headers, config, &mut hl); build_headers(&path_config.client_headers, config, &mut hl);
build_headers(&path_config.server_headers, config, &mut sl); build_headers(&path_config.server_headers, config, &mut sl);
client_header_list.insert(path.clone(), hl); client_header_list.insert(Arc::from(path.as_str()), hl);
server_header_list.insert(path.clone(), sl); server_header_list.insert(Arc::from(path.as_str()), sl);
let mut server_list = Vec::new(); let mut server_list = Vec::new();
for server in &path_config.servers { for server in &path_config.servers {
if let Some((ip, port_str)) = server.split_once(':') { if let Some((ip, port_str)) = server.split_once(':') {
if let Ok(port) = port_str.parse::<u16>() { if let Ok(port) = port_str.parse::<u16>() {
server_list.push(InnerMap { server_list.push(InnerMap {
address: ip.trim().to_string(), address: ip.trim().parse().unwrap(),
port, port,
is_ssl: true, is_ssl: true,
is_http2: false, is_http2: false,
@@ -238,11 +237,11 @@ fn log_builder(conf: &AppConfig) {
env_logger::builder().init(); env_logger::builder().init();
} }
pub fn build_headers(path_config: &Option<Vec<String>>, _config: &Configuration, hl: &mut Vec<(String, String)>) { pub fn build_headers(path_config: &Option<Vec<String>>, _config: &Configuration, hl: &mut Vec<(Arc<str>, Arc<str>)>) {
if let Some(headers) = &path_config { if let Some(headers) = &path_config {
for header in headers { for header in headers {
if let Some((key, val)) = header.split_once(':') { if let Some((key, val)) = header.split_once(':') {
hl.push((key.trim().to_string(), val.trim().to_string())); hl.push((Arc::from(key.trim()), Arc::from(val.trim())));
} }
} }
// if let Some(push) = config.client_headers.get("GLOBAL_HEADERS") { // if let Some(push) = config.client_headers.get("GLOBAL_HEADERS") {

View File

@@ -2,11 +2,12 @@ use dashmap::DashMap;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::atomic::AtomicUsize; use std::sync::atomic::AtomicUsize;
pub type UpstreamsDashMap = DashMap<String, DashMap<String, (Vec<InnerMap>, AtomicUsize)>>; pub type UpstreamsDashMap = DashMap<String, DashMap<String, (Vec<InnerMap>, AtomicUsize)>>;
use std::net::IpAddr;
use std::sync::Arc;
pub type UpstreamsIdMap = DashMap<String, InnerMap>; pub type UpstreamsIdMap = DashMap<String, InnerMap>;
pub type Headers = DashMap<String, DashMap<String, Vec<(String, String)>>>; pub type Headers = DashMap<String, DashMap<Arc<str>, Vec<(Arc<str>, Arc<str>)>>>;
#[derive(Debug, Default, Clone, Serialize, Deserialize)] #[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct ServiceMapping { pub struct ServiceMapping {
@@ -116,7 +117,7 @@ pub struct AppConfig {
#[derive(Debug, Clone, PartialEq, Eq, Hash)] #[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct InnerMap { pub struct InnerMap {
pub address: String, pub address: IpAddr,
pub port: u16, pub port: u16,
pub is_ssl: bool, pub is_ssl: bool,
pub is_http2: bool, pub is_http2: bool,
@@ -129,7 +130,7 @@ pub struct InnerMap {
impl InnerMap { impl InnerMap {
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
address: Default::default(), address: "127.0.0.1".parse().unwrap(),
port: Default::default(), port: Default::default(),
is_ssl: Default::default(), is_ssl: Default::default(),
is_http2: Default::default(), is_http2: Default::default(),

View File

@@ -15,6 +15,7 @@ use std::os::unix::fs::MetadataExt;
use std::str::FromStr; use std::str::FromStr;
use std::sync::atomic::AtomicUsize; use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::{channel, Sender}; use std::sync::mpsc::{channel, Sender};
use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use std::{fs, process, thread, time}; use std::{fs, process, thread, time};
@@ -132,7 +133,7 @@ pub fn compare_dashmaps(map1: &UpstreamsDashMap, map2: &UpstreamsDashMap) -> boo
true true
} }
pub fn merge_headers(target: &DashMap<String, Vec<(String, String)>>, source: &DashMap<String, Vec<(String, String)>>) { pub fn merge_headers(target: &DashMap<Arc<str>, Vec<(Arc<str>, Arc<str>)>>, source: &DashMap<Arc<str>, Vec<(Arc<str>, Arc<str>)>>) {
for entry in source.iter() { for entry in source.iter() {
let global_key = entry.key().clone(); let global_key = entry.key().clone();
let global_values = entry.value().clone(); let global_values = entry.value().clone();
@@ -159,7 +160,7 @@ pub fn clone_idmap_into(original: &UpstreamsDashMap, cloned: &UpstreamsIdMap) {
let hex_hash = base16ct::lower::encode_string(&hash); let hex_hash = base16ct::lower::encode_string(&hash);
let hh = hex_hash[0..50].to_string(); let hh = hex_hash[0..50].to_string();
let to_add = InnerMap { let to_add = InnerMap {
address: hh.clone(), address: "127.0.0.1".parse().unwrap(),
port: 0, port: 0,
is_ssl: false, is_ssl: false,
is_http2: false, is_http2: false,

View File

@@ -2,16 +2,20 @@ use crate::utils::structs::InnerMap;
use crate::web::proxyhttp::LB; use crate::web::proxyhttp::LB;
use async_trait::async_trait; use async_trait::async_trait;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::sync::Arc;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct GetHostsReturHeaders { pub struct GetHostsReturHeaders {
pub client_headers: Option<Vec<(String, String)>>, pub client_headers: Option<Vec<(Arc<str>, Arc<str>)>>,
pub server_headers: Option<Vec<(String, String)>>, pub server_headers: Option<Vec<(Arc<str>, Arc<str>)>>,
} }
#[async_trait] #[async_trait]
pub trait GetHost { pub trait GetHost {
// fn get_host<'a>(&self, peer: &str, path: &str, backend_id: Option<&str>) -> Option<&'a InnerMap>;
fn get_host(&self, peer: &str, path: &str, backend_id: Option<&str>) -> Option<InnerMap>; fn get_host(&self, peer: &str, path: &str, backend_id: Option<&str>) -> Option<InnerMap>;
fn get_header(&self, peer: &str, path: &str) -> Option<GetHostsReturHeaders>; fn get_header(&self, peer: &str, path: &str) -> Option<GetHostsReturHeaders>;
} }
#[async_trait] #[async_trait]
@@ -22,46 +26,42 @@ impl GetHost for LB {
return Some(bb.value().clone()); return Some(bb.value().clone());
} }
} }
let host_entry = self.ump_upst.get(peer)?; let host_entry = self.ump_upst.get(peer)?;
let mut current_path = path.to_string(); let mut end = path.len();
let mut best_match: Option<InnerMap> = None;
loop { loop {
if let Some(entry) = host_entry.get(&current_path) { let slice = &path[..end];
if let Some(entry) = host_entry.get(slice) {
let (servers, index) = entry.value(); let (servers, index) = entry.value();
if !servers.is_empty() { if !servers.is_empty() {
let idx = index.fetch_add(1, Ordering::Relaxed) % servers.len(); let idx = index.fetch_add(1, Ordering::Relaxed) % servers.len();
best_match = Some(servers[idx].clone()); return Some(servers[idx].clone());
break;
} }
} }
if let Some(pos) = current_path.rfind('/') { if let Some(pos) = slice.rfind('/') {
current_path.truncate(pos); end = pos;
} else { } else {
break; break;
} }
} }
if best_match.is_none() { if let Some(entry) = host_entry.get("/") {
if let Some(entry) = host_entry.get("/") { let (servers, index) = entry.value();
let (servers, index) = entry.value(); if !servers.is_empty() {
if !servers.is_empty() { let idx = index.fetch_add(1, Ordering::Relaxed) % servers.len();
let idx = index.fetch_add(1, Ordering::Relaxed) % servers.len(); return Some(servers[idx].clone());
best_match = Some(servers[idx].clone());
}
} }
} }
best_match None
} }
fn get_header(&self, peer: &str, path: &str) -> Option<GetHostsReturHeaders> { fn get_header(&self, peer: &str, path: &str) -> Option<GetHostsReturHeaders> {
let client_entry = self.client_headers.get(peer)?; let client_entry = self.client_headers.get(peer)?;
let server_entry = self.server_headers.get(peer)?; let server_entry = self.server_headers.get(peer)?;
let mut current_path = path; let mut current_path = path;
let mut best_match = None; let mut clnt_match = None;
loop { loop {
if let Some(entry) = client_entry.get(current_path) { if let Some(entry) = client_entry.get(current_path) {
if !entry.value().is_empty() { if !entry.value().is_empty() {
best_match = Some(entry.value().clone()); clnt_match = Some(entry.value().clone());
break; break;
} }
} }
@@ -85,19 +85,18 @@ impl GetHost for LB {
} else { } else {
break; break;
} }
if best_match.is_none() { if serv_match.is_none() {
if let Some(entry) = server_entry.get("/") { if let Some(entry) = server_entry.get("/") {
if !entry.value().is_empty() { if !entry.value().is_empty() {
best_match = Some(entry.value().clone()); serv_match = Some(entry.value().clone());
break; break;
} }
} }
} }
} }
let result = GetHostsReturHeaders { Some(GetHostsReturHeaders {
client_headers: best_match, client_headers: clnt_match,
server_headers: serv_match, server_headers: serv_match,
}; })
Some(result)
} }
} }

View File

@@ -14,6 +14,7 @@ use pingora_core::listeners::ALPN;
use pingora_core::prelude::HttpPeer; use pingora_core::prelude::HttpPeer;
use pingora_limits::rate::Rate; use pingora_limits::rate::Rate;
use pingora_proxy::{ProxyHttp, Session}; use pingora_proxy::{ProxyHttp, Session};
// use std::net::{IpAddr, Ipv4Addr};
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::time::Instant; use tokio::time::Instant;
@@ -32,13 +33,15 @@ pub struct LB {
} }
pub struct Context { pub struct Context {
backend_id: String, backend_id: Arc<str>,
// backend_id: Arc<(IpAddr, u16, bool)>,
to_https: bool, to_https: bool,
redirect_to: String, redirect_to: Arc<str>,
start_time: Instant, start_time: Instant,
hostname: Option<String>, hostname: Option<Arc<str>>,
upstream_peer: Option<InnerMap>, upstream_peer: Option<InnerMap>,
extraparams: arc_swap::Guard<Arc<Extraparams>>, extraparams: arc_swap::Guard<Arc<Extraparams>>,
client_headers: Arc<Vec<(Arc<str>, Arc<str>)>>,
} }
#[async_trait] #[async_trait]
@@ -46,13 +49,15 @@ impl ProxyHttp for LB {
type CTX = Context; type CTX = Context;
fn new_ctx(&self) -> Self::CTX { fn new_ctx(&self) -> Self::CTX {
Context { Context {
backend_id: String::new(), backend_id: Arc::from(""),
// backend_id: Arc::new((IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false)),
to_https: false, to_https: false,
redirect_to: String::new(), redirect_to: Arc::from(""),
start_time: Instant::now(), start_time: Instant::now(),
hostname: None, hostname: None,
upstream_peer: None, upstream_peer: None,
extraparams: self.extraparams.load(), extraparams: self.extraparams.load(),
client_headers: Arc::new(Vec::new()),
} }
} }
async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool> { async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool> {
@@ -90,7 +95,7 @@ impl ProxyHttp for LB {
None => return Ok(false), None => return Ok(false),
Some(host) => { Some(host) => {
// let optioninnermap = self.get_host(host.as_str(), host.as_str(), backend_id); // let optioninnermap = self.get_host(host.as_str(), host.as_str(), backend_id);
let optioninnermap = self.get_host(host.as_str(), session.req_header().uri.path(), backend_id); let optioninnermap = self.get_host(host, session.req_header().uri.path(), backend_id);
match optioninnermap { match optioninnermap {
None => return Ok(false), None => return Ok(false),
Some(ref innermap) => { Some(ref innermap) => {
@@ -117,54 +122,48 @@ impl ProxyHttp for LB {
Ok(false) Ok(false)
} }
async fn upstream_peer(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<Box<HttpPeer>> { async fn upstream_peer(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<Box<HttpPeer>> {
// let host_name = return_header_host(&session);
match ctx.hostname.as_ref() { match ctx.hostname.as_ref() {
Some(hostname) => { Some(hostname) => match ctx.upstream_peer.as_ref() {
match ctx.upstream_peer.as_ref() { Some(innermap) => {
// Some((address, port, ssl, is_h2, to_https)) => { let mut peer = Box::new(HttpPeer::new((innermap.address.clone(), innermap.port.clone()), innermap.is_ssl, String::new()));
Some(innermap) => { if innermap.is_http2 {
let mut peer = Box::new(HttpPeer::new((innermap.address.clone(), innermap.port.clone()), innermap.is_ssl, String::new())); peer.options.alpn = ALPN::H2;
// if session.is_http2() { }
if innermap.is_http2 { if innermap.is_ssl {
peer.options.alpn = ALPN::H2; peer.sni = hostname.to_string();
} peer.options.verify_cert = false;
if innermap.is_ssl { peer.options.verify_hostname = false;
peer.sni = hostname.clone(); }
peer.options.verify_cert = false; if ctx.to_https || innermap.to_https {
peer.options.verify_hostname = false; if let Some(stream) = session.stream() {
} if stream.get_ssl().is_none() {
if ctx.to_https || innermap.to_https { if let Some(addr) = session.server_addr() {
if let Some(stream) = session.stream() { if let Some((host, _)) = addr.to_string().split_once(':') {
if stream.get_ssl().is_none() { let uri = session.req_header().uri.path_and_query().map_or("/", |pq| pq.as_str());
if let Some(addr) = session.server_addr() { let port = self.config.proxy_port_tls.unwrap_or(403);
if let Some((host, _)) = addr.to_string().split_once(':') { ctx.to_https = true;
let uri = session.req_header().uri.path_and_query().map_or("/", |pq| pq.as_str()); ctx.redirect_to = Arc::from(format!("https://{}:{}{}", host, port, uri));
let port = self.config.proxy_port_tls.unwrap_or(403);
ctx.to_https = true;
ctx.redirect_to = format!("https://{}:{}{}", host, port, uri);
}
} }
} }
} }
} }
ctx.backend_id = format!("{}:{}:{}", innermap.address.clone(), innermap.port.clone(), innermap.is_ssl);
Ok(peer)
}
None => {
if let Err(e) = session.respond_error_with_body(502, Bytes::from("502 Bad Gateway\n")).await {
error!("Failed to send error response: {:?}", e);
}
Err(Box::new(Error {
etype: HTTPStatus(502),
esource: Upstream,
retry: RetryType::Decided(false),
cause: None,
context: Option::from(ImmutStr::Static("Upstream not found")),
}))
} }
ctx.backend_id = Arc::from(format!("{}:{}:{}", innermap.address, innermap.port, innermap.is_ssl));
Ok(peer)
} }
} None => {
if let Err(e) = session.respond_error_with_body(502, Bytes::from("502 Bad Gateway\n")).await {
error!("Failed to send error response: {:?}", e);
}
Err(Box::new(Error {
etype: HTTPStatus(502),
esource: Upstream,
retry: RetryType::Decided(false),
cause: None,
context: Option::from(ImmutStr::Static("Upstream not found")),
}))
}
},
None => { None => {
// session.respond_error_with_body(502, Bytes::from("502 Bad Gateway\n")).await.expect("Failed to send error"); // session.respond_error_with_body(502, Bytes::from("502 Bad Gateway\n")).await.expect("Failed to send error");
if let Err(e) = session.respond_error_with_body(502, Bytes::from("502 Bad Gateway\n")).await { if let Err(e) = session.respond_error_with_body(502, Bytes::from("502 Bad Gateway\n")).await {
@@ -183,89 +182,45 @@ impl ProxyHttp for LB {
async fn upstream_request_filter(&self, session: &mut Session, upstream_request: &mut RequestHeader, ctx: &mut Self::CTX) -> Result<()> { async fn upstream_request_filter(&self, session: &mut Session, upstream_request: &mut RequestHeader, ctx: &mut Self::CTX) -> Result<()> {
if let Some(hostname) = ctx.hostname.as_ref() { if let Some(hostname) = ctx.hostname.as_ref() {
upstream_request.insert_header("Host", hostname)?; upstream_request.insert_header("Host", hostname.as_ref())?;
} }
if let Some(peer) = ctx.upstream_peer.as_ref() { if let Some(peer) = ctx.upstream_peer.as_ref() {
upstream_request.insert_header("X-Forwarded-For", peer.address.as_str())?; upstream_request.insert_header("X-Forwarded-For", peer.address.to_string())?;
} }
if let Some(headers) = self.get_header(ctx.hostname.as_ref().unwrap_or(&"localhost".to_string()), session.req_header().uri.path()) { if let Some(headers) = self.get_header(ctx.hostname.as_ref().unwrap_or(&Arc::from("localhost")), session.req_header().uri.path()) {
if let Some(client_headers) = headers.server_headers { if let Some(server_headers) = headers.server_headers {
for k in client_headers { for k in server_headers {
upstream_request.insert_header(k.0, k.1)?; upstream_request.insert_header(k.0.to_string(), k.1.as_ref())?;
} }
} }
if let Some(client_headers) = headers.client_headers {
let converted: Vec<(Arc<str>, Arc<str>)> = client_headers.into_iter().map(|(k, v)| (Arc::<str>::from(k), Arc::<str>::from(v))).collect();
ctx.client_headers = Arc::new(converted);
}
} }
Ok(()) Ok(())
} }
// async fn request_body_filter(&self, _session: &mut Session, _body: &mut Option<Bytes>, _end_of_stream: bool, _ctx: &mut Self::CTX) -> Result<()>
// where
// Self::CTX: Send + Sync,
// {
// Ok(())
// }
async fn response_filter(&self, session: &mut Session, _upstream_response: &mut ResponseHeader, ctx: &mut Self::CTX) -> Result<()> { async fn response_filter(&self, session: &mut Session, _upstream_response: &mut ResponseHeader, ctx: &mut Self::CTX) -> Result<()> {
// _upstream_response.insert_header("X-Proxied-From", "Fooooooooooooooo").unwrap();
if ctx.extraparams.sticky_sessions { if ctx.extraparams.sticky_sessions {
let backend_id = ctx.backend_id.clone(); let backend_id = ctx.backend_id.clone();
if let Some(bid) = self.ump_byid.get(&backend_id) { if let Some(bid) = self.ump_byid.get(backend_id.as_ref()) {
let _ = _upstream_response.insert_header("set-cookie", format!("backend_id={}; Path=/; Max-Age=600; HttpOnly; SameSite=Lax", bid.address)); let _ = _upstream_response.insert_header("set-cookie", format!("backend_id={}; Path=/; Max-Age=600; HttpOnly; SameSite=Lax", bid.address));
} }
} }
if ctx.to_https { if ctx.to_https {
let mut redirect_response = ResponseHeader::build(StatusCode::MOVED_PERMANENTLY, None)?; let mut redirect_response = ResponseHeader::build(StatusCode::MOVED_PERMANENTLY, None)?;
redirect_response.insert_header("Location", ctx.redirect_to.clone())?; redirect_response.insert_header("Location", ctx.redirect_to.as_ref())?;
redirect_response.insert_header("Content-Length", "0")?; redirect_response.insert_header("Content-Length", "0")?;
session.write_response_header(Box::new(redirect_response), false).await?; session.write_response_header(Box::new(redirect_response), false).await?;
} }
match ctx.hostname.as_ref() {
Some(host) => {
let path = session.req_header().uri.path();
let split_header = host.split_once(':');
match split_header {
Some((host, _port)) => {
if let Some(headers) = self.get_header(host, path) {
if let Some(server_headers) = headers.client_headers {
for k in server_headers {
_upstream_response.insert_header(k.0, k.1).unwrap();
}
}
}
}
None => {
if let Some(headers) = self.get_header(host, path) {
if let Some(server_headers) = headers.client_headers {
for k in server_headers {
_upstream_response.insert_header(k.0, k.1).unwrap();
}
}
}
}
}
// match split_header { for (key, value) in ctx.client_headers.iter() {
// Some(sh) => { _upstream_response.insert_header(key.to_string(), value.as_ref()).unwrap();
// let client_header = self.get_header(sh.0, path);
// for k in client_header.iter() {
// for t in k.iter() {
// _upstream_response.insert_header(t.0.clone(), t.1.clone()).unwrap();
// }
// }
// }
// None => {
// let client_header = self.get_header(host_header, path);
// for k in client_header.iter() {
// for t in k.iter() {
// _upstream_response.insert_header(t.0.clone(), t.1.clone()).unwrap();
// }
// }
// }
// }
}
None => {}
} }
session.set_keepalive(Some(300)); session.set_keepalive(Some(300));
Ok(()) Ok(())
} }
@@ -283,17 +238,17 @@ impl ProxyHttp for LB {
} }
} }
fn return_header_host(session: &Session) -> Option<String> { fn return_header_host(session: &Session) -> Option<Arc<str>> {
if session.is_http2() { if session.is_http2() {
match session.req_header().uri.host() { match session.req_header().uri.host() {
Some(host) => Option::from(host.to_string()), Some(host) => Option::from(Arc::from(host)),
None => None, None => None,
} }
} else { } else {
match session.req_header().headers.get("host") { match session.req_header().headers.get("host") {
Some(host) => { Some(host) => {
let header_host = host.to_str().unwrap().splitn(2, ':').collect::<Vec<&str>>(); let header_host: &str = host.to_str().unwrap().split_once(':').map_or(host.to_str().unwrap(), |(h, _)| h);
Option::from(header_host[0].to_string()) Option::from(Arc::<str>::from(header_host))
} }
None => None, None => None,
} }