From abb5fef1d68c625bf706e04ff6d1444d3b93591a Mon Sep 17 00:00:00 2001 From: Ara Sadoyan Date: Tue, 17 Feb 2026 17:03:52 +0100 Subject: [PATCH] minor improvements --- Cargo.toml | 6 +++--- src/utils/metrics.rs | 2 +- src/web/proxyhttp.rs | 30 +++++++++++++----------------- 3 files changed, 17 insertions(+), 21 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e95c74a..63b8abc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,10 +20,10 @@ pingora-proxy = "0.7.0" pingora-http = "0.7.0" pingora-limits = "0.7.0" async-trait = "0.1.89" -env_logger = "0.11.8" +env_logger = "0.11.9" log = "0.4.29" -futures = "0.3.31" -notify = "9.0.0-rc.1" +futures = "0.3.32" +notify = "9.0.0-rc.2" axum = { version = "0.8.8" } axum-server = { version = "0.8.0", features = ["tls-openssl"] } reqwest = { version = "0.13.1", features = ["json", "stream"] } diff --git a/src/utils/metrics.rs b/src/utils/metrics.rs index 3545ed7..766f01e 100644 --- a/src/utils/metrics.rs +++ b/src/utils/metrics.rs @@ -40,7 +40,7 @@ lazy_static::lazy_static! { pub static ref REQUESTS_BY_UPSTREAM: IntCounterVec = register_int_counter_vec!( "aralez_requests_by_upstream", "Number of requests by UPSTREAM server", - &["method"] + &["upstream"] ).unwrap(); pub static ref REQUESTS_BY_VERSION: IntCounterVec = register_int_counter_vec!( "aralez_requests_by_version_total", diff --git a/src/web/proxyhttp.rs b/src/web/proxyhttp.rs index 73b1cd6..fb73e54 100644 --- a/src/web/proxyhttp.rs +++ b/src/web/proxyhttp.rs @@ -16,16 +16,15 @@ use pingora_core::prelude::HttpPeer; use pingora_limits::rate::Rate; use pingora_proxy::{ProxyHttp, Session}; use sha2::{Digest, Sha256}; +use std::cell::RefCell; use std::fmt::Write; -// use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; use tokio::time::Instant; static RATE_LIMITER: Lazy = Lazy::new(|| Rate::new(Duration::from_secs(1))); static REVERSE_STORE: Lazy> = Lazy::new(|| DashMap::new()); - -// static BARADI: String = String::new(); +thread_local! {static IP_BUFFER: RefCell = RefCell::new(String::with_capacity(50));} #[derive(Clone)] pub struct LB { @@ -40,7 +39,6 @@ pub struct LB { pub struct Context { backend_id: Option, - // backend_id: Option<(IpAddr, u16, bool)>, to_https: bool, sticky_sessions: bool, redirect_to: Option, @@ -69,7 +67,6 @@ impl ProxyHttp for LB { } async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result { let ep = _ctx.extraparams.as_ref(); - if let Some(auth) = ep.authentication.get("authorization") { let authenticated = authenticate(auth.value(), &session); if !authenticated { @@ -97,7 +94,6 @@ impl ProxyHttp for LB { } } } - // println!("backend_id ===========> {:?}", backend_id); match _ctx.hostname.as_ref() { None => return Ok(false), Some(host) => { @@ -106,7 +102,6 @@ impl ProxyHttp for LB { None => return Ok(false), Some(ref innermap) => { if let Some(rate) = innermap.rate_limit.or(ep.rate_limit) { - // let rate_key = session.client_addr().and_then(|addr| addr.as_inet()).map(|inet| inet.ip().to_string()).unwrap_or_else(|| host.to_string()); let rate_key = session.client_addr().and_then(|addr| addr.as_inet()).map(|inet| inet.ip()); let curr_window_requests = RATE_LIMITER.observe(&rate_key, 1); if curr_window_requests > rate { @@ -128,7 +123,6 @@ impl ProxyHttp for LB { Ok(false) } async fn upstream_peer(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result> { - // let before = crate::utils::fordebug::ALLOC_COUNT.load(Ordering::Relaxed); match ctx.hostname.as_ref() { Some(hostname) => match ctx.upstream_peer.as_ref() { Some(innermap) => { @@ -163,9 +157,6 @@ impl ProxyHttp for LB { ctx.backend_id = Some(s); ctx.sticky_sessions = true; } - // let after = crate::utils::fordebug::ALLOC_COUNT.load(Ordering::Relaxed); - // println!("Allocations : {} : {:?}", after - before, ctx.backend_id); - Ok(peer) } None => { @@ -182,7 +173,6 @@ impl ProxyHttp for LB { } }, None => { - // session.respond_error_with_body(502, Bytes::from("502 Bad Gateway\n")).await.expect("Failed to send error"); if let Err(e) = session.respond_error_with_body(502, Bytes::from("502 Bad Gateway\n")).await { error!("Failed to send error response: {:?}", e); } @@ -201,12 +191,17 @@ impl ProxyHttp for LB { if let Some(hostname) = ctx.hostname.as_deref() { upstream_request.insert_header("Host", hostname)?; } - if let Some(peer) = &ctx.upstream_peer { - upstream_request.insert_header("X-Forwarded-For", &*peer.address)?; + + if let Some(client_ip) = session.client_addr() { + IP_BUFFER.with(|buffer| { + let mut buf = buffer.borrow_mut(); + buf.clear(); + write!(buf, "{}", client_ip).unwrap_or(()); + upstream_request.append_header("x-forward-for", buf.as_str()).unwrap_or(false); + }); } let hostname = ctx.hostname.as_deref().unwrap_or("localhost"); let path = session.req_header().uri.path(); - let GetHostsReturHeaders { server_headers, client_headers } = match self.get_header(hostname, path) { Some(h) => h, None => return Ok(()), @@ -225,7 +220,6 @@ impl ProxyHttp for LB { async fn response_filter(&self, session: &mut Session, _upstream_response: &mut ResponseHeader, ctx: &mut Self::CTX) -> Result<()> { if ctx.sticky_sessions { if let Some(bid) = ctx.backend_id.clone() { - // CHECK ALLOCATIONS TOMORROW if REVERSE_STORE.get(&*bid).is_none() { let mut hasher = Sha256::new(); hasher.update(bid.clone().into_bytes()); @@ -248,11 +242,13 @@ impl ProxyHttp for LB { session.write_response_header(Box::new(redirect_response), false).await?; } + // ALLOCATIONS ! if let Some(client_headers) = &ctx.client_headers { for (k, v) in client_headers.iter() { - _upstream_response.insert_header(k.to_string(), v.as_ref())?; + _upstream_response.append_header(k.to_string(), v.as_ref())?; } } + // END ALLOCATIONS ! session.set_keepalive(Some(300)); Ok(())