Performance improvement. String removal from hot paths.

This commit is contained in:
Ara Sadoyan
2026-01-27 16:19:51 +01:00
parent 38055ae94e
commit 2b437c65fb
9 changed files with 461 additions and 358 deletions

View File

@@ -3,6 +3,7 @@ use base64::engine::general_purpose::STANDARD;
use base64::Engine;
use pingora_proxy::Session;
use std::collections::HashMap;
use std::sync::Arc;
use urlencoding::decode;
trait AuthValidator {
@@ -56,18 +57,18 @@ fn validate(auth: &dyn AuthValidator, session: &Session) -> bool {
auth.validate(session)
}
pub fn authenticate(c: &[String], session: &Session) -> bool {
match c[0].as_str() {
pub fn authenticate(c: &[Arc<str>], session: &Session) -> bool {
match &*c[0] {
"basic" => {
let auth = BasicAuth(c[1].as_str().into());
let auth = BasicAuth(&*c[1]);
validate(&auth, session)
}
"apikey" => {
let auth = ApiKeyAuth(c[1].as_str().into());
let auth = ApiKeyAuth(&*c[1]);
validate(&auth, session)
}
"jwt" => {
let auth = JwtAuth(c[1].as_str().into());
let auth = JwtAuth(&*c[1]);
validate(&auth, session)
}
_ => {

View File

@@ -7,7 +7,7 @@ use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::time::Duration;
pub async fn for_consul(url: String, token: Option<String>, conf: &ServiceMapping) -> Option<DashMap<String, (Vec<Arc<InnerMap>>, AtomicUsize)>> {
pub async fn for_consul(url: String, token: Option<String>, conf: &ServiceMapping) -> Option<DashMap<Arc<str>, (Vec<Arc<InnerMap>>, AtomicUsize)>> {
let client = Client::builder().timeout(Duration::from_secs(2)).danger_accept_invalid_certs(true).build().ok()?;
let mut headers = HeaderMap::new();
if let Some(token) = token {
@@ -20,7 +20,7 @@ pub async fn for_consul(url: String, token: Option<String>, conf: &ServiceMappin
return None;
}
let mut inner_vec = Vec::new();
let upstreams: DashMap<String, (Vec<Arc<InnerMap>>, AtomicUsize)> = DashMap::new();
let upstreams: DashMap<Arc<str>, (Vec<Arc<InnerMap>>, AtomicUsize)> = DashMap::new();
let endpoints: Vec<ConsulService> = resp.json().await.ok()?;
for subsets in endpoints {
// let addr = subsets.tagged_addresses.get("lan_ipv4").unwrap().address.clone();
@@ -42,7 +42,7 @@ pub async fn for_consul(url: String, token: Option<String>, conf: &ServiceMappin
Some(upstreams)
}
pub async fn for_kuber(url: &str, token: &str, conf: &ServiceMapping) -> Option<DashMap<String, (Vec<Arc<InnerMap>>, AtomicUsize)>> {
pub async fn for_kuber(url: &str, token: &str, conf: &ServiceMapping) -> Option<DashMap<Arc<str>, (Vec<Arc<InnerMap>>, AtomicUsize)>> {
let to = Duration::from_secs(10);
let client = Client::builder().timeout(Duration::from_secs(10)).danger_accept_invalid_certs(true).build().ok()?;
let resp = client.get(url).timeout(to).bearer_auth(token).send().await.ok()?;
@@ -51,33 +51,8 @@ pub async fn for_kuber(url: &str, token: &str, conf: &ServiceMapping) -> Option<
return None;
}
let endpoints: KubeEndpoints = resp.json().await.ok()?;
// let endpoints: KubeEndpointsList = resp.json().await.ok()?;
let upstreams: DashMap<String, (Vec<Arc<InnerMap>>, AtomicUsize)> = DashMap::new();
// for endpoints in endpoints.items {
// if let Some(subsets) = endpoints.subsets {
// for subset in subsets {
// if let (Some(addresses), Some(ports)) = (subset.addresses, subset.ports) {
// let mut inner_vec = Vec::new();
// for addr in addresses {
// for port in &ports {
// let to_add = Arc::from(InnerMap {
// address: addr.ip.parse().unwrap(),
// port: port.port.clone(),
// is_ssl: false,
// is_http2: false,
// to_https: conf.to_https.unwrap_or(false),
// rate_limit: conf.rate_limit,
// healthcheck: None,
// });
// inner_vec.push(to_add);
// }
// }
// match_path(&conf, &upstreams, inner_vec.clone());
// }
// }
// }
// }
let upstreams: DashMap<Arc<str>, (Vec<Arc<InnerMap>>, AtomicUsize)> = DashMap::new();
if let Some(subsets) = endpoints.subsets {
for subset in subsets {

View File

@@ -56,28 +56,28 @@ pub struct ConsulTaggedAddress {
#[serde(rename = "Port")]
pub port: u16,
}
pub fn list_to_upstreams(lt: Option<DashMap<String, (Vec<Arc<InnerMap>>, AtomicUsize)>>, upstreams: &UpstreamsDashMap, i: &ServiceMapping) {
pub fn list_to_upstreams(lt: Option<DashMap<Arc<str>, (Vec<Arc<InnerMap>>, AtomicUsize)>>, upstreams: &UpstreamsDashMap, i: &ServiceMapping) {
if let Some(list) = lt {
match upstreams.get(&i.hostname.clone()) {
match upstreams.get(&*i.hostname.clone()) {
Some(upstr) => {
for (k, v) in list {
upstr.value().insert(k, v);
upstr.value().insert(Arc::from(k.to_owned()), v);
}
}
None => {
upstreams.insert(i.hostname.clone(), list);
upstreams.insert(Arc::from(i.hostname.clone()), list);
}
};
}
}
pub fn match_path(conf: &ServiceMapping, upstreams: &DashMap<String, (Vec<Arc<InnerMap>>, AtomicUsize)>, values: Vec<Arc<InnerMap>>) {
pub fn match_path(conf: &ServiceMapping, upstreams: &DashMap<Arc<str>, (Vec<Arc<InnerMap>>, AtomicUsize)>, values: Vec<Arc<InnerMap>>) {
match conf.path {
Some(ref p) => {
upstreams.insert(p.to_string(), (values, AtomicUsize::new(0)));
upstreams.insert(Arc::from(p.clone()), (values, AtomicUsize::new(0)));
}
None => {
upstreams.insert("/".to_string(), (values, AtomicUsize::new(0)));
upstreams.insert(Arc::from("/"), (values, AtomicUsize::new(0)));
}
}
}
@@ -134,7 +134,7 @@ impl ServiceDiscovery for KubernetesDiscovery {
// header_list.insert(Arc::from(path.as_str()), hl);
// header_list.insert(Arc::from(i.path).unwrap_or(Arc::from("/")).as_str(), hl);
config.client_headers.insert(service.hostname.clone(), header_list);
config.client_headers.insert(Arc::from(service.hostname.clone()), header_list);
}
let url = format!("https://{}/api/v1/namespaces/{}/endpoints/{}", server, namespace, service.hostname);
// let url = format!("https://{}/api/v1/namespaces/{}/endpoints?labelSelector=app", server, namespace);
@@ -196,7 +196,7 @@ impl ServiceDiscovery for ConsulDiscovery {
}
}
// header_list.insert(i.path.clone().unwrap_or("/".to_string()), hl);
config.client_headers.insert(i.hostname.clone(), header_list);
config.client_headers.insert(Arc::from(i.hostname.clone()), header_list);
}
let pref = ss.clone() + &i.upstream;

View File

@@ -77,7 +77,7 @@ async fn populate_headers_and_auth(config: &mut Configuration, parsed: &Config)
}
let global_headers: DashMap<Arc<str>, Vec<(Arc<str>, Arc<str>)>> = DashMap::new();
global_headers.insert(Arc::from("/"), ch);
config.client_headers.insert("GLOBAL_CLIENT_HEADERS".to_string(), global_headers);
config.client_headers.insert(Arc::from("GLOBAL_CLIENT_HEADERS"), global_headers);
let mut sh: Vec<(Arc<str>, Arc<str>)> = Vec::new();
sh.push((Arc::from("X-Proxy-Server"), Arc::from("Aralez")));
@@ -90,7 +90,7 @@ async fn populate_headers_and_auth(config: &mut Configuration, parsed: &Config)
}
let server_global_headers: DashMap<Arc<str>, Vec<(Arc<str>, Arc<str>)>> = DashMap::new();
server_global_headers.insert(Arc::from("/"), sh);
config.server_headers.insert("GLOBAL_SERVER_HEADERS".to_string(), server_global_headers);
config.server_headers.insert(Arc::from("GLOBAL_SERVER_HEADERS"), server_global_headers);
config.extraparams.sticky_sessions = parsed.sticky_sessions;
config.extraparams.to_https = parsed.to_https;
@@ -103,7 +103,10 @@ async fn populate_headers_and_auth(config: &mut Configuration, parsed: &Config)
if let Some(auth) = &parsed.authorization {
let name = auth.get("type").unwrap_or(&"".to_string()).to_string();
let creds = auth.get("creds").unwrap_or(&"".to_string()).to_string();
config.extraparams.authentication.insert("authorization".to_string(), vec![name, creds]);
config
.extraparams
.authentication
.insert(Arc::from("authorization"), vec![Arc::from(name), Arc::from(creds)]);
} else {
config.extraparams.authentication = DashMap::new();
}
@@ -144,11 +147,11 @@ async fn populate_file_upstreams(config: &mut Configuration, parsed: &Config) {
}
}
}
path_map.insert(path.clone(), (server_list, AtomicUsize::new(0)));
path_map.insert(Arc::from(path.clone()), (server_list, AtomicUsize::new(0)));
}
config.client_headers.insert(hostname.clone(), client_header_list);
config.server_headers.insert(hostname.clone(), server_header_list);
imtdashmap.insert(hostname.clone(), path_map);
config.client_headers.insert(Arc::from(hostname.clone()), client_header_list);
config.server_headers.insert(Arc::from(hostname.clone()), server_header_list);
imtdashmap.insert(Arc::from(hostname.clone()), path_map);
}
if is_first_run() {

View File

@@ -5,10 +5,10 @@ use std::net::IpAddr;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
pub type UpstreamsDashMap = DashMap<String, DashMap<String, (Vec<Arc<InnerMap>>, AtomicUsize)>>;
pub type UpstreamsDashMap = DashMap<Arc<str>, DashMap<Arc<str>, (Vec<Arc<InnerMap>>, AtomicUsize)>>;
pub type UpstreamsIdMap = DashMap<String, Arc<InnerMap>>;
pub type Headers = DashMap<String, DashMap<Arc<str>, Vec<(Arc<str>, Arc<str>)>>>;
pub type UpstreamsIdMap = DashMap<Arc<str>, Arc<InnerMap>>;
pub type Headers = DashMap<Arc<str>, DashMap<Arc<str>, Vec<(Arc<str>, Arc<str>)>>>;
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct ServiceMapping {
@@ -21,13 +21,11 @@ pub struct ServiceMapping {
pub server_headers: Option<Vec<String>>,
}
// pub type Services = DashMap<String, Vec<(String, Option<String>)>>;
#[derive(Clone, Debug, Default)]
pub struct Extraparams {
pub sticky_sessions: bool,
pub to_https: Option<bool>,
pub authentication: DashMap<String, Vec<String>>,
pub authentication: DashMap<Arc<str>, Vec<Arc<str>>>,
pub rate_limit: Option<isize>,
}
#[derive(Clone, Default, Debug, Serialize, Deserialize)]

View File

@@ -169,8 +169,8 @@ pub fn clone_idmap_into(original: &UpstreamsDashMap, cloned: &UpstreamsIdMap) {
rate_limit: None,
healthcheck: None,
};
cloned.insert(id, Arc::from(to_add));
cloned.insert(hh, Arc::from(x.to_owned()));
cloned.insert(Arc::from(id.as_str()), Arc::from(to_add));
cloned.insert(Arc::from(hh.as_str()), Arc::from(x.to_owned()));
}
new_inner_map.insert(path.clone(), new_vec);
}
@@ -282,7 +282,7 @@ pub fn upstreams_to_json(upstreams: &UpstreamsDashMap) -> serde_json::Result<Str
let (backends, counter) = inner_entry.value();
inner_map.insert(
inner_entry.key().clone(),
inner_entry.key().to_string(),
UpstreamSnapshot {
backends: backends.iter().map(|a| (**a).clone()).collect(),
requests: counter.load(Ordering::Relaxed),
@@ -290,7 +290,7 @@ pub fn upstreams_to_json(upstreams: &UpstreamsDashMap) -> serde_json::Result<Str
);
}
outer.insert(outer_entry.key().clone(), inner_map);
outer.insert(outer_entry.key().to_string(), inner_map);
}
// serde_json::to_string_pretty(&outer)
@@ -301,7 +301,7 @@ pub fn upstreams_liveness_json(configured: &UpstreamsDashMap, current: &Upstream
let mut result = serde_json::Map::new();
for host_entry in configured.iter() {
let hostname = host_entry.key().clone();
let hostname = host_entry.key().to_string();
let configured_paths = host_entry.value();
let mut paths_json = serde_json::Map::new();
@@ -312,8 +312,8 @@ pub fn upstreams_liveness_json(configured: &UpstreamsDashMap, current: &Upstream
let backends_json: Vec<Value> = configured_backends
.iter()
.map(|backend| {
let alive = if let Some(host_map) = current.get(&hostname) {
if let Some(path_entry) = host_map.get(&path) {
let alive = if let Some(host_map) = current.get(&*hostname) {
if let Some(path_entry) = host_map.get(&*path) {
let list = &path_entry.value().0; // Vec<Arc<InnerMap>>
list.iter().any(|b| b.address == backend.address && b.port == backend.port)
} else {
@@ -331,7 +331,7 @@ pub fn upstreams_liveness_json(configured: &UpstreamsDashMap, current: &Upstream
.collect();
paths_json.insert(
path,
path.to_string(),
json!({
"backends": backends_json
}),

View File

@@ -50,7 +50,6 @@ impl ProxyHttp for LB {
fn new_ctx(&self) -> Self::CTX {
Context {
backend_id: Arc::from(""),
// backend_id: Arc::new((IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false)),
to_https: false,
redirect_to: Arc::from(""),
start_time: Instant::now(),
@@ -64,7 +63,7 @@ impl ProxyHttp for LB {
let ep = _ctx.extraparams.as_ref();
if let Some(auth) = ep.authentication.get("authorization") {
let authenticated = authenticate(&auth.value(), &session);
let authenticated = authenticate(auth.value(), &session);
if !authenticated {
let _ = session.respond_error(401).await;
warn!("Forbidden: {:?}, {}", session.client_addr(), session.req_header().uri.path());
@@ -72,7 +71,7 @@ impl ProxyHttp for LB {
}
};
let hostname = return_header_host(&session);
let hostname = return_header_host_from_upstream(session, &self.ump_upst);
_ctx.hostname = hostname;
let mut backend_id = None;
@@ -235,33 +234,56 @@ impl ProxyHttp for LB {
}
}
fn return_header_host(session: &Session) -> Option<Arc<str>> {
if session.is_http2() {
match session.req_header().uri.host() {
Some(host) => Option::from(Arc::from(host)),
None => None,
}
} else {
match session.req_header().headers.get("host") {
Some(host) => {
let header_host: &str = host.to_str().unwrap().split_once(':').map_or(host.to_str().unwrap(), |(h, _)| h);
Option::from(Arc::<str>::from(header_host))
}
None => None,
}
}
}
// use moka::sync::Cache;
// Using Moka for a high-concurrency, size-limited cache
// static HOST_CACHE: Lazy<Cache<String, Arc<str>>> = Lazy::new(|| {
// Cache::builder()
// .max_capacity(10_000) // Limits memory usage if attacked
// .build()
// });
// fn return_header_host_cached(session: &Session) -> Option<Arc<str>> {
// let host_str = if session.is_http2() {
// session.req_header().uri.host()?
// } else {
// let h = session.req_header().headers.get("host")?.to_str().ok()?;
// h.split_once(':').map_or(h, |(host, _)| host)
// };
// HOST_CACHE
// .get_with(host_str.to_string(), || {
// Arc::from(host_str)
// })
// .into()
// }
#[allow(dead_code)]
fn return_str_host<'a>(session: &'a Session) -> Option<&'a str> {
if session.is_http2() {
session.req_header().uri.host()
// use dashmap::DashMap;
// A simple cache to reuse Arcs for common hostnames
// static HOST_CACHE: Lazy<DashMap<String, Arc<str>>> = Lazy::new(|| DashMap::with_capacity(200));
//
// fn return_header_host_cached(session: &Session) -> Option<Arc<str>> {
// let host_str = if session.is_http2() {
// session.req_header().uri.host()?
// } else {
// let h = session.req_header().headers.get("host")?.to_str().ok()?;
// h.split_once(':').map_or(h, |(host, _)| host)
// };
//
// // Fast path: check if we already have an Arc for this host
// if let Some(arc) = HOST_CACHE.get(host_str) {
// return Some(arc.clone()); // Only an atomic increment!
// }
//
// // Slow path: create new Arc and cache it
// let new_arc: Arc<str> = Arc::from(host_str);
// HOST_CACHE.insert(host_str.to_string(), new_arc.clone());
// Some(new_arc)
// }
fn return_header_host_from_upstream(session: &Session, ump_upst: &UpstreamsDashMap) -> Option<Arc<str>> {
let host_str = if session.is_http2() {
session.req_header().uri.host()?
} else {
session
.req_header()
.headers
.get("host")
.and_then(|h| h.to_str().ok())
.map(|h| h.split_once(':').map_or(h, |(host, _)| host))
}
let h = session.req_header().headers.get("host")?.to_str().ok()?;
h.split_once(':').map_or(h, |(host, _)| host)
};
ump_upst.get(host_str).map(|entry| entry.key().clone())
}