From d301f7225f0f8319be73842b19b721e328a7277d Mon Sep 17 00:00:00 2001 From: Ara Sadoyan Date: Fri, 22 May 2026 16:47:40 +0200 Subject: [PATCH] New features, 4xx counter --- src/utils/healthcheck.rs | 1 + src/utils/httpclient.rs | 2 ++ src/utils/parceyaml.rs | 2 ++ src/utils/structs.rs | 8 ++++++++ src/utils/tools.rs | 10 +++++++--- src/web/bgservice.rs | 1 + src/web/proxyhttp.rs | 29 +++++++++++++++++++++++++++++ src/web/start.rs | 1 + 8 files changed, 51 insertions(+), 3 deletions(-) diff --git a/src/utils/healthcheck.rs b/src/utils/healthcheck.rs index 19e9922..b4d9a01 100644 --- a/src/utils/healthcheck.rs +++ b/src/utils/healthcheck.rs @@ -69,6 +69,7 @@ async fn build_upstreams(fullist: &UpstreamsDashMap, method: &str, client: &Clie is_http2: is_h2, to_https: upstream.to_https, rate_limit: upstream.rate_limit, + x4xx_limit: upstream.x4xx_limit, healthcheck: upstream.healthcheck, redirect_to: upstream.redirect_to.clone(), authorization: upstream.authorization.clone(), diff --git a/src/utils/httpclient.rs b/src/utils/httpclient.rs index 04f4c9c..af8c3d8 100644 --- a/src/utils/httpclient.rs +++ b/src/utils/httpclient.rs @@ -32,6 +32,7 @@ pub async fn for_consul(url: String, token: Option, conf: &GlobalService is_http2: false, to_https: conf.to_https.unwrap_or(false), rate_limit: conf.rate_limit, + x4xx_limit: conf.x4xx_limit, redirect_to: None, healthcheck: None, authorization: None, @@ -68,6 +69,7 @@ pub async fn for_kuber(url: &str, token: &str, conf: &GlobalServiceMapping) -> O is_http2: false, to_https: conf.to_https.unwrap_or(false), rate_limit: conf.rate_limit, + x4xx_limit: conf.x4xx_limit, healthcheck: None, redirect_to: None, authorization: None, diff --git a/src/utils/parceyaml.rs b/src/utils/parceyaml.rs index 35066d8..0180285 100644 --- a/src/utils/parceyaml.rs +++ b/src/utils/parceyaml.rs @@ -156,6 +156,7 @@ async fn populate_headers_and_auth(config: &mut Configuration, parsed: &Config) config.extraparams.to_https = parsed.to_https; config.extraparams.sticky_sessions = parsed.sticky_sessions; config.extraparams.rate_limit = parsed.rate_limit; + config.extraparams.x4xx_limit = parsed.x4xx_limit; if let Some(rate) = &parsed.rate_limit { info!("Applied Global Rate Limit : {} request per second", rate); @@ -208,6 +209,7 @@ async fn populate_file_upstreams(config: &mut Configuration, parsed: &Config) { is_http2: false, to_https: path_config.to_https.unwrap_or(false), rate_limit: path_config.rate_limit, + x4xx_limit: path_config.x4xx_limit, healthcheck: path_config.healthcheck, redirect_to: redirect_link, authorization: path_auth, diff --git a/src/utils/structs.rs b/src/utils/structs.rs index a2c28eb..1a64ef9 100644 --- a/src/utils/structs.rs +++ b/src/utils/structs.rs @@ -17,6 +17,7 @@ pub struct Extraparams { pub sticky_sessions: Option, pub authentication: Option>, pub rate_limit: Option, + pub x4xx_limit: Option, } #[derive(Debug, Default, Clone, Serialize, Deserialize)] @@ -27,6 +28,7 @@ pub struct GlobalServiceMapping { pub to_https: Option, pub sticky_sessions: Option, pub rate_limit: Option, + pub x4xx_limit: Option, pub client_headers: Option>, pub server_headers: Option>, } @@ -65,12 +67,14 @@ pub struct Config { pub kubernetes: Option, #[serde(default)] pub rate_limit: Option, + pub x4xx_limit: Option, } #[derive(Debug, Default, Serialize, Deserialize)] pub struct HostConfig { pub paths: HashMap, pub rate_limit: Option, + pub x4xx_limit: Option, } #[derive(Debug, Default, Serialize, Deserialize, Clone)] pub struct Auth { @@ -86,6 +90,7 @@ pub struct PathConfig { pub client_headers: Option>, pub server_headers: Option>, pub rate_limit: Option, + pub x4xx_limit: Option, pub healthcheck: Option, pub redirect_to: Option, pub authorization: Option, @@ -141,6 +146,7 @@ pub struct InnerMap { pub is_http2: bool, pub to_https: bool, pub rate_limit: Option, + pub x4xx_limit: Option, pub healthcheck: Option, pub redirect_to: Option>, pub authorization: Option>, @@ -156,6 +162,7 @@ impl InnerMap { is_http2: Default::default(), to_https: Default::default(), rate_limit: Default::default(), + x4xx_limit: Default::default(), healthcheck: Default::default(), redirect_to: Default::default(), authorization: Default::default(), @@ -171,6 +178,7 @@ pub struct InnerMapForJson { pub is_http2: bool, pub to_https: bool, pub rate_limit: Option, + pub x4xx_limit: Option, pub healthcheck: Option, } #[derive(Debug, Default, Serialize, Deserialize)] diff --git a/src/utils/tools.rs b/src/utils/tools.rs index e8ffa5d..0fcc199 100644 --- a/src/utils/tools.rs +++ b/src/utils/tools.rs @@ -29,13 +29,14 @@ pub fn print_upstreams(upstreams: &UpstreamsDashMap) { for f in path_entry.value().0.clone() { writeln!( out, - " IP: {}, Port: {}, SSL: {}, H2: {}, To HTTPS: {}, Rate Limit: {}", + " IP: {}, Port: {}, SSL: {}, H2: {}, To HTTPS: {}, Rate Limit: {}, 4xx Limit: {}", f.address, f.port, f.is_ssl, f.is_http2, f.to_https, - f.rate_limit.unwrap_or(0) + f.rate_limit.unwrap_or(0), + f.x4xx_limit.unwrap_or(0) ) .unwrap(); } @@ -152,13 +153,14 @@ pub fn clone_idmap_into(original: &UpstreamsDashMap, cloned: &UpstreamsIdMap) { let mut id = String::new(); write!( &mut id, - "{}:{}:{}:{}:{}:{}:{}:{:?}", + "{}:{}:{}:{}:{}:{}:{}:{}:{:?}", outer_entry.key(), x.address, x.port, x.is_http2, x.to_https, x.rate_limit.unwrap_or_default(), + x.x4xx_limit.unwrap_or_default(), x.healthcheck.unwrap_or_default(), x.authorization ) @@ -177,6 +179,7 @@ pub fn clone_idmap_into(original: &UpstreamsDashMap, cloned: &UpstreamsIdMap) { is_http2: false, to_https: false, rate_limit: None, + x4xx_limit: None, healthcheck: None, redirect_to: None, authorization: None, @@ -303,6 +306,7 @@ pub fn upstreams_to_json(upstreams: &UpstreamsDashMap) -> serde_json::Result> = LazyLock::new(DashMap::new); thread_local! {static IP_BUFFER: RefCell = RefCell::new(String::with_capacity(50));} pub static RATE_LIMITER: LazyLock = LazyLock::new(|| Rate::new(Duration::from_secs(1))); +pub static REQUESTS_4XX: LazyLock> = LazyLock::new(|| Cache::builder().time_to_live(Duration::from_secs(1)).build()); pub static LOCALHOST: LazyLock> = LazyLock::new(|| Arc::from("localhost")); #[derive(Clone)] @@ -44,6 +47,7 @@ pub struct Context { upstream_peer: Option>, extraparams: arc_swap::Guard>, client_headers: Option)>>, + x4xx_limit: Option, } #[async_trait] @@ -57,6 +61,7 @@ impl ProxyHttp for LB { upstream_peer: None, extraparams: self.extraparams.load(), client_headers: None, + x4xx_limit: None, } } async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result { @@ -89,6 +94,22 @@ impl ProxyHttp for LB { return Ok(true); } } + if let Some(rate) = innermap.x4xx_limit.or(_ctx.extraparams.x4xx_limit) { + _ctx.x4xx_limit = innermap.x4xx_limit; + let rate_key = session.client_addr().and_then(|addr| addr.as_inet()).map(|inet| inet.ip()); + if let Some(rk) = rate_key { + let count = REQUESTS_4XX.get(&rk).unwrap_or(0); + if count > rate { + let header = ResponseHeader::build(429, None)?; + session.set_keepalive(None); + session.write_response_header(Box::new(header), true).await?; + if let (Some(oi), Some(oa)) = (&_ctx.hostname, rate_key) { + warn!("Limit 4XX: {}-rps exceed on {} from {}", rate, oi, oa); + } + return Ok(true); + } + } + } if let Some(rate) = innermap.rate_limit.or(_ctx.extraparams.rate_limit) { let rate_key = session.client_addr().and_then(|addr| addr.as_inet()).map(|inet| inet.ip()); let curr_window_requests = RATE_LIMITER.observe(&rate_key, 1); @@ -282,6 +303,14 @@ impl ProxyHttp for LB { }; calc_metrics(m); ACTIVE_SESSIONS.dec(); + if let Some(_) = ctx.x4xx_limit.or(ctx.extraparams.x4xx_limit) { + if 400 <= response_code && response_code <= 499 { + if let Some(ip) = session.client_addr().and_then(|a| a.as_inet()).map(|i| i.ip()) { + let current = REQUESTS_4XX.get(&ip).unwrap_or(0); + REQUESTS_4XX.insert(ip, current + 1); + } + } + } } } diff --git a/src/web/start.rs b/src/web/start.rs index 5c683bc..f620b20 100644 --- a/src/web/start.rs +++ b/src/web/start.rs @@ -36,6 +36,7 @@ pub fn run() { sticky_sessions: None, authentication: None, rate_limit: None, + x4xx_limit: None, })); let cfg = Arc::new(maincfg);