From 0fc79c022fa836f18e7c262df3f12bebd9691621 Mon Sep 17 00:00:00 2001 From: Ara Sadoyan Date: Wed, 10 Dec 2025 19:09:04 +0100 Subject: [PATCH] perf: optimize header handling and concurrent access patterns --- src/utils/kuberconsul.rs | 24 +++++++++++++++++++++--- src/utils/parceyaml.rs | 34 +++++++++++++++++----------------- src/utils/structs.rs | 3 ++- src/utils/tools.rs | 3 ++- src/web/gethosts.rs | 5 +++-- src/web/proxyhttp.rs | 2 +- 6 files changed, 46 insertions(+), 25 deletions(-) diff --git a/src/utils/kuberconsul.rs b/src/utils/kuberconsul.rs index 5a0a0e4..653bb8f 100644 --- a/src/utils/kuberconsul.rs +++ b/src/utils/kuberconsul.rs @@ -115,11 +115,21 @@ impl ServiceDiscovery for KubernetesDiscovery { if let Some(kuber) = config.kubernetes.clone() { if let Some(svc) = kuber.services { for i in svc { - let header_list = DashMap::new(); + let header_list: DashMap, Vec<(Arc, Arc)>> = DashMap::new(); let mut hl = Vec::new(); build_headers(&i.client_headers, config.as_ref(), &mut hl); if !hl.is_empty() { - header_list.insert(i.path.clone().unwrap_or("/".to_string()), hl); + match i.path.clone() { + Some(path) => { + header_list.insert(Arc::from(path.as_str()), hl); + } + None => { + header_list.insert(Arc::from("/"), hl); + } + } + + // header_list.insert(Arc::from(path.as_str()), hl); + // header_list.insert(Arc::from(i.path).unwrap_or(Arc::from("/")).as_str(), hl); config.client_headers.insert(i.hostname.clone(), header_list); } let url = format!("https://{}/api/v1/namespaces/{}/endpoints/{}", server, namespace, i.hostname); @@ -171,7 +181,15 @@ impl ServiceDiscovery for ConsulDiscovery { let mut hl = Vec::new(); build_headers(&i.client_headers, config.as_ref(), &mut hl); if !hl.is_empty() { - header_list.insert(i.path.clone().unwrap_or("/".to_string()), hl); + match i.path.clone() { + Some(path) => { + header_list.insert(Arc::from(path.as_str()), hl); + } + None => { + header_list.insert(Arc::from("/"), hl); + } + } + // header_list.insert(i.path.clone().unwrap_or("/".to_string()), hl); config.client_headers.insert(i.hostname.clone(), header_list); } diff --git a/src/utils/parceyaml.rs b/src/utils/parceyaml.rs index fdac89e..1135247 100644 --- a/src/utils/parceyaml.rs +++ b/src/utils/parceyaml.rs @@ -6,7 +6,7 @@ use dashmap::DashMap; use log::{error, info, warn}; use std::collections::HashMap; use std::sync::atomic::AtomicUsize; -// use std::sync::mpsc::{channel, Receiver, Sender}; +use std::sync::Arc; use std::{env, fs}; // use tokio::sync::oneshot::{Receiver, Sender}; @@ -67,31 +67,31 @@ pub async fn load_configuration(d: &str, kind: &str) -> Option { } async fn populate_headers_and_auth(config: &mut Configuration, parsed: &Config) { - let mut ch = Vec::new(); - ch.push(("Server".to_string(), "Aralez".to_string())); + let mut ch: Vec<(Arc, Arc)> = Vec::new(); + ch.push((Arc::from("Server"), Arc::from("Aralez"))); // println!("{:?}", &parsed.client_headers); if let Some(headers) = &parsed.client_headers { for header in headers { if let Some((key, val)) = header.split_once(':') { - ch.push((key.trim().to_string(), val.trim().to_string())); + ch.push((Arc::from(key), Arc::from(val))); } } } - let global_headers = DashMap::new(); - global_headers.insert("/".to_string(), ch); + let global_headers: DashMap, Vec<(Arc, Arc)>> = DashMap::new(); + global_headers.insert(Arc::from("/"), ch); config.client_headers.insert("GLOBAL_CLIENT_HEADERS".to_string(), global_headers); - let mut sh = Vec::new(); - sh.push(("X-Proxy-Server".to_string(), "Aralez".to_string())); + let mut sh: Vec<(Arc, Arc)> = Vec::new(); + sh.push((Arc::from("X-Proxy-Server"), Arc::from("Aralez"))); if let Some(headers) = &parsed.server_headers { for header in headers { if let Some((key, val)) = header.split_once(':') { - sh.push((key.trim().to_string(), val.trim().to_string())); + sh.push((Arc::from(key.trim()), Arc::from(val.trim()))); } } } - let server_global_headers = DashMap::new(); - server_global_headers.insert("/".to_string(), sh); + let server_global_headers: DashMap, Vec<(Arc, Arc)>> = DashMap::new(); + server_global_headers.insert(Arc::from("/"), sh); config.server_headers.insert("GLOBAL_SERVER_HEADERS".to_string(), server_global_headers); config.extraparams.sticky_sessions = parsed.sticky_sessions; @@ -123,12 +123,12 @@ async fn populate_file_upstreams(config: &mut Configuration, parsed: &Config) { info!("Applied Rate Limit for {} : {} request per second", hostname, rate); } - let mut hl: Vec<(String, String)> = Vec::new(); - let mut sl: Vec<(String, String)> = Vec::new(); + let mut hl: Vec<(Arc, Arc)> = Vec::new(); + let mut sl: Vec<(Arc, Arc)> = Vec::new(); build_headers(&path_config.client_headers, config, &mut hl); build_headers(&path_config.server_headers, config, &mut sl); - client_header_list.insert(path.clone(), hl); - server_header_list.insert(path.clone(), sl); + client_header_list.insert(Arc::from(path.as_str()), hl); + server_header_list.insert(Arc::from(path.as_str()), sl); let mut server_list = Vec::new(); for server in &path_config.servers { @@ -237,11 +237,11 @@ fn log_builder(conf: &AppConfig) { env_logger::builder().init(); } -pub fn build_headers(path_config: &Option>, _config: &Configuration, hl: &mut Vec<(String, String)>) { +pub fn build_headers(path_config: &Option>, _config: &Configuration, hl: &mut Vec<(Arc, Arc)>) { if let Some(headers) = &path_config { for header in headers { if let Some((key, val)) = header.split_once(':') { - hl.push((key.trim().to_string(), val.trim().to_string())); + hl.push((Arc::from(key.trim()), Arc::from(val.trim()))); } } // if let Some(push) = config.client_headers.get("GLOBAL_HEADERS") { diff --git a/src/utils/structs.rs b/src/utils/structs.rs index d15b931..6b326c3 100644 --- a/src/utils/structs.rs +++ b/src/utils/structs.rs @@ -4,9 +4,10 @@ use std::collections::HashMap; use std::sync::atomic::AtomicUsize; pub type UpstreamsDashMap = DashMap, AtomicUsize)>>; use std::net::IpAddr; +use std::sync::Arc; pub type UpstreamsIdMap = DashMap; -pub type Headers = DashMap>>; +pub type Headers = DashMap, Vec<(Arc, Arc)>>>; #[derive(Debug, Default, Clone, Serialize, Deserialize)] pub struct ServiceMapping { diff --git a/src/utils/tools.rs b/src/utils/tools.rs index 3977eee..39905cb 100644 --- a/src/utils/tools.rs +++ b/src/utils/tools.rs @@ -15,6 +15,7 @@ use std::os::unix::fs::MetadataExt; use std::str::FromStr; use std::sync::atomic::AtomicUsize; use std::sync::mpsc::{channel, Sender}; +use std::sync::Arc; use std::time::{Duration, Instant}; use std::{fs, process, thread, time}; @@ -132,7 +133,7 @@ pub fn compare_dashmaps(map1: &UpstreamsDashMap, map2: &UpstreamsDashMap) -> boo true } -pub fn merge_headers(target: &DashMap>, source: &DashMap>) { +pub fn merge_headers(target: &DashMap, Vec<(Arc, Arc)>>, source: &DashMap, Vec<(Arc, Arc)>>) { for entry in source.iter() { let global_key = entry.key().clone(); let global_values = entry.value().clone(); diff --git a/src/web/gethosts.rs b/src/web/gethosts.rs index a4b5575..fb332d7 100644 --- a/src/web/gethosts.rs +++ b/src/web/gethosts.rs @@ -2,11 +2,12 @@ use crate::utils::structs::InnerMap; use crate::web::proxyhttp::LB; use async_trait::async_trait; use std::sync::atomic::Ordering; +use std::sync::Arc; #[derive(Debug, Clone)] pub struct GetHostsReturHeaders { - pub client_headers: Option>, - pub server_headers: Option>, + pub client_headers: Option, Arc)>>, + pub server_headers: Option, Arc)>>, } #[async_trait] diff --git a/src/web/proxyhttp.rs b/src/web/proxyhttp.rs index 1f45632..1c1b133 100644 --- a/src/web/proxyhttp.rs +++ b/src/web/proxyhttp.rs @@ -191,7 +191,7 @@ impl ProxyHttp for LB { if let Some(headers) = self.get_header(ctx.hostname.as_ref().unwrap_or(&Arc::from("localhost")), session.req_header().uri.path()) { if let Some(server_headers) = headers.server_headers { for k in server_headers { - upstream_request.insert_header(k.0, k.1)?; + upstream_request.insert_header(k.0.to_string(), k.1.as_ref())?; } } if let Some(client_headers) = headers.client_headers {