New features, 4xx counter

This commit is contained in:
Ara Sadoyan
2026-05-22 16:47:40 +02:00
parent df02e523e4
commit d301f7225f
8 changed files with 51 additions and 3 deletions

View File

@@ -69,6 +69,7 @@ async fn build_upstreams(fullist: &UpstreamsDashMap, method: &str, client: &Clie
is_http2: is_h2, is_http2: is_h2,
to_https: upstream.to_https, to_https: upstream.to_https,
rate_limit: upstream.rate_limit, rate_limit: upstream.rate_limit,
x4xx_limit: upstream.x4xx_limit,
healthcheck: upstream.healthcheck, healthcheck: upstream.healthcheck,
redirect_to: upstream.redirect_to.clone(), redirect_to: upstream.redirect_to.clone(),
authorization: upstream.authorization.clone(), authorization: upstream.authorization.clone(),

View File

@@ -32,6 +32,7 @@ pub async fn for_consul(url: String, token: Option<String>, conf: &GlobalService
is_http2: false, is_http2: false,
to_https: conf.to_https.unwrap_or(false), to_https: conf.to_https.unwrap_or(false),
rate_limit: conf.rate_limit, rate_limit: conf.rate_limit,
x4xx_limit: conf.x4xx_limit,
redirect_to: None, redirect_to: None,
healthcheck: None, healthcheck: None,
authorization: None, authorization: None,
@@ -68,6 +69,7 @@ pub async fn for_kuber(url: &str, token: &str, conf: &GlobalServiceMapping) -> O
is_http2: false, is_http2: false,
to_https: conf.to_https.unwrap_or(false), to_https: conf.to_https.unwrap_or(false),
rate_limit: conf.rate_limit, rate_limit: conf.rate_limit,
x4xx_limit: conf.x4xx_limit,
healthcheck: None, healthcheck: None,
redirect_to: None, redirect_to: None,
authorization: None, authorization: None,

View File

@@ -156,6 +156,7 @@ async fn populate_headers_and_auth(config: &mut Configuration, parsed: &Config)
config.extraparams.to_https = parsed.to_https; config.extraparams.to_https = parsed.to_https;
config.extraparams.sticky_sessions = parsed.sticky_sessions; config.extraparams.sticky_sessions = parsed.sticky_sessions;
config.extraparams.rate_limit = parsed.rate_limit; config.extraparams.rate_limit = parsed.rate_limit;
config.extraparams.x4xx_limit = parsed.x4xx_limit;
if let Some(rate) = &parsed.rate_limit { if let Some(rate) = &parsed.rate_limit {
info!("Applied Global Rate Limit : {} request per second", rate); info!("Applied Global Rate Limit : {} request per second", rate);
@@ -208,6 +209,7 @@ async fn populate_file_upstreams(config: &mut Configuration, parsed: &Config) {
is_http2: false, is_http2: false,
to_https: path_config.to_https.unwrap_or(false), to_https: path_config.to_https.unwrap_or(false),
rate_limit: path_config.rate_limit, rate_limit: path_config.rate_limit,
x4xx_limit: path_config.x4xx_limit,
healthcheck: path_config.healthcheck, healthcheck: path_config.healthcheck,
redirect_to: redirect_link, redirect_to: redirect_link,
authorization: path_auth, authorization: path_auth,

View File

@@ -17,6 +17,7 @@ pub struct Extraparams {
pub sticky_sessions: Option<u64>, pub sticky_sessions: Option<u64>,
pub authentication: Option<Arc<InnerAuth>>, pub authentication: Option<Arc<InnerAuth>>,
pub rate_limit: Option<isize>, pub rate_limit: Option<isize>,
pub x4xx_limit: Option<u32>,
} }
#[derive(Debug, Default, Clone, Serialize, Deserialize)] #[derive(Debug, Default, Clone, Serialize, Deserialize)]
@@ -27,6 +28,7 @@ pub struct GlobalServiceMapping {
pub to_https: Option<bool>, pub to_https: Option<bool>,
pub sticky_sessions: Option<u64>, pub sticky_sessions: Option<u64>,
pub rate_limit: Option<isize>, pub rate_limit: Option<isize>,
pub x4xx_limit: Option<u32>,
pub client_headers: Option<Vec<String>>, pub client_headers: Option<Vec<String>>,
pub server_headers: Option<Vec<String>>, pub server_headers: Option<Vec<String>>,
} }
@@ -65,12 +67,14 @@ pub struct Config {
pub kubernetes: Option<Kubernetes>, pub kubernetes: Option<Kubernetes>,
#[serde(default)] #[serde(default)]
pub rate_limit: Option<isize>, pub rate_limit: Option<isize>,
pub x4xx_limit: Option<u32>,
} }
#[derive(Debug, Default, Serialize, Deserialize)] #[derive(Debug, Default, Serialize, Deserialize)]
pub struct HostConfig { pub struct HostConfig {
pub paths: HashMap<String, PathConfig>, pub paths: HashMap<String, PathConfig>,
pub rate_limit: Option<isize>, pub rate_limit: Option<isize>,
pub x4xx_limit: Option<u32>,
} }
#[derive(Debug, Default, Serialize, Deserialize, Clone)] #[derive(Debug, Default, Serialize, Deserialize, Clone)]
pub struct Auth { pub struct Auth {
@@ -86,6 +90,7 @@ pub struct PathConfig {
pub client_headers: Option<Vec<String>>, pub client_headers: Option<Vec<String>>,
pub server_headers: Option<Vec<String>>, pub server_headers: Option<Vec<String>>,
pub rate_limit: Option<isize>, pub rate_limit: Option<isize>,
pub x4xx_limit: Option<u32>,
pub healthcheck: Option<bool>, pub healthcheck: Option<bool>,
pub redirect_to: Option<String>, pub redirect_to: Option<String>,
pub authorization: Option<Auth>, pub authorization: Option<Auth>,
@@ -141,6 +146,7 @@ pub struct InnerMap {
pub is_http2: bool, pub is_http2: bool,
pub to_https: bool, pub to_https: bool,
pub rate_limit: Option<isize>, pub rate_limit: Option<isize>,
pub x4xx_limit: Option<u32>,
pub healthcheck: Option<bool>, pub healthcheck: Option<bool>,
pub redirect_to: Option<Arc<str>>, pub redirect_to: Option<Arc<str>>,
pub authorization: Option<Arc<InnerAuth>>, pub authorization: Option<Arc<InnerAuth>>,
@@ -156,6 +162,7 @@ impl InnerMap {
is_http2: Default::default(), is_http2: Default::default(),
to_https: Default::default(), to_https: Default::default(),
rate_limit: Default::default(), rate_limit: Default::default(),
x4xx_limit: Default::default(),
healthcheck: Default::default(), healthcheck: Default::default(),
redirect_to: Default::default(), redirect_to: Default::default(),
authorization: Default::default(), authorization: Default::default(),
@@ -171,6 +178,7 @@ pub struct InnerMapForJson {
pub is_http2: bool, pub is_http2: bool,
pub to_https: bool, pub to_https: bool,
pub rate_limit: Option<isize>, pub rate_limit: Option<isize>,
pub x4xx_limit: Option<u32>,
pub healthcheck: Option<bool>, pub healthcheck: Option<bool>,
} }
#[derive(Debug, Default, Serialize, Deserialize)] #[derive(Debug, Default, Serialize, Deserialize)]

View File

@@ -29,13 +29,14 @@ pub fn print_upstreams(upstreams: &UpstreamsDashMap) {
for f in path_entry.value().0.clone() { for f in path_entry.value().0.clone() {
writeln!( writeln!(
out, out,
" IP: {}, Port: {}, SSL: {}, H2: {}, To HTTPS: {}, Rate Limit: {}", " IP: {}, Port: {}, SSL: {}, H2: {}, To HTTPS: {}, Rate Limit: {}, 4xx Limit: {}",
f.address, f.address,
f.port, f.port,
f.is_ssl, f.is_ssl,
f.is_http2, f.is_http2,
f.to_https, f.to_https,
f.rate_limit.unwrap_or(0) f.rate_limit.unwrap_or(0),
f.x4xx_limit.unwrap_or(0)
) )
.unwrap(); .unwrap();
} }
@@ -152,13 +153,14 @@ pub fn clone_idmap_into(original: &UpstreamsDashMap, cloned: &UpstreamsIdMap) {
let mut id = String::new(); let mut id = String::new();
write!( write!(
&mut id, &mut id,
"{}:{}:{}:{}:{}:{}:{}:{:?}", "{}:{}:{}:{}:{}:{}:{}:{}:{:?}",
outer_entry.key(), outer_entry.key(),
x.address, x.address,
x.port, x.port,
x.is_http2, x.is_http2,
x.to_https, x.to_https,
x.rate_limit.unwrap_or_default(), x.rate_limit.unwrap_or_default(),
x.x4xx_limit.unwrap_or_default(),
x.healthcheck.unwrap_or_default(), x.healthcheck.unwrap_or_default(),
x.authorization x.authorization
) )
@@ -177,6 +179,7 @@ pub fn clone_idmap_into(original: &UpstreamsDashMap, cloned: &UpstreamsIdMap) {
is_http2: false, is_http2: false,
to_https: false, to_https: false,
rate_limit: None, rate_limit: None,
x4xx_limit: None,
healthcheck: None, healthcheck: None,
redirect_to: None, redirect_to: None,
authorization: None, authorization: None,
@@ -303,6 +306,7 @@ pub fn upstreams_to_json(upstreams: &UpstreamsDashMap) -> serde_json::Result<Str
is_http2: a.is_http2, is_http2: a.is_http2,
to_https: a.to_https, to_https: a.to_https,
rate_limit: a.rate_limit, rate_limit: a.rate_limit,
x4xx_limit: a.x4xx_limit,
healthcheck: a.healthcheck, healthcheck: a.healthcheck,
}) })
.collect(), .collect(),

View File

@@ -99,6 +99,7 @@ impl BackgroundService for LB {
new.sticky_sessions = ss.extraparams.sticky_sessions; new.sticky_sessions = ss.extraparams.sticky_sessions;
new.authentication = ss.extraparams.authentication.clone(); new.authentication = ss.extraparams.authentication.clone();
new.rate_limit = ss.extraparams.rate_limit; new.rate_limit = ss.extraparams.rate_limit;
new.x4xx_limit = ss.extraparams.x4xx_limit;
self.extraparams.store(Arc::new(new)); self.extraparams.store(Arc::new(new));
self.client_headers.clear(); self.client_headers.clear();
self.server_headers.clear(); self.server_headers.clear();

View File

@@ -7,6 +7,7 @@ use async_trait::async_trait;
use axum::body::Bytes; use axum::body::Bytes;
use dashmap::DashMap; use dashmap::DashMap;
use log::{debug, error, warn}; use log::{debug, error, warn};
use moka::sync::Cache;
use pingora::http::{RequestHeader, ResponseHeader, StatusCode}; use pingora::http::{RequestHeader, ResponseHeader, StatusCode};
use pingora::prelude::*; use pingora::prelude::*;
use pingora::ErrorSource::Upstream; use pingora::ErrorSource::Upstream;
@@ -17,6 +18,7 @@ use pingora_proxy::{ProxyHttp, Session};
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
use std::cell::RefCell; use std::cell::RefCell;
use std::fmt::Write; use std::fmt::Write;
use std::net::IpAddr;
use std::sync::{Arc, LazyLock}; use std::sync::{Arc, LazyLock};
use std::time::Duration; use std::time::Duration;
use tokio::time::Instant; use tokio::time::Instant;
@@ -24,6 +26,7 @@ use tokio::time::Instant;
static REVERSE_STORE: LazyLock<DashMap<String, String>> = LazyLock::new(DashMap::new); static REVERSE_STORE: LazyLock<DashMap<String, String>> = LazyLock::new(DashMap::new);
thread_local! {static IP_BUFFER: RefCell<String> = RefCell::new(String::with_capacity(50));} thread_local! {static IP_BUFFER: RefCell<String> = RefCell::new(String::with_capacity(50));}
pub static RATE_LIMITER: LazyLock<Rate> = LazyLock::new(|| Rate::new(Duration::from_secs(1))); pub static RATE_LIMITER: LazyLock<Rate> = LazyLock::new(|| Rate::new(Duration::from_secs(1)));
pub static REQUESTS_4XX: LazyLock<Cache<IpAddr, u32>> = LazyLock::new(|| Cache::builder().time_to_live(Duration::from_secs(1)).build());
pub static LOCALHOST: LazyLock<Arc<str>> = LazyLock::new(|| Arc::from("localhost")); pub static LOCALHOST: LazyLock<Arc<str>> = LazyLock::new(|| Arc::from("localhost"));
#[derive(Clone)] #[derive(Clone)]
@@ -44,6 +47,7 @@ pub struct Context {
upstream_peer: Option<Arc<InnerMap>>, upstream_peer: Option<Arc<InnerMap>>,
extraparams: arc_swap::Guard<Arc<Extraparams>>, extraparams: arc_swap::Guard<Arc<Extraparams>>,
client_headers: Option<Vec<(String, Arc<str>)>>, client_headers: Option<Vec<(String, Arc<str>)>>,
x4xx_limit: Option<u32>,
} }
#[async_trait] #[async_trait]
@@ -57,6 +61,7 @@ impl ProxyHttp for LB {
upstream_peer: None, upstream_peer: None,
extraparams: self.extraparams.load(), extraparams: self.extraparams.load(),
client_headers: None, client_headers: None,
x4xx_limit: None,
} }
} }
async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool> { async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool> {
@@ -89,6 +94,22 @@ impl ProxyHttp for LB {
return Ok(true); return Ok(true);
} }
} }
if let Some(rate) = innermap.x4xx_limit.or(_ctx.extraparams.x4xx_limit) {
_ctx.x4xx_limit = innermap.x4xx_limit;
let rate_key = session.client_addr().and_then(|addr| addr.as_inet()).map(|inet| inet.ip());
if let Some(rk) = rate_key {
let count = REQUESTS_4XX.get(&rk).unwrap_or(0);
if count > rate {
let header = ResponseHeader::build(429, None)?;
session.set_keepalive(None);
session.write_response_header(Box::new(header), true).await?;
if let (Some(oi), Some(oa)) = (&_ctx.hostname, rate_key) {
warn!("Limit 4XX: {}-rps exceed on {} from {}", rate, oi, oa);
}
return Ok(true);
}
}
}
if let Some(rate) = innermap.rate_limit.or(_ctx.extraparams.rate_limit) { if let Some(rate) = innermap.rate_limit.or(_ctx.extraparams.rate_limit) {
let rate_key = session.client_addr().and_then(|addr| addr.as_inet()).map(|inet| inet.ip()); let rate_key = session.client_addr().and_then(|addr| addr.as_inet()).map(|inet| inet.ip());
let curr_window_requests = RATE_LIMITER.observe(&rate_key, 1); let curr_window_requests = RATE_LIMITER.observe(&rate_key, 1);
@@ -282,6 +303,14 @@ impl ProxyHttp for LB {
}; };
calc_metrics(m); calc_metrics(m);
ACTIVE_SESSIONS.dec(); ACTIVE_SESSIONS.dec();
if let Some(_) = ctx.x4xx_limit.or(ctx.extraparams.x4xx_limit) {
if 400 <= response_code && response_code <= 499 {
if let Some(ip) = session.client_addr().and_then(|a| a.as_inet()).map(|i| i.ip()) {
let current = REQUESTS_4XX.get(&ip).unwrap_or(0);
REQUESTS_4XX.insert(ip, current + 1);
}
}
}
} }
} }

View File

@@ -36,6 +36,7 @@ pub fn run() {
sticky_sessions: None, sticky_sessions: None,
authentication: None, authentication: None,
rate_limit: None, rate_limit: None,
x4xx_limit: None,
})); }));
let cfg = Arc::new(maincfg); let cfg = Arc::new(maincfg);