From 4e86f7b22a8867c9152f8b4353b86e7c356d8b18 Mon Sep 17 00:00:00 2001 From: Ara Sadoyan Date: Tue, 29 Apr 2025 19:54:44 +0200 Subject: [PATCH] Add sticky session support --- Cargo.lock | 28 +++++++++++++++++ Cargo.toml | 5 +-- etc/main.yaml | 6 ++-- src/utils/tools.rs | 34 +++++++++++++++++--- src/web/gethosts.rs | 77 +++++++++++++++++++++++++++++++++++++++++++++ src/web/start.rs | 5 ++- 6 files changed, 145 insertions(+), 10 deletions(-) create mode 100644 src/web/gethosts.rs diff --git a/Cargo.lock b/Cargo.lock index 5291bbb..6ecd12f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -161,6 +161,7 @@ version = "0.1.0" dependencies = [ "async-trait", "axum", + "base16ct", "base64", "dashmap", "env_logger", @@ -176,6 +177,7 @@ dependencies = [ "reqwest", "serde", "serde_yaml", + "sha2", "tokio", "tonic", ] @@ -272,6 +274,12 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "base16ct" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c7f02d4ea65f2c1853089ffd8d2787bdbc63de2f0d29dedbcf8ccdfa0ccd4cf" + [[package]] name = "base64" version = "0.22.1" @@ -443,6 +451,15 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "cpufeatures" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280" +dependencies = [ + "libc", +] + [[package]] name = "crc32fast" version = "1.4.2" @@ -2429,6 +2446,17 @@ dependencies = [ "rust_decimal", ] +[[package]] +name = "sha2" +version = "0.10.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "shlex" version = "1.3.0" diff --git a/Cargo.toml b/Cargo.toml index 327389a..581ce8b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ jsonwebtoken = "9.3.1" #hmac = "0.12.1" #sha2 = "0.10.8" tonic = "0.13.0" - - +#uuid = { version = "1.16.0", features = ["v4"] } +sha2 = { version = "0.10.8", default-features = false } +base16ct = { version = "0.2.0", features = ["alloc"] } diff --git a/etc/main.yaml b/etc/main.yaml index b42fa38..266bfd3 100644 --- a/etc/main.yaml +++ b/etc/main.yaml @@ -3,8 +3,7 @@ threads: 8 # Pingora default setting #user: pastor # Username for running gazan after dropping root privileges, requires program to start as root #group: pastor # Group for running gazan after dropping root privileges, requires program to start as root daemon: false # Run in background -#idle_timeout: 1000 # Pingora default setting -upstream_keepalive_pool_size: 100 # Pingora default setting +upstream_keepalive_pool_size: 500 # Pingora default setting pid_file: /tmp/gazan.pid # Pingora default setting error_log: /tmp/gazan_err.log # Pingora default setting upgrade_sock: /tmp/gazan.sock # Pingora default setting @@ -15,6 +14,7 @@ tls_certificate: etc/server.crt # Mandatory if proxy_address_tls is set tls_key_file: etc/key.pem # Mandatory if proxy_address_tls is set upstreams_conf: etc/upstreams.yaml # the location of upstreams file log_level: info # info, warn, error, debug, trace, off -hc_method: HEAD # Healthcheck method (HEAD, GET, POST are supported) +hc_method: HEAD # Healthcheck method (HEAD, GET, POST are supported) UPPERCASE hc_interval: 2 #Intervak for Healthcheck in seconds master_key: 910517d9-f9a1-48de-8826-dbadacbd84af-cb6f830e-ab16-47ec-9d8f-0090de732774 +sticky_sessions: # If key exists, the sticky_sessions will be enabled, regardless what is in value. Comment out/delete the line to disable. \ No newline at end of file diff --git a/src/utils/tools.rs b/src/utils/tools.rs index 08208c5..411fa76 100644 --- a/src/utils/tools.rs +++ b/src/utils/tools.rs @@ -1,6 +1,8 @@ use dashmap::DashMap; +use sha2::{Digest, Sha256}; use std::any::type_name; use std::collections::HashSet; +use std::fmt::Write; use std::sync::atomic::AtomicUsize; #[allow(dead_code)] @@ -21,9 +23,8 @@ pub fn print_upstreams(upstreams: &UpstreamsDashMap) { } pub type UpstreamsDashMap = DashMap, AtomicUsize)>>; -// pub type HeadersList = DashMap>; pub type Headers = DashMap>>; -// pub type UpstreamMap = DashMap, AtomicUsize)>; +pub type UpstreamsIdMap = DashMap; #[allow(dead_code)] pub fn typeoff(_: T) { @@ -70,9 +71,7 @@ pub fn clone_dashmap_into(original: &UpstreamsDashMap, cloned: &UpstreamsDashMap for outer_entry in original.iter() { let hostname = outer_entry.key(); let inner_map = outer_entry.value(); - let new_inner_map = DashMap::new(); - for inner_entry in inner_map.iter() { let path = inner_entry.key(); let (vec, _) = inner_entry.value(); @@ -127,3 +126,30 @@ pub fn merge_headers(target: &DashMap>, source: &D target_entry.extend(global_values); } } + +pub fn clone_idmap_into(original: &UpstreamsDashMap, cloned: &UpstreamsIdMap) { + cloned.clear(); + for outer_entry in original.iter() { + let inner_map = outer_entry.value(); + let new_inner_map = DashMap::new(); + for inner_entry in inner_map.iter() { + let path = inner_entry.key(); + let (vec, _) = inner_entry.value(); + let new_vec = vec.clone(); + for x in vec.iter() { + // let id = format!("{}:{}:{}", x.0.to_string(), x.1.to_string(), x.2.to_string()); + let mut id = String::new(); + write!(&mut id, "{}:{}:{}", x.0, x.1, x.2).unwrap(); + + let mut hasher = Sha256::new(); + hasher.update(id.clone().into_bytes()); + let hash = hasher.finalize(); + let hex_hash = base16ct::lower::encode_string(&hash); + let hh = hex_hash[0..50].to_string(); + cloned.insert(id, (hh.clone(), 0000, false)); + cloned.insert(hh, x.to_owned()); + } + new_inner_map.insert(path.clone(), new_vec); + } + } +} diff --git a/src/web/gethosts.rs b/src/web/gethosts.rs new file mode 100644 index 0000000..1c4d051 --- /dev/null +++ b/src/web/gethosts.rs @@ -0,0 +1,77 @@ +use crate::web::proxyhttp::LB; +use async_trait::async_trait; +use std::sync::atomic::Ordering; + +#[async_trait] +pub trait GetHost { + fn get_host(&self, peer: &str, path: &str, backend_id: Option<&str>) -> Option<(String, u16, bool)>; + fn get_header(&self, peer: &str, path: &str) -> Option>; +} +#[async_trait] +impl GetHost for LB { + fn get_host(&self, peer: &str, path: &str, backend_id: Option<&str>) -> Option<(String, u16, bool)> { + if let Some(b) = backend_id { + if let Some(bb) = self.ump_byid.get(b) { + println!("BIB :===> {:?}", Some(bb.value())); + return Some(bb.value().clone()); + } + } + + let host_entry = self.ump_upst.get(peer)?; + let mut current_path = path.to_string(); + let mut best_match: Option<(String, u16, bool)> = None; + loop { + if let Some(entry) = host_entry.get(¤t_path) { + let (servers, index) = entry.value(); + if !servers.is_empty() { + let idx = index.fetch_add(1, Ordering::Relaxed) % servers.len(); + best_match = Some(servers[idx].clone()); + break; + } + } + if let Some(pos) = current_path.rfind('/') { + current_path.truncate(pos); + } else { + break; + } + } + if best_match.is_none() { + if let Some(entry) = host_entry.get("/") { + let (servers, index) = entry.value(); + if !servers.is_empty() { + let idx = index.fetch_add(1, Ordering::Relaxed) % servers.len(); + best_match = Some(servers[idx].clone()); + } + } + } + println!("BMT :===> {:?}", best_match); + best_match + } + fn get_header(&self, peer: &str, path: &str) -> Option> { + let host_entry = self.headers.get(peer)?; + let mut current_path = path.to_string(); + let mut best_match: Option> = None; + + loop { + if let Some(entry) = host_entry.get(¤t_path) { + if !entry.value().is_empty() { + best_match = Some(entry.value().clone()); + break; + } + } + if let Some(pos) = current_path.rfind('/') { + current_path.truncate(pos); + } else { + break; + } + } + if best_match.is_none() { + if let Some(entry) = host_entry.get("/") { + if !entry.value().is_empty() { + best_match = Some(entry.value().clone()); + } + } + } + best_match + } +} diff --git a/src/web/start.rs b/src/web/start.rs index 4a0bbb8..a022517 100644 --- a/src/web/start.rs +++ b/src/web/start.rs @@ -24,10 +24,12 @@ pub fn run() { let uf: UpstreamsDashMap = DashMap::new(); let ff: UpstreamsDashMap = DashMap::new(); + let im: UpstreamsIdMap = DashMap::new(); let hh: Headers = DashMap::new(); let uf_config = Arc::new(uf); let ff_config = Arc::new(ff); + let im_config = Arc::new(im); let hh_config = Arc::new(hh); let cfg = Arc::new(maincfg); @@ -39,6 +41,7 @@ pub fn run() { let lb = LB { ump_upst: uf_config.clone(), ump_full: ff_config.clone(), + ump_byid: im_config.clone(), config: cfg.clone(), local: local.clone(), headers: hh_config.clone(), @@ -47,6 +50,7 @@ pub fn run() { let bg = LB { ump_upst: uf_config.clone(), ump_full: ff_config.clone(), + ump_byid: im_config.clone(), config: cfg.clone(), local: local.clone(), headers: hh_config.clone(), @@ -96,7 +100,6 @@ pub fn run() { proxy.add_tcp(bind_address_http.as_str()); server.add_service(proxy); server.add_service(bg_srvc); - // let mut prometheus_service_http = Service::prometheus_http_service(); // prometheus_service_http.add_tcp("0.0.0.0:1234"); // server.add_service(prometheus_service_http);