From 2ad3a059ab70301cedc51eb7fea7432e68424c24 Mon Sep 17 00:00:00 2001 From: Ara Sadoyan Date: Thu, 24 Jul 2025 13:34:15 +0200 Subject: [PATCH] Per path rate limiter --- README.md | 4 ++ etc/main.yaml | 22 +++++----- etc/upstreams.yaml | 11 ++--- src/utils/consul.rs | 1 + src/utils/healthcheck.rs | 2 + src/utils/parceyaml.rs | 19 ++++++++- src/utils/structs.rs | 19 +++++++++ src/utils/tools.rs | 1 + src/web/proxyhttp.rs | 92 +++++++++++++++++++++++----------------- 9 files changed, 114 insertions(+), 57 deletions(-) diff --git a/README.md b/README.md index d0b739e..b6c6478 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,8 @@ Built on Rust, on top of **Cloudflare’s Pingora engine**, **Aralez** delivers - **Automatic load of certificates** — Automatically reads and loads certificates from a folder, without a restart. - **Upstreams TLS detection** — Aralez will automatically detect if upstreams uses secure connection. - **Built in rate limiter** — Limit requests to server, by setting up upper limit for requests per seconds, per virtualhost. + - **Global rate limiter** — Set rate limit for all virtualhosts. + - **Per path rate limiter** — Set rate limit for specific paths. Path limits will override global limits. - **Authentication** — Supports Basic Auth, API tokens, and JWT verification. - **Basic Auth** - **API Key** via `x-api-key` header @@ -177,6 +179,7 @@ authorization: myhost.mydomain.com: paths: "/": + rate_limit: 20 to_https: false headers: - "X-Some-Thing:Yaaaaaaaaaaaaaaa" @@ -201,6 +204,7 @@ myhost.mydomain.com: - Requests limits are calculated per requester ip plus requested virtualhost. - If the requester exceeds the limit it will receive `429 Too Many Requests` error. - Optional. Rate limiter will be disabled if the parameter is entirely removed from config. +- Requests to `myhost.mydomain.com/` will be limited to 20 requests per second. - Requests to `myhost.mydomain.com/` will be proxied to `127.0.0.1` and `127.0.0.2`. - Plain HTTP to `myhost.mydomain.com/foo` will get 301 redirect to configured TLS port of Aralez. - Requests to `myhost.mydomain.com/foo` will be proxied to `127.0.0.4` and `127.0.0.5`. diff --git a/etc/main.yaml b/etc/main.yaml index 5a4b43b..4c25722 100644 --- a/etc/main.yaml +++ b/etc/main.yaml @@ -1,7 +1,7 @@ -# Main configuration file , applied on startup -threads: 12 # Nubber of daemon threads default setting -#user: aralez # Username for running aralez after dropping root privileges, requires program to start as root -#group: aralez # Group for running aralez after dropping root privileges, requires program to start as root +# Main configuration file, applied on startup +threads: 12 # Number of daemon threads default setting +#user: pastor # Username for running aralez after dropping root privileges, requires program to start as root +#group: pastor # Group for running aralez after dropping root privileges, requires program to start as root daemon: false # Run in background upstream_keepalive_pool_size: 500 # Pool size for upstream keepalive connections pid_file: /tmp/aralez.pid # Path to PID file @@ -10,15 +10,15 @@ upgrade_sock: /tmp/aralez.sock # Path to socket file config_api_enabled: true # Boolean to enable/disable remote config push capability. config_address: 0.0.0.0:3000 # HTTP API address for pushing upstreams.yaml from remote location config_tls_address: 0.0.0.0:3001 # HTTP TLS API address for pushing upstreams.yaml from remote location -config_tls_certificate: etc/server.crt # Mandatory if config_tls_address is set -config_tls_key_file: etc/key.pem # Mandatory if config_tls_address is set +config_tls_certificate: /opt/Rust/Projects/asyncweb/etc/server.crt # Mandatory if config_tls_address is set +config_tls_key_file: /opt/Rust/Projects/asyncweb/etc/key.pem # Mandatory if config_tls_address is set proxy_address_http: 0.0.0.0:6193 # Proxy HTTP bind address proxy_address_tls: 0.0.0.0:6194 # Optional, Proxy TLS bind address -proxy_certificates: etc/certificates # Mandatory if proxy_address_tls set, should contain certificate and key files strictly in a format {NAME}.crt, {NAME}.key. -upstreams_conf: etc/upstreams.yaml # the location of upstreams file -#file_server_folder: /some/path # Optional, local folder to serve -#file_server_address: 127.0.0.1:3002 # Optional, Local address for file server. Can set as upstream for public access. +proxy_certificates: /opt/Rust/Projects/asyncweb/etc/yoyo # Mandatory if proxy_address_tls set, should contain certificate and key files strictly in a format {NAME}.crt, {NAME}.key. +upstreams_conf: /opt/Rust/Projects/asyncweb/etc/upstreams.yaml # the location of upstreams file +file_server_folder: /opt/storage # Optional, local folder to serve +file_server_address: 127.0.0.1:3002 # Optional, Local address for file server. Can set as upstream for public access. log_level: info # info, warn, error, debug, trace, off hc_method: HEAD # Healthcheck method (HEAD, GET, POST are supported) UPPERCASE hc_interval: 2 #Interval for health checks in seconds -master_key: 910517d9-f9a1-48de-8826-dbadacbd84af-cb6f830e-ab16-47ec-9d8f-0090de732774 # Mater key for working with API server and JWT Secret \ No newline at end of file +master_key: 910517d9-f9a1-48de-8826-dbadacbd84af-cb6f830e-ab16-47ec-9d8f-0090de732774 # Mater key for working with API server and JWT Secret diff --git a/etc/upstreams.yaml b/etc/upstreams.yaml index 1f9982f..c4bb6df 100644 --- a/etc/upstreams.yaml +++ b/etc/upstreams.yaml @@ -17,9 +17,9 @@ authorization: # creds: "5ecbf799-1343-4e94-a9b5-e278af5cd313-56b45249-1839-4008-a450-a60dc76d2bae" consul: # If the provider is consul. Otherwise, ignored. servers: - - "http://master1:8500" - - "http://192.168.22.1:8500" - - "http://master1.foo.local:8500" + - "http://consul1:8500" + - "http://consul2:8500" + - "http://consul3:8500" services: # proxy: The hostname to access the proxy server, real : The real service name in Consul database. - proxy: "proxy-frontend-dev-frontend-srv" real: "frontend-dev-frontend-srv" @@ -27,10 +27,11 @@ consul: # If the provider is consul. Otherwise, ignored. upstreams: myip.mydomain.com: paths: + rate_limit: 10 # Per path rate limit have higher priority than global rate limit. If not set, the global rate limit will be used "/": to_https: false headers: - - "X-Proxy-From:Gazan" + - "X-Proxy-From:Aralez" servers: # List of upstreams HOST:PORT - "127.0.0.1:8000" - "127.0.0.2:8000" @@ -40,7 +41,7 @@ upstreams: to_https: true headers: - "X-Some-Thing:Yaaaaaaaaaaaaaaa" - - "X-Proxy-From:Gazan" + - "X-Proxy-From:Aralez" servers: - "127.0.0.1:8000" - "127.0.0.2:8000" diff --git a/src/utils/consul.rs b/src/utils/consul.rs index afb94a7..09c8826 100644 --- a/src/utils/consul.rs +++ b/src/utils/consul.rs @@ -133,6 +133,7 @@ async fn get_by_http(url: String, token: Option) -> Option, fullist: Arc, is_ssl: tls.0, is_http2: is_h2, to_https: k.1.to_https, + rate_limit: k.1.rate_limit, }; let resp = http_request(_link.as_str(), params.0, "", &client).await; match resp.0 { @@ -55,6 +56,7 @@ pub async fn hc2(upslist: Arc, fullist: Arc, is_ssl: tls.0, is_http2: is_h2, to_https: k.1.to_https, + rate_limit: k.1.rate_limit, }; } innervec.push(scheme); diff --git a/src/utils/parceyaml.rs b/src/utils/parceyaml.rs index 4b0b300..8b0c28a 100644 --- a/src/utils/parceyaml.rs +++ b/src/utils/parceyaml.rs @@ -80,6 +80,10 @@ fn populate_headers_and_auth(config: &mut Configuration, parsed: &Config) { config.extraparams.to_https = parsed.to_https; config.extraparams.rate_limit = parsed.rate_limit; + if let Some(rate) = &parsed.rate_limit { + info!("Applied Global Rate Limit : {} request per second", rate); + } + if let Some(auth) = &parsed.authorization { let name = auth.get("type").unwrap_or(&"".to_string()).to_string(); let creds = auth.get("creds").unwrap_or(&"".to_string()).to_string(); @@ -94,8 +98,11 @@ fn populate_file_upstreams(config: &mut Configuration, parsed: &Config) { for (hostname, host_config) in upstreams { let path_map = DashMap::new(); let header_list = DashMap::new(); - for (path, path_config) in &host_config.paths { + if let Some(rate) = &path_config.rate_limit { + info!("Applied Rate Limit for {} : {} request per second", hostname, rate); + } + let mut server_list = Vec::new(); let mut hl = Vec::new(); @@ -109,6 +116,14 @@ fn populate_file_upstreams(config: &mut Configuration, parsed: &Config) { header_list.insert(path.clone(), hl); for server in &path_config.servers { + // let mut rate: Option = None; + // let size: isize = path_config.servers.len() as isize; + // if let Some(limit) = &path_config.rate_limit { + // if size > 0 { + // rate = Some(limit / size); + // } + // } + if let Some((ip, port_str)) = server.split_once(':') { if let Ok(port) = port_str.parse::() { server_list.push(InnerMap { @@ -117,6 +132,8 @@ fn populate_file_upstreams(config: &mut Configuration, parsed: &Config) { is_ssl: true, is_http2: false, to_https: path_config.to_https.unwrap_or(false), + // rate_limit: rate, + rate_limit: path_config.rate_limit, }); } } diff --git a/src/utils/structs.rs b/src/utils/structs.rs index 6a251bc..c10cb67 100644 --- a/src/utils/structs.rs +++ b/src/utils/structs.rs @@ -5,6 +5,23 @@ use std::sync::atomic::AtomicUsize; // pub type InnerMap = BackendConfig; pub type UpstreamsDashMap = DashMap, AtomicUsize)>>; + +// #[derive(Debug, Default)] +// pub struct UpstreamsMap { +// pub upstreams: DashMap, AtomicUsize)>>, +// pub ratelimit: DashMap>, +// } +// impl UpstreamsMap { +// pub fn new() -> Self { +// Self { +// upstreams: Default::default(), +// ratelimit: Default::default(), +// } +// } +// } +// +// pub type XUpstreamsDashMap = DashMap; + pub type UpstreamsIdMap = DashMap; pub type Headers = DashMap>>; @@ -97,6 +114,7 @@ pub struct InnerMap { pub is_ssl: bool, pub is_http2: bool, pub to_https: bool, + pub rate_limit: Option, } impl InnerMap { @@ -107,6 +125,7 @@ impl InnerMap { is_ssl: Default::default(), is_http2: Default::default(), to_https: Default::default(), + rate_limit: Default::default(), } } } diff --git a/src/utils/tools.rs b/src/utils/tools.rs index d7a2e1d..b414bd9 100644 --- a/src/utils/tools.rs +++ b/src/utils/tools.rs @@ -154,6 +154,7 @@ pub fn clone_idmap_into(original: &UpstreamsDashMap, cloned: &UpstreamsIdMap) { is_ssl: false, is_http2: false, to_https: false, + rate_limit: None, }; cloned.insert(id, to_add); cloned.insert(hh, x.to_owned()); diff --git a/src/web/proxyhttp.rs b/src/web/proxyhttp.rs index 4210d39..27ff689 100644 --- a/src/web/proxyhttp.rs +++ b/src/web/proxyhttp.rs @@ -1,6 +1,6 @@ use crate::utils::auth::authenticate; use crate::utils::metrics::*; -use crate::utils::structs::{AppConfig, Extraparams, Headers, UpstreamsDashMap, UpstreamsIdMap}; +use crate::utils::structs::{AppConfig, Extraparams, Headers, InnerMap, UpstreamsDashMap, UpstreamsIdMap}; use crate::web::gethosts::GetHost; use arc_swap::ArcSwap; use async_trait::async_trait; @@ -36,6 +36,8 @@ pub struct Context { redirect_to: String, start_time: Instant, hostname: Option, + upstream_peer: Option, + extraparams: arc_swap::Guard>, } #[async_trait] @@ -48,14 +50,18 @@ impl ProxyHttp for LB { redirect_to: String::new(), start_time: Instant::now(), hostname: None, + upstream_peer: None, + extraparams: self.extraparams.load(), } } async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result { - if let Some(auth) = self.extraparams.load().authentication.get("authorization") { + let ep = _ctx.extraparams.clone(); + + if let Some(auth) = ep.authentication.get("authorization") { let authenticated = authenticate(&auth.value(), &session); if !authenticated { let _ = session.respond_error(401).await; - warn!("Forbidden: {:?}, {}", session.client_addr(), session.req_header().uri.path().to_string()); + warn!("Forbidden: {:?}, {}", session.client_addr(), session.req_header().uri.path()); return Ok(true); } }; @@ -63,23 +69,48 @@ impl ProxyHttp for LB { let hostname = return_header_host(&session); _ctx.hostname = hostname.clone(); - if let Some(rate) = self.extraparams.load().rate_limit { - match hostname { - None => return Ok(false), - Some(host) => { - let curr_window_requests = RATE_LIMITER.observe(&host, 1); - if curr_window_requests > rate { - let mut header = ResponseHeader::build(429, None).unwrap(); - header.insert_header("X-Rate-Limit-Limit", rate.to_string()).unwrap(); - header.insert_header("X-Rate-Limit-Remaining", "0").unwrap(); - header.insert_header("X-Rate-Limit-Reset", "1").unwrap(); - session.set_keepalive(None); - session.write_response_header(Box::new(header), true).await?; - debug!("Rate limited: {:?}, {}", session.client_addr(), rate); - return Ok(true); + let mut backend_id = None; + + if ep.sticky_sessions { + if let Some(cookies) = session.req_header().headers.get("cookie") { + if let Ok(cookie_str) = cookies.to_str() { + for cookie in cookie_str.split(';') { + let trimmed = cookie.trim(); + if let Some(value) = trimmed.strip_prefix("backend_id=") { + backend_id = Some(value); + break; + } } } - }; + } + } + + match hostname { + None => return Ok(false), + Some(host) => { + let optioninnermap = self.get_host(host.as_str(), host.as_str(), backend_id); + match optioninnermap { + None => return Ok(false), + Some(ref innermap) => { + if let Some(rate) = innermap.rate_limit.or(ep.rate_limit) { + // let rate_key = session.client_addr().and_then(|addr| addr.as_inet()).map(|inet| inet.ip().to_string()).unwrap_or_else(|| host.to_string()); + let rate_key = session.client_addr().and_then(|addr| addr.as_inet()).map(|inet| inet.ip()); + let curr_window_requests = RATE_LIMITER.observe(&rate_key, 1); + if curr_window_requests > rate { + let mut header = ResponseHeader::build(429, None).unwrap(); + header.insert_header("X-Rate-Limit-Limit", rate.to_string()).unwrap(); + header.insert_header("X-Rate-Limit-Remaining", "0").unwrap(); + header.insert_header("X-Rate-Limit-Reset", "1").unwrap(); + session.set_keepalive(None); + session.write_response_header(Box::new(header), true).await?; + debug!("Rate limited: {:?}, {}", rate_key, rate); + return Ok(true); + } + } + } + } + _ctx.upstream_peer = optioninnermap; + } } Ok(false) } @@ -87,25 +118,7 @@ impl ProxyHttp for LB { // let host_name = return_header_host(&session); match ctx.hostname.as_ref() { Some(hostname) => { - let mut backend_id = None; - - if self.extraparams.load().sticky_sessions { - if let Some(cookies) = session.req_header().headers.get("cookie") { - if let Ok(cookie_str) = cookies.to_str() { - for cookie in cookie_str.split(';') { - let trimmed = cookie.trim(); - if let Some(value) = trimmed.strip_prefix("backend_id=") { - backend_id = Some(value); - break; - } - } - } - } - } - - let optioninnermap = self.get_host(hostname, hostname, backend_id); - - match optioninnermap { + match ctx.upstream_peer.as_ref() { // Some((address, port, ssl, is_h2, to_https)) => { Some(innermap) => { let mut peer = Box::new(HttpPeer::new((innermap.address.clone(), innermap.port.clone()), innermap.is_ssl, String::new())); @@ -118,8 +131,7 @@ impl ProxyHttp for LB { peer.options.verify_cert = false; peer.options.verify_hostname = false; } - - if self.extraparams.load().to_https.unwrap_or(false) || innermap.to_https { + if ctx.to_https || innermap.to_https { if let Some(stream) = session.stream() { if stream.get_ssl().is_none() { if let Some(addr) = session.server_addr() { @@ -190,7 +202,7 @@ impl ProxyHttp for LB { // } async fn response_filter(&self, session: &mut Session, _upstream_response: &mut ResponseHeader, ctx: &mut Self::CTX) -> Result<()> { // _upstream_response.insert_header("X-Proxied-From", "Fooooooooooooooo").unwrap(); - if self.extraparams.load().sticky_sessions { + if ctx.extraparams.sticky_sessions { let backend_id = ctx.backend_id.clone(); if let Some(bid) = self.ump_byid.get(&backend_id) { let _ = _upstream_response.insert_header("set-cookie", format!("backend_id={}; Path=/; Max-Age=600; HttpOnly; SameSite=Lax", bid.address));