From c2d847f2aa03b8274b1874860eac50ada48ef1df Mon Sep 17 00:00:00 2001 From: Ara Sadoyan Date: Fri, 9 May 2025 17:43:03 +0200 Subject: [PATCH] Improvements and cleanup --- Cargo.lock | 1 + Cargo.toml | 2 ++ etc/upstreams.yaml | 10 +++---- src/utils/consul.rs | 3 +- src/utils/parceyaml.rs | 15 +++++++--- src/utils/structs.rs | 5 ++-- src/web/bgservice.rs | 19 ++++-------- src/web/proxyhttp.rs | 65 +++++++++++++++++++----------------------- src/web/start.rs | 40 +++++++------------------- 9 files changed, 69 insertions(+), 91 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1432a30..f0e859d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -762,6 +762,7 @@ dependencies = [ name = "gazan" version = "0.1.0" dependencies = [ + "arc-swap", "async-trait", "axum", "base16ct", diff --git a/Cargo.toml b/Cargo.toml index df34310..bc4c61a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,4 +30,6 @@ tonic = "0.13.0" sha2 = { version = "0.10.8", default-features = false } base16ct = { version = "0.2.0", features = ["alloc"] } urlencoding = "2.1.3" +arc-swap = "1.7.1" + diff --git a/etc/upstreams.yaml b/etc/upstreams.yaml index c222acd..638f4e4 100644 --- a/etc/upstreams.yaml +++ b/etc/upstreams.yaml @@ -1,6 +1,6 @@ # The file is under watch and hot reload , changes are applied immediately, no need to restart or reload provider: "file" # consul -stickysessions: false +stickysessions: true globals: headers: # Global headers, appended for all upstreams and all paths. - "Access-Control-Allow-Origin:*" @@ -22,10 +22,10 @@ consul: # If the provider is consul. Otherwise ignored services: # proxy: The hostname to access proxy server, real : The real service name in Consul - proxy: "proxy-frontend-dev-frontend-srv" real: "frontend-dev-frontend-srv" - - proxy: "proxy-gateway-test-gateway-srv" - real: "gateway-test-gateway-srv" - - proxy: "proxy-backoffice-dev-backoffice-srv" - real: "backoffice-dev-backoffice-srv" + # - proxy: "proxy-gateway-test-gateway-srv" + # real: "gateway-test-gateway-srv" + # - proxy: "proxy-backoffice-dev-backoffice-srv" + # real: "backoffice-dev-backoffice-srv" token: "8e2db809-845b-45e1-8b47-2c8356a09da0-a4370955-18c2-4d6e-a8f8-ffcc0b47be81" # Consul server access token, If Consul auth is enabled upstreams: # If provider is files. Otherwise ignored myip.netangels.net: # Hostname, or header host to access the upstream diff --git a/src/utils/consul.rs b/src/utils/consul.rs index f619a0f..008ec4e 100644 --- a/src/utils/consul.rs +++ b/src/utils/consul.rs @@ -63,13 +63,12 @@ pub async fn start(fp: String, mut toreturn: Sender) { consul: None, typecfg: "".to_string(), extraparams: config.extraparams.clone(), - globals: Default::default(), }; clone_dashmap_into(&upstreams, &prev_upstreams); clone_dashmap_into(&upstreams, &tosend.upstreams); tosend.headers = headers.clone(); - tosend.globals = config.globals.clone(); + tosend.extraparams.authentication = config.extraparams.authentication.clone(); tosend.typecfg = config.typecfg.clone(); tosend.consul = config.consul.clone(); toreturn.send(tosend).await.unwrap(); diff --git a/src/utils/parceyaml.rs b/src/utils/parceyaml.rs index f49f52f..626b018 100644 --- a/src/utils/parceyaml.rs +++ b/src/utils/parceyaml.rs @@ -12,8 +12,10 @@ pub fn load_configuration(d: &str, kind: &str) -> Option { headers: Default::default(), consul: None, typecfg: "".to_string(), - extraparams: Extraparams { stickysessions: false }, - globals: Default::default(), + extraparams: Extraparams { + stickysessions: false, + authentication: DashMap::new(), + }, }; toreturn.upstreams = UpstreamsDashMap::new(); toreturn.headers = Headers::new(); @@ -58,9 +60,9 @@ pub fn load_configuration(d: &str, kind: &str) -> Option { let cfg = DashMap::new(); if let Some(k) = globals.get("authorization") { cfg.insert("authorization".to_string(), k.to_owned()); - toreturn.globals = Some(cfg); + toreturn.extraparams.authentication = cfg; } else { - toreturn.globals = None; + toreturn.extraparams.authentication = DashMap::new(); } } match parsed.provider.as_str() { @@ -131,5 +133,10 @@ pub fn parce_main_config(path: &str) -> AppConfig { for (k, v) in cfg { reply.insert(k.to_string(), v.to_string()); } + if let Some((ip, port_str)) = cfo.config_address.split_once(':') { + if let Ok(port) = port_str.parse::() { + cfo.local_server = Option::from((ip.to_string(), port)); + } + } cfo } diff --git a/src/utils/structs.rs b/src/utils/structs.rs index f92c808..a28d281 100644 --- a/src/utils/structs.rs +++ b/src/utils/structs.rs @@ -13,9 +13,10 @@ pub struct ServiceMapping { pub real: String, } -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug)] pub struct Extraparams { pub stickysessions: bool, + pub authentication: DashMap>, } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -51,7 +52,6 @@ pub struct Configuration { pub consul: Option, pub typecfg: String, pub extraparams: Extraparams, - pub globals: Option>>, } #[derive(Debug, Deserialize)] @@ -66,4 +66,5 @@ pub struct AppConfig { pub proxy_address_tls: Option, pub tls_certificate: Option, pub tls_key_file: Option, + pub local_server: Option<(String, u16)>, } diff --git a/src/web/bgservice.rs b/src/web/bgservice.rs index 7fdb946..4e329a0 100644 --- a/src/web/bgservice.rs +++ b/src/web/bgservice.rs @@ -10,6 +10,7 @@ use futures::StreamExt; use log::info; use pingora_core::server::ShutdownWatch; use pingora_core::services::background::BackgroundService; +use std::sync::Arc; #[async_trait] impl BackgroundService for LB { @@ -53,19 +54,11 @@ impl BackgroundService for LB { Some(ss) => { clone_dashmap_into(&ss.upstreams, &self.ump_full); clone_dashmap_into(&ss.upstreams, &self.ump_upst); - self.proxyconf.clear(); - { - let mut write_guard = self.extraparams.write().await; - write_guard.stickysessions = ss.extraparams.stickysessions; - } - match ss.globals { - Some(globals) => { - for (k,v) in globals { - self.proxyconf.insert(k, v); - } - } - None => {} - } + let current = self.extraparams.load_full(); + let mut new = (*current).clone(); + new.stickysessions = ss.extraparams.stickysessions; + new.authentication = ss.extraparams.authentication.clone(); + self.extraparams.store(Arc::new(new)); self.headers.clear(); for entry in ss.upstreams.iter() { diff --git a/src/web/proxyhttp.rs b/src/web/proxyhttp.rs index 4d72b50..190e96e 100644 --- a/src/web/proxyhttp.rs +++ b/src/web/proxyhttp.rs @@ -1,8 +1,8 @@ use crate::utils::auth::authenticate; use crate::utils::structs::{AppConfig, Extraparams, Headers, UpstreamsDashMap, UpstreamsIdMap}; use crate::web::gethosts::GetHost; +use arc_swap::ArcSwap; use async_trait::async_trait; -use dashmap::DashMap; use log::{debug, warn}; use pingora::http::RequestHeader; use pingora::prelude::*; @@ -10,9 +10,7 @@ use pingora_core::listeners::ALPN; use pingora_core::prelude::HttpPeer; use pingora_http::ResponseHeader; use pingora_proxy::{ProxyHttp, Session}; -use std::ops::Deref; use std::sync::Arc; -use tokio::sync::RwLock; pub struct LB { pub ump_upst: Arc, @@ -20,9 +18,7 @@ pub struct LB { pub ump_byid: Arc, pub headers: Arc, pub config: Arc, - pub local: Arc<(String, u16)>, - pub proxyconf: Arc>>, - pub extraparams: Arc>, + pub extraparams: Arc>, } pub struct Context { @@ -45,17 +41,14 @@ impl ProxyHttp for LB { // session.req_header_mut().headers.insert("X-Host-Name", host.to_string().parse().unwrap()); let mut backend_id = None; - { - let read_guard = self.extraparams.read().await; - if read_guard.stickysessions { - if let Some(cookies) = session.req_header().headers.get("cookie") { - if let Ok(cookie_str) = cookies.to_str() { - for cookie in cookie_str.split(';') { - let trimmed = cookie.trim(); - if let Some(value) = trimmed.strip_prefix("backend_id=") { - backend_id = Some(value); - break; - } + if self.extraparams.load().stickysessions { + if let Some(cookies) = session.req_header().headers.get("cookie") { + if let Ok(cookie_str) = cookies.to_str() { + for cookie in cookie_str.split(';') { + let trimmed = cookie.trim(); + if let Some(value) = trimmed.strip_prefix("backend_id=") { + backend_id = Some(value); + break; } } } @@ -76,20 +69,18 @@ impl ProxyHttp for LB { } None => { warn!("Upstream not found. Host: {:?}, Path: {}", host, session.req_header().uri); - let peer = Box::new(HttpPeer::new(self.local.deref(), false, String::new())); - Ok(peer) + Ok(return_no_host(&self.config.local_server)) } } } None => { warn!("Upstream not found. Host: {:?}, Path: {}", host_name, session.req_header().uri); - let peer = Box::new(HttpPeer::new(self.local.deref(), false, String::new())); - Ok(peer) + Ok(return_no_host(&self.config.local_server)) } } } async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result { - if let Some(auth) = self.proxyconf.get("authorization") { + if let Some(auth) = self.extraparams.load().authentication.get("authorization") { let authenticated = authenticate(&auth.value(), &session); if !authenticated { let _ = session.respond_error(401).await; @@ -97,11 +88,11 @@ impl ProxyHttp for LB { return Ok(true); } }; - if session.req_header().uri.path().starts_with("/denied") { - let _ = session.respond_error(403).await; - warn!("Forbidden: {:?}, {}", session.client_addr(), session.req_header().uri.path().to_string()); - return Ok(true); - }; + // if session.req_header().uri.path().starts_with("/denied") { + // let _ = session.respond_error(403).await; + // warn!("Forbidden: {:?}, {}", session.client_addr(), session.req_header().uri.path().to_string()); + // return Ok(true); + // }; Ok(false) } async fn upstream_request_filter(&self, _session: &mut Session, _upstream_request: &mut RequestHeader, _ctx: &mut Self::CTX) -> Result<()> { @@ -128,14 +119,11 @@ impl ProxyHttp for LB { async fn response_filter(&self, _session: &mut Session, _upstream_response: &mut ResponseHeader, _ctx: &mut Self::CTX) -> Result<()> { // _upstream_response.insert_header("X-Proxied-From", "Fooooooooooooooo").unwrap(); - { - let read_guard = self.extraparams.read().await; - if read_guard.stickysessions { - let backend_id = _ctx.backend_id.clone(); - if let Some(bid) = self.ump_byid.get(&backend_id) { - // let _ = _upstream_response.insert_header("set-cookie", format!("backend {}", bid.0)); - let _ = _upstream_response.insert_header("set-cookie", format!("backend_id={}; Path=/; Max-Age=600; HttpOnly; SameSite=Lax", bid.0)); - } + if self.extraparams.load().stickysessions { + let backend_id = _ctx.backend_id.clone(); + if let Some(bid) = self.ump_byid.get(&backend_id) { + // let _ = _upstream_response.insert_header("set-cookie", format!("backend {}", bid.0)); + let _ = _upstream_response.insert_header("set-cookie", format!("backend_id={}; Path=/; Max-Age=600; HttpOnly; SameSite=Lax", bid.0)); } } @@ -191,3 +179,10 @@ fn return_header_host(session: &Session) -> Option<&str> { } } } + +fn return_no_host(inp: &Option<(String, u16)>) -> Box { + match inp { + Some(t) => Box::new(HttpPeer::new(t, false, String::new())), + None => Box::new(HttpPeer::new(("0.0.0.0", 0), false, String::new())), + } +} diff --git a/src/web/start.rs b/src/web/start.rs index cba1764..bb0636c 100644 --- a/src/web/start.rs +++ b/src/web/start.rs @@ -1,56 +1,38 @@ -use crate::utils::structs::{Extraparams, Headers, UpstreamsDashMap, UpstreamsIdMap}; +use crate::utils::structs::Extraparams; use crate::web::proxyhttp::LB; +use arc_swap::ArcSwap; use dashmap::DashMap; use log::info; use pingora_core::prelude::{background_service, Opt}; use pingora_core::server::Server; use std::env; use std::sync::Arc; -use tokio::sync::RwLock; pub fn run() { let parameters = Some(Opt::parse_args()).unwrap(); let file = parameters.conf.clone().unwrap(); let maincfg = crate::utils::parceyaml::parce_main_config(file.as_str()); - // println!("{:?}", maincfg); - - let mut local_conf: (String, u16) = ("0.0.0.0".to_string(), 0); - if let Some((ip, port_str)) = maincfg.config_address.split_once(':') { - if let Ok(port) = port_str.parse::() { - local_conf = (ip.to_string(), port); - } - } - let mut server = Server::new(parameters).unwrap(); server.bootstrap(); - let uf: UpstreamsDashMap = DashMap::new(); - let ff: UpstreamsDashMap = DashMap::new(); - let im: UpstreamsIdMap = DashMap::new(); - let hh: Headers = DashMap::new(); - let ec: Extraparams = Extraparams { stickysessions: false }; - - let uf_config = Arc::new(uf); - let ff_config = Arc::new(ff); - let im_config = Arc::new(im); - let hh_config = Arc::new(hh); - let ec_config = Arc::new(RwLock::new(ec)); + let uf_config = Arc::new(DashMap::new()); + let ff_config = Arc::new(DashMap::new()); + let im_config = Arc::new(DashMap::new()); + let hh_config = Arc::new(DashMap::new()); + let ec_config = Arc::new(ArcSwap::from_pointee(Extraparams { + stickysessions: false, + authentication: DashMap::new(), + })); let cfg = Arc::new(maincfg); - let local = Arc::new(local_conf); - - let proxyconf: DashMap> = Default::default(); - let pconf = Arc::new(proxyconf); let lb = LB { ump_upst: uf_config.clone(), ump_full: ff_config.clone(), ump_byid: im_config.clone(), config: cfg.clone(), - local: local.clone(), headers: hh_config.clone(), - proxyconf: pconf.clone(), extraparams: ec_config.clone(), }; let bg = LB { @@ -58,9 +40,7 @@ pub fn run() { ump_full: ff_config.clone(), ump_byid: im_config.clone(), config: cfg.clone(), - local: local.clone(), headers: hh_config.clone(), - proxyconf: pconf.clone(), extraparams: ec_config.clone(), };