diff --git a/src/utils/dnsclient.rs b/src/utils/dnsclient.rs index 10a6aa9..d3d25e3 100644 --- a/src/utils/dnsclient.rs +++ b/src/utils/dnsclient.rs @@ -103,6 +103,7 @@ impl DnsClient { is_ssl: false, is_http2: false, to_https: false, + sticky_sessions: false, rate_limit: None, }; values.push(to_add); diff --git a/src/utils/parceyaml.rs b/src/utils/parceyaml.rs index 5a6aa61..da962e3 100644 --- a/src/utils/parceyaml.rs +++ b/src/utils/parceyaml.rs @@ -89,9 +89,8 @@ async fn populate_headers_and_auth(config: &mut Configuration, parsed: &Config) let server_global_headers: DashMap, Vec<(Arc, Arc)>> = DashMap::new(); server_global_headers.insert(Arc::from("/"), sh); config.server_headers.insert(Arc::from("GLOBAL_SERVER_HEADERS"), server_global_headers); - - config.extraparams.sticky_sessions = parsed.sticky_sessions; config.extraparams.to_https = parsed.to_https; + config.extraparams.sticky_sessions = parsed.sticky_sessions; config.extraparams.rate_limit = parsed.rate_limit; if let Some(rate) = &parsed.rate_limit { @@ -121,7 +120,6 @@ async fn populate_file_upstreams(config: &mut Configuration, parsed: &Config) { if let Some(rate) = &path_config.rate_limit { info!("Applied Rate Limit for {} : {} request per second", hostname, rate); } - let mut hl: Vec<(Arc, Arc)> = Vec::new(); let mut sl: Vec<(Arc, Arc)> = Vec::new(); build_headers(&path_config.client_headers, config, &mut hl); diff --git a/src/utils/structs.rs b/src/utils/structs.rs index f2b3807..4403fad 100644 --- a/src/utils/structs.rs +++ b/src/utils/structs.rs @@ -6,13 +6,13 @@ use std::sync::Arc; pub type UpstreamsDashMap = DashMap, DashMap, (Vec>, AtomicUsize)>>; -pub type UpstreamsIdMap = DashMap, Arc>; +pub type UpstreamsIdMap = DashMap>; pub type Headers = DashMap, DashMap, Vec<(Arc, Arc)>>>; #[derive(Clone, Debug, Default)] pub struct Extraparams { - pub sticky_sessions: bool, pub to_https: Option, + pub sticky_sessions: bool, pub authentication: DashMap, Vec>>, pub rate_limit: Option, } @@ -23,6 +23,7 @@ pub struct ServiceMapping { pub hostname: String, pub path: Option, pub to_https: Option, + pub sticky_sessions: Option, pub rate_limit: Option, pub client_headers: Option>, pub server_headers: Option>, @@ -44,8 +45,8 @@ pub struct Consul { #[derive(Debug, Default, Serialize, Deserialize)] pub struct Config { pub provider: String, - pub sticky_sessions: bool, pub to_https: Option, + pub sticky_sessions: bool, #[serde(default)] pub upstreams: Option>, #[serde(default)] @@ -74,6 +75,7 @@ pub struct HostConfig { pub struct PathConfig { pub servers: Vec, pub to_https: Option, + pub sticky_sessions: Option, pub client_headers: Option>, pub server_headers: Option>, pub rate_limit: Option, diff --git a/src/utils/tools.rs b/src/utils/tools.rs index 4ef1721..7f2212c 100644 --- a/src/utils/tools.rs +++ b/src/utils/tools.rs @@ -169,8 +169,10 @@ pub fn clone_idmap_into(original: &UpstreamsDashMap, cloned: &UpstreamsIdMap) { rate_limit: None, healthcheck: None, }; - cloned.insert(Arc::from(id.as_str()), Arc::from(to_add)); - cloned.insert(Arc::from(hh.as_str()), Arc::from(x.to_owned())); + + cloned.insert(id, Arc::from(to_add)); + cloned.insert(hh, Arc::from(x.to_owned())); + // println!("CLONNED :===========> {:?}", cloned); } new_inner_map.insert(path.clone(), new_vec); } diff --git a/src/web/bgservice.rs b/src/web/bgservice.rs index 8d58b06..f58b6a6 100644 --- a/src/web/bgservice.rs +++ b/src/web/bgservice.rs @@ -83,8 +83,8 @@ impl BackgroundService for LB { clone_dashmap_into(&ss.upstreams, &self.ump_upst); let current = self.extraparams.load_full(); let mut new = (*current).clone(); - new.sticky_sessions = ss.extraparams.sticky_sessions; new.to_https = ss.extraparams.to_https; + new.sticky_sessions = ss.extraparams.sticky_sessions; new.authentication = ss.extraparams.authentication.clone(); new.rate_limit = ss.extraparams.rate_limit; self.extraparams.store(Arc::new(new)); diff --git a/src/web/proxyhttp.rs b/src/web/proxyhttp.rs index 53f5883..73b1cd6 100644 --- a/src/web/proxyhttp.rs +++ b/src/web/proxyhttp.rs @@ -5,6 +5,7 @@ use crate::web::gethosts::{GetHost, GetHostsReturHeaders}; use arc_swap::ArcSwap; use async_trait::async_trait; use axum::body::Bytes; +use dashmap::DashMap; use log::{debug, error, warn}; use once_cell::sync::Lazy; use pingora::http::{RequestHeader, ResponseHeader, StatusCode}; @@ -14,11 +15,17 @@ use pingora_core::listeners::ALPN; use pingora_core::prelude::HttpPeer; use pingora_limits::rate::Rate; use pingora_proxy::{ProxyHttp, Session}; +use sha2::{Digest, Sha256}; +use std::fmt::Write; +// use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; use tokio::time::Instant; static RATE_LIMITER: Lazy = Lazy::new(|| Rate::new(Duration::from_secs(1))); +static REVERSE_STORE: Lazy> = Lazy::new(|| DashMap::new()); + +// static BARADI: String = String::new(); #[derive(Clone)] pub struct LB { @@ -32,10 +39,11 @@ pub struct LB { } pub struct Context { - backend_id: Arc, - // backend_id: Arc<(IpAddr, u16, bool)>, + backend_id: Option, + // backend_id: Option<(IpAddr, u16, bool)>, to_https: bool, - redirect_to: Arc, + sticky_sessions: bool, + redirect_to: Option, start_time: Instant, hostname: Option>, upstream_peer: Option>, @@ -48,9 +56,10 @@ impl ProxyHttp for LB { type CTX = Context; fn new_ctx(&self) -> Self::CTX { Context { - backend_id: Arc::from(""), + backend_id: None, to_https: false, - redirect_to: Arc::from(""), + sticky_sessions: false, + redirect_to: None, start_time: Instant::now(), hostname: None, upstream_peer: None, @@ -88,12 +97,11 @@ impl ProxyHttp for LB { } } } - + // println!("backend_id ===========> {:?}", backend_id); match _ctx.hostname.as_ref() { None => return Ok(false), Some(host) => { let optioninnermap = self.get_host(host, session.req_header().uri.path(), backend_id); - match optioninnermap { None => return Ok(false), Some(ref innermap) => { @@ -117,35 +125,47 @@ impl ProxyHttp for LB { _ctx.upstream_peer = optioninnermap; } } - Ok(false) } async fn upstream_peer(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result> { + // let before = crate::utils::fordebug::ALLOC_COUNT.load(Ordering::Relaxed); match ctx.hostname.as_ref() { Some(hostname) => match ctx.upstream_peer.as_ref() { Some(innermap) => { - let mut peer = Box::new(HttpPeer::new((&*innermap.address, innermap.port), innermap.is_ssl, String::new())); + let mut peer = Box::new(HttpPeer::new((&*innermap.address, innermap.port), innermap.is_ssl, hostname.to_string())); + if innermap.is_http2 { peer.options.alpn = ALPN::H2; } if innermap.is_ssl { - peer.sni = hostname.to_string(); peer.options.verify_cert = false; peer.options.verify_hostname = false; } - if ctx.to_https || innermap.to_https { + + if ctx.extraparams.to_https.unwrap_or(false) || innermap.to_https { if let Some(stream) = session.stream() { if stream.get_ssl().is_none() { if let Some(host) = ctx.hostname.as_ref() { let uri = session.req_header().uri.path_and_query().map_or("/", |pq| pq.as_str()); - let port = self.config.proxy_port_tls.unwrap_or(403); + let port = self.config.proxy_port_tls.unwrap_or(443); ctx.to_https = true; - ctx.redirect_to = Arc::from(format!("https://{}:{}{}", host, port, uri)); + let mut s = String::with_capacity(64); + write!(&mut s, "https://{}:{}{}", host, port, uri).unwrap(); + ctx.redirect_to = Some(s); } } } } - ctx.backend_id = Arc::from(format!("{}:{}:{}", innermap.address, innermap.port, innermap.is_ssl)); + + if ctx.extraparams.sticky_sessions { + let mut s = String::with_capacity(64); + write!(&mut s, "{}:{}:{}", innermap.address, innermap.port, innermap.is_ssl).unwrap(); + ctx.backend_id = Some(s); + ctx.sticky_sessions = true; + } + // let after = crate::utils::fordebug::ALLOC_COUNT.load(Ordering::Relaxed); + // println!("Allocations : {} : {:?}", after - before, ctx.backend_id); + Ok(peer) } None => { @@ -203,14 +223,27 @@ impl ProxyHttp for LB { Ok(()) } async fn response_filter(&self, session: &mut Session, _upstream_response: &mut ResponseHeader, ctx: &mut Self::CTX) -> Result<()> { - if ctx.extraparams.sticky_sessions { - if let Some(bid) = self.ump_byid.get(ctx.backend_id.as_ref()) { - let _ = _upstream_response.insert_header("set-cookie", format!("backend_id={}; Path=/; Max-Age=600; HttpOnly; SameSite=Lax", bid.address)); + if ctx.sticky_sessions { + if let Some(bid) = ctx.backend_id.clone() { + // CHECK ALLOCATIONS TOMORROW + if REVERSE_STORE.get(&*bid).is_none() { + let mut hasher = Sha256::new(); + hasher.update(bid.clone().into_bytes()); + let hash = hasher.finalize(); + let hex_hash = base16ct::lower::encode_string(&hash); + let hh = hex_hash[0..50].to_string(); + REVERSE_STORE.insert(bid.clone(), hh.clone()); + REVERSE_STORE.insert(hh.clone(), bid.clone()); + } + if let Some(tt) = REVERSE_STORE.get(&*bid) { + let _ = _upstream_response.insert_header("set-cookie", format!("backend_id={}; Path=/; Max-Age=600; HttpOnly; SameSite=Lax", tt.value())); + } } } + if ctx.to_https { let mut redirect_response = ResponseHeader::build(StatusCode::MOVED_PERMANENTLY, None)?; - redirect_response.insert_header("Location", ctx.redirect_to.as_ref())?; + redirect_response.insert_header("Location", ctx.redirect_to.clone().unwrap_or(String::from("/")))?; redirect_response.insert_header("Content-Length", "0")?; session.write_response_header(Box::new(redirect_response), false).await?; } diff --git a/src/web/start.rs b/src/web/start.rs index 9e89fd7..8f0ae41 100644 --- a/src/web/start.rs +++ b/src/web/start.rs @@ -31,8 +31,8 @@ pub fn run() { let sh_config = Arc::new(DashMap::new()); let ec_config = Arc::new(ArcSwap::from_pointee(Extraparams { - sticky_sessions: false, to_https: None, + sticky_sessions: false, authentication: DashMap::new(), rate_limit: None, }));