Add sticky session support

This commit is contained in:
Ara Sadoyan
2025-04-29 19:54:44 +02:00
parent 9ba2aaebc8
commit 4e86f7b22a
6 changed files with 145 additions and 10 deletions

View File

@@ -1,6 +1,8 @@
use dashmap::DashMap;
use sha2::{Digest, Sha256};
use std::any::type_name;
use std::collections::HashSet;
use std::fmt::Write;
use std::sync::atomic::AtomicUsize;
#[allow(dead_code)]
@@ -21,9 +23,8 @@ pub fn print_upstreams(upstreams: &UpstreamsDashMap) {
}
pub type UpstreamsDashMap = DashMap<String, DashMap<String, (Vec<(String, u16, bool)>, AtomicUsize)>>;
// pub type HeadersList = DashMap<String, Vec<(String, String)>>;
pub type Headers = DashMap<String, DashMap<String, Vec<(String, String)>>>;
// pub type UpstreamMap = DashMap<String, (Vec<(String, u16)>, AtomicUsize)>;
pub type UpstreamsIdMap = DashMap<String, (String, u16, bool)>;
#[allow(dead_code)]
pub fn typeoff<T>(_: T) {
@@ -70,9 +71,7 @@ pub fn clone_dashmap_into(original: &UpstreamsDashMap, cloned: &UpstreamsDashMap
for outer_entry in original.iter() {
let hostname = outer_entry.key();
let inner_map = outer_entry.value();
let new_inner_map = DashMap::new();
for inner_entry in inner_map.iter() {
let path = inner_entry.key();
let (vec, _) = inner_entry.value();
@@ -127,3 +126,30 @@ pub fn merge_headers(target: &DashMap<String, Vec<(String, String)>>, source: &D
target_entry.extend(global_values);
}
}
pub fn clone_idmap_into(original: &UpstreamsDashMap, cloned: &UpstreamsIdMap) {
cloned.clear();
for outer_entry in original.iter() {
let inner_map = outer_entry.value();
let new_inner_map = DashMap::new();
for inner_entry in inner_map.iter() {
let path = inner_entry.key();
let (vec, _) = inner_entry.value();
let new_vec = vec.clone();
for x in vec.iter() {
// let id = format!("{}:{}:{}", x.0.to_string(), x.1.to_string(), x.2.to_string());
let mut id = String::new();
write!(&mut id, "{}:{}:{}", x.0, x.1, x.2).unwrap();
let mut hasher = Sha256::new();
hasher.update(id.clone().into_bytes());
let hash = hasher.finalize();
let hex_hash = base16ct::lower::encode_string(&hash);
let hh = hex_hash[0..50].to_string();
cloned.insert(id, (hh.clone(), 0000, false));
cloned.insert(hh, x.to_owned());
}
new_inner_map.insert(path.clone(), new_vec);
}
}
}

77
src/web/gethosts.rs Normal file
View File

@@ -0,0 +1,77 @@
use crate::web::proxyhttp::LB;
use async_trait::async_trait;
use std::sync::atomic::Ordering;
#[async_trait]
pub trait GetHost {
fn get_host(&self, peer: &str, path: &str, backend_id: Option<&str>) -> Option<(String, u16, bool)>;
fn get_header(&self, peer: &str, path: &str) -> Option<Vec<(String, String)>>;
}
#[async_trait]
impl GetHost for LB {
fn get_host(&self, peer: &str, path: &str, backend_id: Option<&str>) -> Option<(String, u16, bool)> {
if let Some(b) = backend_id {
if let Some(bb) = self.ump_byid.get(b) {
println!("BIB :===> {:?}", Some(bb.value()));
return Some(bb.value().clone());
}
}
let host_entry = self.ump_upst.get(peer)?;
let mut current_path = path.to_string();
let mut best_match: Option<(String, u16, bool)> = None;
loop {
if let Some(entry) = host_entry.get(&current_path) {
let (servers, index) = entry.value();
if !servers.is_empty() {
let idx = index.fetch_add(1, Ordering::Relaxed) % servers.len();
best_match = Some(servers[idx].clone());
break;
}
}
if let Some(pos) = current_path.rfind('/') {
current_path.truncate(pos);
} else {
break;
}
}
if best_match.is_none() {
if let Some(entry) = host_entry.get("/") {
let (servers, index) = entry.value();
if !servers.is_empty() {
let idx = index.fetch_add(1, Ordering::Relaxed) % servers.len();
best_match = Some(servers[idx].clone());
}
}
}
println!("BMT :===> {:?}", best_match);
best_match
}
fn get_header(&self, peer: &str, path: &str) -> Option<Vec<(String, String)>> {
let host_entry = self.headers.get(peer)?;
let mut current_path = path.to_string();
let mut best_match: Option<Vec<(String, String)>> = None;
loop {
if let Some(entry) = host_entry.get(&current_path) {
if !entry.value().is_empty() {
best_match = Some(entry.value().clone());
break;
}
}
if let Some(pos) = current_path.rfind('/') {
current_path.truncate(pos);
} else {
break;
}
}
if best_match.is_none() {
if let Some(entry) = host_entry.get("/") {
if !entry.value().is_empty() {
best_match = Some(entry.value().clone());
}
}
}
best_match
}
}

View File

@@ -24,10 +24,12 @@ pub fn run() {
let uf: UpstreamsDashMap = DashMap::new();
let ff: UpstreamsDashMap = DashMap::new();
let im: UpstreamsIdMap = DashMap::new();
let hh: Headers = DashMap::new();
let uf_config = Arc::new(uf);
let ff_config = Arc::new(ff);
let im_config = Arc::new(im);
let hh_config = Arc::new(hh);
let cfg = Arc::new(maincfg);
@@ -39,6 +41,7 @@ pub fn run() {
let lb = LB {
ump_upst: uf_config.clone(),
ump_full: ff_config.clone(),
ump_byid: im_config.clone(),
config: cfg.clone(),
local: local.clone(),
headers: hh_config.clone(),
@@ -47,6 +50,7 @@ pub fn run() {
let bg = LB {
ump_upst: uf_config.clone(),
ump_full: ff_config.clone(),
ump_byid: im_config.clone(),
config: cfg.clone(),
local: local.clone(),
headers: hh_config.clone(),
@@ -96,7 +100,6 @@ pub fn run() {
proxy.add_tcp(bind_address_http.as_str());
server.add_service(proxy);
server.add_service(bg_srvc);
// let mut prometheus_service_http = Service::prometheus_http_service();
// prometheus_service_http.add_tcp("0.0.0.0:1234");
// server.add_service(prometheus_service_http);