6 Commits

Author SHA1 Message Date
Ara Sadoyan
0fc79c022f perf: optimize header handling and concurrent access patterns 2025-12-10 19:09:04 +01:00
Ara Sadoyan
a43bccdfb8 minor, performance improvements 2025-11-28 13:13:15 +01:00
Ara Sadoyan
5b87391fbb some more type changes, performance improvements 2025-11-27 18:47:04 +01:00
Ara Sadoyan
c68a4ad83d Type changes, performance improvements 2025-11-27 18:03:34 +01:00
Ara Sadoyan
8ba8d32df1 Performance improvements, type changes 2025-11-26 12:12:41 +01:00
Ara Sadoyan
7a839065e6 update on kubernetes web client 2025-11-24 17:57:44 +01:00
8 changed files with 157 additions and 172 deletions

View File

@@ -46,7 +46,7 @@ async fn build_upstreams(fullist: &UpstreamsDashMap, method: &str, client: &Clie
let mut innervec = Vec::new();
for (_, upstream) in path_entry.value().0.iter().enumerate() {
let tls = detect_tls(upstream.address.as_str(), &upstream.port, &client).await;
let tls = detect_tls(&upstream.address.to_string(), &upstream.port, &client).await;
let is_h2 = matches!(tls.1, Some(Version::HTTP_2));
let link = if tls.0 {

View File

@@ -22,7 +22,9 @@ pub async fn for_consul(url: String, token: Option<String>, conf: &ServiceMappin
let upstreams: DashMap<String, (Vec<InnerMap>, AtomicUsize)> = DashMap::new();
let endpoints: Vec<ConsulService> = resp.json().await.ok()?;
for subsets in endpoints {
let addr = subsets.tagged_addresses.get("lan_ipv4").unwrap().address.clone();
// let addr = subsets.tagged_addresses.get("lan_ipv4").unwrap().address.clone();
// let prt = subsets.tagged_addresses.get("lan_ipv4").unwrap().port.clone();
let addr = subsets.tagged_addresses.get("lan_ipv4").unwrap().address.clone().parse().unwrap();
let prt = subsets.tagged_addresses.get("lan_ipv4").unwrap().port.clone();
let to_add = InnerMap {
address: addr,
@@ -56,7 +58,7 @@ pub async fn for_kuber(url: &str, token: &str, conf: &ServiceMapping) -> Option<
for addr in addresses {
for port in &ports {
let to_add = InnerMap {
address: addr.ip.clone(),
address: addr.ip.parse().unwrap(),
port: port.port.clone(),
is_ssl: false,
is_http2: false,

View File

@@ -11,6 +11,8 @@ use rand::Rng;
use serde::Deserialize;
use std::collections::HashMap;
use std::env;
use std::fs;
use std::path::Path;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::time::Duration;
@@ -106,23 +108,31 @@ impl ServiceDiscovery for KubernetesDiscovery {
let num = if end > 0 { rand::rng().random_range(0..end) } else { 0 };
let server = servers.get(num).unwrap().to_string();
let path = kuber.tokenpath.unwrap_or("/var/run/secrets/kubernetes.io/serviceaccount/token".to_string());
let namespace = get_current_namespace().unwrap_or_else(|| "default".to_string());
let token = read_token(path.as_str()).await;
// let mut oldcrt: HashMap<String, String> = HashMap::new();
loop {
// crate::utils::watchksecret::watch_secret("ar-tls", "staging", server.clone(), token.clone(), &mut oldcrt).await;
let upstreams = UpstreamsDashMap::new();
if let Some(kuber) = config.kubernetes.clone() {
if let Some(svc) = kuber.services {
for i in svc {
let header_list = DashMap::new();
let header_list: DashMap<Arc<str>, Vec<(Arc<str>, Arc<str>)>> = DashMap::new();
let mut hl = Vec::new();
build_headers(&i.client_headers, config.as_ref(), &mut hl);
if !hl.is_empty() {
header_list.insert(i.path.clone().unwrap_or("/".to_string()), hl);
match i.path.clone() {
Some(path) => {
header_list.insert(Arc::from(path.as_str()), hl);
}
None => {
header_list.insert(Arc::from("/"), hl);
}
}
// header_list.insert(Arc::from(path.as_str()), hl);
// header_list.insert(Arc::from(i.path).unwrap_or(Arc::from("/")).as_str(), hl);
config.client_headers.insert(i.hostname.clone(), header_list);
}
let url = format!("https://{}/api/v1/namespaces/staging/endpoints/{}", server, i.hostname);
let url = format!("https://{}/api/v1/namespaces/{}/endpoints/{}", server, namespace, i.hostname);
let list = httpclient::for_kuber(&*url, &*token, &i).await;
list_to_upstreams(list, &upstreams, &i);
}
@@ -137,6 +147,16 @@ impl ServiceDiscovery for KubernetesDiscovery {
}
}
fn get_current_namespace() -> Option<String> {
let ns_path = "/var/run/secrets/kubernetes.io/serviceaccount/namespace";
if Path::new(ns_path).exists() {
if let Ok(contents) = fs::read_to_string(ns_path) {
return Some(contents.trim().to_string());
}
}
std::env::var("POD_NAMESPACE").ok()
}
#[async_trait]
impl ServiceDiscovery for ConsulDiscovery {
async fn fetch_upstreams(&self, config: Arc<Configuration>, mut toreturn: Sender<Configuration>) {
@@ -161,7 +181,15 @@ impl ServiceDiscovery for ConsulDiscovery {
let mut hl = Vec::new();
build_headers(&i.client_headers, config.as_ref(), &mut hl);
if !hl.is_empty() {
header_list.insert(i.path.clone().unwrap_or("/".to_string()), hl);
match i.path.clone() {
Some(path) => {
header_list.insert(Arc::from(path.as_str()), hl);
}
None => {
header_list.insert(Arc::from("/"), hl);
}
}
// header_list.insert(i.path.clone().unwrap_or("/".to_string()), hl);
config.client_headers.insert(i.hostname.clone(), header_list);
}

View File

@@ -6,7 +6,7 @@ use dashmap::DashMap;
use log::{error, info, warn};
use std::collections::HashMap;
use std::sync::atomic::AtomicUsize;
// use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;
use std::{env, fs};
// use tokio::sync::oneshot::{Receiver, Sender};
@@ -67,32 +67,31 @@ pub async fn load_configuration(d: &str, kind: &str) -> Option<Configuration> {
}
async fn populate_headers_and_auth(config: &mut Configuration, parsed: &Config) {
let mut ch = Vec::new();
ch.push(("Server".to_string(), "Aralez".to_string()));
let mut ch: Vec<(Arc<str>, Arc<str>)> = Vec::new();
ch.push((Arc::from("Server"), Arc::from("Aralez")));
// println!("{:?}", &parsed.client_headers);
if let Some(headers) = &parsed.client_headers {
for header in headers {
if let Some((key, val)) = header.split_once(':') {
println!("{}:{}", key.trim().to_string(), val.trim().to_string());
ch.push((key.trim().to_string(), val.trim().to_string()));
ch.push((Arc::from(key), Arc::from(val)));
}
}
}
let global_headers = DashMap::new();
global_headers.insert("/".to_string(), ch);
let global_headers: DashMap<Arc<str>, Vec<(Arc<str>, Arc<str>)>> = DashMap::new();
global_headers.insert(Arc::from("/"), ch);
config.client_headers.insert("GLOBAL_CLIENT_HEADERS".to_string(), global_headers);
let mut sh = Vec::new();
sh.push(("X-Proxy-Server".to_string(), "Aralez".to_string()));
let mut sh: Vec<(Arc<str>, Arc<str>)> = Vec::new();
sh.push((Arc::from("X-Proxy-Server"), Arc::from("Aralez")));
if let Some(headers) = &parsed.server_headers {
for header in headers {
if let Some((key, val)) = header.split_once(':') {
sh.push((key.trim().to_string(), val.trim().to_string()));
sh.push((Arc::from(key.trim()), Arc::from(val.trim())));
}
}
}
let server_global_headers = DashMap::new();
server_global_headers.insert("/".to_string(), sh);
let server_global_headers: DashMap<Arc<str>, Vec<(Arc<str>, Arc<str>)>> = DashMap::new();
server_global_headers.insert(Arc::from("/"), sh);
config.server_headers.insert("GLOBAL_SERVER_HEADERS".to_string(), server_global_headers);
config.extraparams.sticky_sessions = parsed.sticky_sessions;
@@ -124,19 +123,19 @@ async fn populate_file_upstreams(config: &mut Configuration, parsed: &Config) {
info!("Applied Rate Limit for {} : {} request per second", hostname, rate);
}
let mut hl: Vec<(String, String)> = Vec::new();
let mut sl: Vec<(String, String)> = Vec::new();
let mut hl: Vec<(Arc<str>, Arc<str>)> = Vec::new();
let mut sl: Vec<(Arc<str>, Arc<str>)> = Vec::new();
build_headers(&path_config.client_headers, config, &mut hl);
build_headers(&path_config.server_headers, config, &mut sl);
client_header_list.insert(path.clone(), hl);
server_header_list.insert(path.clone(), sl);
client_header_list.insert(Arc::from(path.as_str()), hl);
server_header_list.insert(Arc::from(path.as_str()), sl);
let mut server_list = Vec::new();
for server in &path_config.servers {
if let Some((ip, port_str)) = server.split_once(':') {
if let Ok(port) = port_str.parse::<u16>() {
server_list.push(InnerMap {
address: ip.trim().to_string(),
address: ip.trim().parse().unwrap(),
port,
is_ssl: true,
is_http2: false,
@@ -238,11 +237,11 @@ fn log_builder(conf: &AppConfig) {
env_logger::builder().init();
}
pub fn build_headers(path_config: &Option<Vec<String>>, _config: &Configuration, hl: &mut Vec<(String, String)>) {
pub fn build_headers(path_config: &Option<Vec<String>>, _config: &Configuration, hl: &mut Vec<(Arc<str>, Arc<str>)>) {
if let Some(headers) = &path_config {
for header in headers {
if let Some((key, val)) = header.split_once(':') {
hl.push((key.trim().to_string(), val.trim().to_string()));
hl.push((Arc::from(key.trim()), Arc::from(val.trim())));
}
}
// if let Some(push) = config.client_headers.get("GLOBAL_HEADERS") {

View File

@@ -2,11 +2,12 @@ use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::atomic::AtomicUsize;
pub type UpstreamsDashMap = DashMap<String, DashMap<String, (Vec<InnerMap>, AtomicUsize)>>;
use std::net::IpAddr;
use std::sync::Arc;
pub type UpstreamsIdMap = DashMap<String, InnerMap>;
pub type Headers = DashMap<String, DashMap<String, Vec<(String, String)>>>;
pub type Headers = DashMap<String, DashMap<Arc<str>, Vec<(Arc<str>, Arc<str>)>>>;
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct ServiceMapping {
@@ -116,7 +117,7 @@ pub struct AppConfig {
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct InnerMap {
pub address: String,
pub address: IpAddr,
pub port: u16,
pub is_ssl: bool,
pub is_http2: bool,
@@ -129,7 +130,7 @@ pub struct InnerMap {
impl InnerMap {
pub fn new() -> Self {
Self {
address: Default::default(),
address: "127.0.0.1".parse().unwrap(),
port: Default::default(),
is_ssl: Default::default(),
is_http2: Default::default(),

View File

@@ -15,6 +15,7 @@ use std::os::unix::fs::MetadataExt;
use std::str::FromStr;
use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::{channel, Sender};
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::{fs, process, thread, time};
@@ -132,7 +133,7 @@ pub fn compare_dashmaps(map1: &UpstreamsDashMap, map2: &UpstreamsDashMap) -> boo
true
}
pub fn merge_headers(target: &DashMap<String, Vec<(String, String)>>, source: &DashMap<String, Vec<(String, String)>>) {
pub fn merge_headers(target: &DashMap<Arc<str>, Vec<(Arc<str>, Arc<str>)>>, source: &DashMap<Arc<str>, Vec<(Arc<str>, Arc<str>)>>) {
for entry in source.iter() {
let global_key = entry.key().clone();
let global_values = entry.value().clone();
@@ -159,7 +160,7 @@ pub fn clone_idmap_into(original: &UpstreamsDashMap, cloned: &UpstreamsIdMap) {
let hex_hash = base16ct::lower::encode_string(&hash);
let hh = hex_hash[0..50].to_string();
let to_add = InnerMap {
address: hh.clone(),
address: "127.0.0.1".parse().unwrap(),
port: 0,
is_ssl: false,
is_http2: false,

View File

@@ -2,16 +2,20 @@ use crate::utils::structs::InnerMap;
use crate::web::proxyhttp::LB;
use async_trait::async_trait;
use std::sync::atomic::Ordering;
use std::sync::Arc;
#[derive(Debug, Clone)]
pub struct GetHostsReturHeaders {
pub client_headers: Option<Vec<(String, String)>>,
pub server_headers: Option<Vec<(String, String)>>,
pub client_headers: Option<Vec<(Arc<str>, Arc<str>)>>,
pub server_headers: Option<Vec<(Arc<str>, Arc<str>)>>,
}
#[async_trait]
pub trait GetHost {
// fn get_host<'a>(&self, peer: &str, path: &str, backend_id: Option<&str>) -> Option<&'a InnerMap>;
fn get_host(&self, peer: &str, path: &str, backend_id: Option<&str>) -> Option<InnerMap>;
fn get_header(&self, peer: &str, path: &str) -> Option<GetHostsReturHeaders>;
}
#[async_trait]
@@ -22,46 +26,42 @@ impl GetHost for LB {
return Some(bb.value().clone());
}
}
let host_entry = self.ump_upst.get(peer)?;
let mut current_path = path.to_string();
let mut best_match: Option<InnerMap> = None;
let mut end = path.len();
loop {
if let Some(entry) = host_entry.get(&current_path) {
let slice = &path[..end];
if let Some(entry) = host_entry.get(slice) {
let (servers, index) = entry.value();
if !servers.is_empty() {
let idx = index.fetch_add(1, Ordering::Relaxed) % servers.len();
best_match = Some(servers[idx].clone());
break;
return Some(servers[idx].clone());
}
}
if let Some(pos) = current_path.rfind('/') {
current_path.truncate(pos);
if let Some(pos) = slice.rfind('/') {
end = pos;
} else {
break;
}
}
if best_match.is_none() {
if let Some(entry) = host_entry.get("/") {
let (servers, index) = entry.value();
if !servers.is_empty() {
let idx = index.fetch_add(1, Ordering::Relaxed) % servers.len();
best_match = Some(servers[idx].clone());
}
if let Some(entry) = host_entry.get("/") {
let (servers, index) = entry.value();
if !servers.is_empty() {
let idx = index.fetch_add(1, Ordering::Relaxed) % servers.len();
return Some(servers[idx].clone());
}
}
best_match
None
}
fn get_header(&self, peer: &str, path: &str) -> Option<GetHostsReturHeaders> {
let client_entry = self.client_headers.get(peer)?;
let server_entry = self.server_headers.get(peer)?;
let mut current_path = path;
let mut best_match = None;
let mut clnt_match = None;
loop {
if let Some(entry) = client_entry.get(current_path) {
if !entry.value().is_empty() {
best_match = Some(entry.value().clone());
clnt_match = Some(entry.value().clone());
break;
}
}
@@ -85,19 +85,18 @@ impl GetHost for LB {
} else {
break;
}
if best_match.is_none() {
if serv_match.is_none() {
if let Some(entry) = server_entry.get("/") {
if !entry.value().is_empty() {
best_match = Some(entry.value().clone());
serv_match = Some(entry.value().clone());
break;
}
}
}
}
let result = GetHostsReturHeaders {
client_headers: best_match,
Some(GetHostsReturHeaders {
client_headers: clnt_match,
server_headers: serv_match,
};
Some(result)
})
}
}

View File

@@ -14,6 +14,7 @@ use pingora_core::listeners::ALPN;
use pingora_core::prelude::HttpPeer;
use pingora_limits::rate::Rate;
use pingora_proxy::{ProxyHttp, Session};
// use std::net::{IpAddr, Ipv4Addr};
use std::sync::Arc;
use std::time::Duration;
use tokio::time::Instant;
@@ -32,13 +33,15 @@ pub struct LB {
}
pub struct Context {
backend_id: String,
backend_id: Arc<str>,
// backend_id: Arc<(IpAddr, u16, bool)>,
to_https: bool,
redirect_to: String,
redirect_to: Arc<str>,
start_time: Instant,
hostname: Option<String>,
hostname: Option<Arc<str>>,
upstream_peer: Option<InnerMap>,
extraparams: arc_swap::Guard<Arc<Extraparams>>,
client_headers: Arc<Vec<(Arc<str>, Arc<str>)>>,
}
#[async_trait]
@@ -46,13 +49,15 @@ impl ProxyHttp for LB {
type CTX = Context;
fn new_ctx(&self) -> Self::CTX {
Context {
backend_id: String::new(),
backend_id: Arc::from(""),
// backend_id: Arc::new((IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false)),
to_https: false,
redirect_to: String::new(),
redirect_to: Arc::from(""),
start_time: Instant::now(),
hostname: None,
upstream_peer: None,
extraparams: self.extraparams.load(),
client_headers: Arc::new(Vec::new()),
}
}
async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool> {
@@ -90,7 +95,7 @@ impl ProxyHttp for LB {
None => return Ok(false),
Some(host) => {
// let optioninnermap = self.get_host(host.as_str(), host.as_str(), backend_id);
let optioninnermap = self.get_host(host.as_str(), session.req_header().uri.path(), backend_id);
let optioninnermap = self.get_host(host, session.req_header().uri.path(), backend_id);
match optioninnermap {
None => return Ok(false),
Some(ref innermap) => {
@@ -117,54 +122,48 @@ impl ProxyHttp for LB {
Ok(false)
}
async fn upstream_peer(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<Box<HttpPeer>> {
// let host_name = return_header_host(&session);
match ctx.hostname.as_ref() {
Some(hostname) => {
match ctx.upstream_peer.as_ref() {
// Some((address, port, ssl, is_h2, to_https)) => {
Some(innermap) => {
let mut peer = Box::new(HttpPeer::new((innermap.address.clone(), innermap.port.clone()), innermap.is_ssl, String::new()));
// if session.is_http2() {
if innermap.is_http2 {
peer.options.alpn = ALPN::H2;
}
if innermap.is_ssl {
peer.sni = hostname.clone();
peer.options.verify_cert = false;
peer.options.verify_hostname = false;
}
if ctx.to_https || innermap.to_https {
if let Some(stream) = session.stream() {
if stream.get_ssl().is_none() {
if let Some(addr) = session.server_addr() {
if let Some((host, _)) = addr.to_string().split_once(':') {
let uri = session.req_header().uri.path_and_query().map_or("/", |pq| pq.as_str());
let port = self.config.proxy_port_tls.unwrap_or(403);
ctx.to_https = true;
ctx.redirect_to = format!("https://{}:{}{}", host, port, uri);
}
Some(hostname) => match ctx.upstream_peer.as_ref() {
Some(innermap) => {
let mut peer = Box::new(HttpPeer::new((innermap.address.clone(), innermap.port.clone()), innermap.is_ssl, String::new()));
if innermap.is_http2 {
peer.options.alpn = ALPN::H2;
}
if innermap.is_ssl {
peer.sni = hostname.to_string();
peer.options.verify_cert = false;
peer.options.verify_hostname = false;
}
if ctx.to_https || innermap.to_https {
if let Some(stream) = session.stream() {
if stream.get_ssl().is_none() {
if let Some(addr) = session.server_addr() {
if let Some((host, _)) = addr.to_string().split_once(':') {
let uri = session.req_header().uri.path_and_query().map_or("/", |pq| pq.as_str());
let port = self.config.proxy_port_tls.unwrap_or(403);
ctx.to_https = true;
ctx.redirect_to = Arc::from(format!("https://{}:{}{}", host, port, uri));
}
}
}
}
ctx.backend_id = format!("{}:{}:{}", innermap.address.clone(), innermap.port.clone(), innermap.is_ssl);
Ok(peer)
}
None => {
if let Err(e) = session.respond_error_with_body(502, Bytes::from("502 Bad Gateway\n")).await {
error!("Failed to send error response: {:?}", e);
}
Err(Box::new(Error {
etype: HTTPStatus(502),
esource: Upstream,
retry: RetryType::Decided(false),
cause: None,
context: Option::from(ImmutStr::Static("Upstream not found")),
}))
}
ctx.backend_id = Arc::from(format!("{}:{}:{}", innermap.address, innermap.port, innermap.is_ssl));
Ok(peer)
}
}
None => {
if let Err(e) = session.respond_error_with_body(502, Bytes::from("502 Bad Gateway\n")).await {
error!("Failed to send error response: {:?}", e);
}
Err(Box::new(Error {
etype: HTTPStatus(502),
esource: Upstream,
retry: RetryType::Decided(false),
cause: None,
context: Option::from(ImmutStr::Static("Upstream not found")),
}))
}
},
None => {
// session.respond_error_with_body(502, Bytes::from("502 Bad Gateway\n")).await.expect("Failed to send error");
if let Err(e) = session.respond_error_with_body(502, Bytes::from("502 Bad Gateway\n")).await {
@@ -183,89 +182,45 @@ impl ProxyHttp for LB {
async fn upstream_request_filter(&self, session: &mut Session, upstream_request: &mut RequestHeader, ctx: &mut Self::CTX) -> Result<()> {
if let Some(hostname) = ctx.hostname.as_ref() {
upstream_request.insert_header("Host", hostname)?;
upstream_request.insert_header("Host", hostname.as_ref())?;
}
if let Some(peer) = ctx.upstream_peer.as_ref() {
upstream_request.insert_header("X-Forwarded-For", peer.address.as_str())?;
upstream_request.insert_header("X-Forwarded-For", peer.address.to_string())?;
}
if let Some(headers) = self.get_header(ctx.hostname.as_ref().unwrap_or(&"localhost".to_string()), session.req_header().uri.path()) {
if let Some(client_headers) = headers.server_headers {
for k in client_headers {
upstream_request.insert_header(k.0, k.1)?;
if let Some(headers) = self.get_header(ctx.hostname.as_ref().unwrap_or(&Arc::from("localhost")), session.req_header().uri.path()) {
if let Some(server_headers) = headers.server_headers {
for k in server_headers {
upstream_request.insert_header(k.0.to_string(), k.1.as_ref())?;
}
}
if let Some(client_headers) = headers.client_headers {
let converted: Vec<(Arc<str>, Arc<str>)> = client_headers.into_iter().map(|(k, v)| (Arc::<str>::from(k), Arc::<str>::from(v))).collect();
ctx.client_headers = Arc::new(converted);
}
}
Ok(())
}
// async fn request_body_filter(&self, _session: &mut Session, _body: &mut Option<Bytes>, _end_of_stream: bool, _ctx: &mut Self::CTX) -> Result<()>
// where
// Self::CTX: Send + Sync,
// {
// Ok(())
// }
async fn response_filter(&self, session: &mut Session, _upstream_response: &mut ResponseHeader, ctx: &mut Self::CTX) -> Result<()> {
// _upstream_response.insert_header("X-Proxied-From", "Fooooooooooooooo").unwrap();
if ctx.extraparams.sticky_sessions {
let backend_id = ctx.backend_id.clone();
if let Some(bid) = self.ump_byid.get(&backend_id) {
if let Some(bid) = self.ump_byid.get(backend_id.as_ref()) {
let _ = _upstream_response.insert_header("set-cookie", format!("backend_id={}; Path=/; Max-Age=600; HttpOnly; SameSite=Lax", bid.address));
}
}
if ctx.to_https {
let mut redirect_response = ResponseHeader::build(StatusCode::MOVED_PERMANENTLY, None)?;
redirect_response.insert_header("Location", ctx.redirect_to.clone())?;
redirect_response.insert_header("Location", ctx.redirect_to.as_ref())?;
redirect_response.insert_header("Content-Length", "0")?;
session.write_response_header(Box::new(redirect_response), false).await?;
}
match ctx.hostname.as_ref() {
Some(host) => {
let path = session.req_header().uri.path();
let split_header = host.split_once(':');
match split_header {
Some((host, _port)) => {
if let Some(headers) = self.get_header(host, path) {
if let Some(server_headers) = headers.client_headers {
for k in server_headers {
_upstream_response.insert_header(k.0, k.1).unwrap();
}
}
}
}
None => {
if let Some(headers) = self.get_header(host, path) {
if let Some(server_headers) = headers.client_headers {
for k in server_headers {
_upstream_response.insert_header(k.0, k.1).unwrap();
}
}
}
}
}
// match split_header {
// Some(sh) => {
// let client_header = self.get_header(sh.0, path);
// for k in client_header.iter() {
// for t in k.iter() {
// _upstream_response.insert_header(t.0.clone(), t.1.clone()).unwrap();
// }
// }
// }
// None => {
// let client_header = self.get_header(host_header, path);
// for k in client_header.iter() {
// for t in k.iter() {
// _upstream_response.insert_header(t.0.clone(), t.1.clone()).unwrap();
// }
// }
// }
// }
}
None => {}
for (key, value) in ctx.client_headers.iter() {
_upstream_response.insert_header(key.to_string(), value.as_ref()).unwrap();
}
session.set_keepalive(Some(300));
Ok(())
}
@@ -283,17 +238,17 @@ impl ProxyHttp for LB {
}
}
fn return_header_host(session: &Session) -> Option<String> {
fn return_header_host(session: &Session) -> Option<Arc<str>> {
if session.is_http2() {
match session.req_header().uri.host() {
Some(host) => Option::from(host.to_string()),
Some(host) => Option::from(Arc::from(host)),
None => None,
}
} else {
match session.req_header().headers.get("host") {
Some(host) => {
let header_host = host.to_str().unwrap().splitn(2, ':').collect::<Vec<&str>>();
Option::from(header_host[0].to_string())
let header_host: &str = host.to_str().unwrap().split_once(':').map_or(host.to_str().unwrap(), |(h, _)| h);
Option::from(Arc::<str>::from(header_host))
}
None => None,
}