minor improvements

This commit is contained in:
Ara Sadoyan
2026-02-17 17:03:52 +01:00
parent 3618687ad5
commit abb5fef1d6
3 changed files with 17 additions and 21 deletions

View File

@@ -20,10 +20,10 @@ pingora-proxy = "0.7.0"
pingora-http = "0.7.0"
pingora-limits = "0.7.0"
async-trait = "0.1.89"
env_logger = "0.11.8"
env_logger = "0.11.9"
log = "0.4.29"
futures = "0.3.31"
notify = "9.0.0-rc.1"
futures = "0.3.32"
notify = "9.0.0-rc.2"
axum = { version = "0.8.8" }
axum-server = { version = "0.8.0", features = ["tls-openssl"] }
reqwest = { version = "0.13.1", features = ["json", "stream"] }

View File

@@ -40,7 +40,7 @@ lazy_static::lazy_static! {
pub static ref REQUESTS_BY_UPSTREAM: IntCounterVec = register_int_counter_vec!(
"aralez_requests_by_upstream",
"Number of requests by UPSTREAM server",
&["method"]
&["upstream"]
).unwrap();
pub static ref REQUESTS_BY_VERSION: IntCounterVec = register_int_counter_vec!(
"aralez_requests_by_version_total",

View File

@@ -16,16 +16,15 @@ use pingora_core::prelude::HttpPeer;
use pingora_limits::rate::Rate;
use pingora_proxy::{ProxyHttp, Session};
use sha2::{Digest, Sha256};
use std::cell::RefCell;
use std::fmt::Write;
// use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::Instant;
static RATE_LIMITER: Lazy<Rate> = Lazy::new(|| Rate::new(Duration::from_secs(1)));
static REVERSE_STORE: Lazy<DashMap<String, String>> = Lazy::new(|| DashMap::new());
// static BARADI: String = String::new();
thread_local! {static IP_BUFFER: RefCell<String> = RefCell::new(String::with_capacity(50));}
#[derive(Clone)]
pub struct LB {
@@ -40,7 +39,6 @@ pub struct LB {
pub struct Context {
backend_id: Option<String>,
// backend_id: Option<(IpAddr, u16, bool)>,
to_https: bool,
sticky_sessions: bool,
redirect_to: Option<String>,
@@ -69,7 +67,6 @@ impl ProxyHttp for LB {
}
async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool> {
let ep = _ctx.extraparams.as_ref();
if let Some(auth) = ep.authentication.get("authorization") {
let authenticated = authenticate(auth.value(), &session);
if !authenticated {
@@ -97,7 +94,6 @@ impl ProxyHttp for LB {
}
}
}
// println!("backend_id ===========> {:?}", backend_id);
match _ctx.hostname.as_ref() {
None => return Ok(false),
Some(host) => {
@@ -106,7 +102,6 @@ impl ProxyHttp for LB {
None => return Ok(false),
Some(ref innermap) => {
if let Some(rate) = innermap.rate_limit.or(ep.rate_limit) {
// let rate_key = session.client_addr().and_then(|addr| addr.as_inet()).map(|inet| inet.ip().to_string()).unwrap_or_else(|| host.to_string());
let rate_key = session.client_addr().and_then(|addr| addr.as_inet()).map(|inet| inet.ip());
let curr_window_requests = RATE_LIMITER.observe(&rate_key, 1);
if curr_window_requests > rate {
@@ -128,7 +123,6 @@ impl ProxyHttp for LB {
Ok(false)
}
async fn upstream_peer(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<Box<HttpPeer>> {
// let before = crate::utils::fordebug::ALLOC_COUNT.load(Ordering::Relaxed);
match ctx.hostname.as_ref() {
Some(hostname) => match ctx.upstream_peer.as_ref() {
Some(innermap) => {
@@ -163,9 +157,6 @@ impl ProxyHttp for LB {
ctx.backend_id = Some(s);
ctx.sticky_sessions = true;
}
// let after = crate::utils::fordebug::ALLOC_COUNT.load(Ordering::Relaxed);
// println!("Allocations : {} : {:?}", after - before, ctx.backend_id);
Ok(peer)
}
None => {
@@ -182,7 +173,6 @@ impl ProxyHttp for LB {
}
},
None => {
// session.respond_error_with_body(502, Bytes::from("502 Bad Gateway\n")).await.expect("Failed to send error");
if let Err(e) = session.respond_error_with_body(502, Bytes::from("502 Bad Gateway\n")).await {
error!("Failed to send error response: {:?}", e);
}
@@ -201,12 +191,17 @@ impl ProxyHttp for LB {
if let Some(hostname) = ctx.hostname.as_deref() {
upstream_request.insert_header("Host", hostname)?;
}
if let Some(peer) = &ctx.upstream_peer {
upstream_request.insert_header("X-Forwarded-For", &*peer.address)?;
if let Some(client_ip) = session.client_addr() {
IP_BUFFER.with(|buffer| {
let mut buf = buffer.borrow_mut();
buf.clear();
write!(buf, "{}", client_ip).unwrap_or(());
upstream_request.append_header("x-forward-for", buf.as_str()).unwrap_or(false);
});
}
let hostname = ctx.hostname.as_deref().unwrap_or("localhost");
let path = session.req_header().uri.path();
let GetHostsReturHeaders { server_headers, client_headers } = match self.get_header(hostname, path) {
Some(h) => h,
None => return Ok(()),
@@ -225,7 +220,6 @@ impl ProxyHttp for LB {
async fn response_filter(&self, session: &mut Session, _upstream_response: &mut ResponseHeader, ctx: &mut Self::CTX) -> Result<()> {
if ctx.sticky_sessions {
if let Some(bid) = ctx.backend_id.clone() {
// CHECK ALLOCATIONS TOMORROW
if REVERSE_STORE.get(&*bid).is_none() {
let mut hasher = Sha256::new();
hasher.update(bid.clone().into_bytes());
@@ -248,11 +242,13 @@ impl ProxyHttp for LB {
session.write_response_header(Box::new(redirect_response), false).await?;
}
// ALLOCATIONS !
if let Some(client_headers) = &ctx.client_headers {
for (k, v) in client_headers.iter() {
_upstream_response.insert_header(k.to_string(), v.as_ref())?;
_upstream_response.append_header(k.to_string(), v.as_ref())?;
}
}
// END ALLOCATIONS !
session.set_keepalive(Some(300));
Ok(())