diff --git a/etc/stresstest.yaml b/etc/stresstest.yaml new file mode 100644 index 0000000..798d746 --- /dev/null +++ b/etc/stresstest.yaml @@ -0,0 +1,19 @@ +provider: "file" +globals: + headers: + - "Access-Control-Allow-Origin:*" + - "Access-Control-Allow-Methods:POST, GET, OPTIONS" + - "Access-Control-Max-Age:86400" + - "X-Custom-Header:Something Special" +upstreams: + myip.netangels.net: + paths: + "/": + ssl: false + headers: + - "X-Proxy-From:Gazan" + servers: + - "192.168.221.213:8000" + - "192.168.221.214:8000" + - "192.168.221.210:8000" + - "192.168.221.212:8000" \ No newline at end of file diff --git a/src/web.rs b/src/web.rs index 42b6bdd..6172525 100644 --- a/src/web.rs +++ b/src/web.rs @@ -1,3 +1,5 @@ +pub mod bgservice; +pub mod gethosts; pub mod proxyhttp; pub mod start; pub mod webserver; diff --git a/src/web/bgservice.rs b/src/web/bgservice.rs new file mode 100644 index 0000000..0e27129 --- /dev/null +++ b/src/web/bgservice.rs @@ -0,0 +1,106 @@ +use crate::utils::discovery::{APIUpstreamProvider, ConsulProvider, Discovery, FromFileProvider}; +use crate::utils::parceyaml::Configuration; +use crate::utils::tools::*; +use crate::utils::*; +use crate::web::proxyhttp::LB; +use async_trait::async_trait; +use dashmap::DashMap; +use futures::channel::mpsc; +use futures::StreamExt; +use log::{error, info}; +use pingora_core::server::ShutdownWatch; +use pingora_core::services::background::BackgroundService; + +#[async_trait] +impl BackgroundService for LB { + async fn start(&self, mut shutdown: ShutdownWatch) { + info!("Starting background service"); + let (tx, mut rx) = mpsc::channel::(0); + + let from_file = self.config.get("upstreams_conf"); + match from_file { + Some(from_file) => { + let tx_file = tx.clone(); + let tx_consul = tx.clone(); + + let file_load = FromFileProvider { path: from_file.to_string() }; + let consul_load = ConsulProvider { path: from_file.to_string() }; + + let _ = tokio::spawn(async move { file_load.start(tx_file).await }); + let _ = tokio::spawn(async move { consul_load.start(tx_consul).await }); + } + None => { + error!("Can't read config file"); + } + } + let config_address = self.config.get("config_address"); + let masterkey = self.config.get("master_key").unwrap(); + match config_address { + Some(config_address) => { + let api_load = APIUpstreamProvider { + address: config_address.to_string(), + masterkey: masterkey.value().to_string(), + }; + let tx_api = tx.clone(); + let _ = tokio::spawn(async move { api_load.start(tx_api).await }); + } + None => { + error!("Can't read config file"); + } + } + + let uu = self.ump_upst.clone(); + let ff = self.ump_full.clone(); + let im = self.ump_byid.clone(); + let (hc_method, hc_interval) = (self.config.get("hc_method").unwrap().clone(), self.config.get("hc_interval").unwrap().clone()); + let _ = tokio::spawn(async move { healthcheck::hc2(uu, ff, im, (&*hc_method.to_string(), hc_interval.to_string().parse().unwrap())).await }); + + loop { + tokio::select! { + _ = shutdown.changed() => { + break; + } + val = rx.next() => { + match val { + Some(ss) => { + clone_dashmap_into(&ss.upstreams, &self.ump_full); + clone_dashmap_into(&ss.upstreams, &self.ump_upst); + self.proxyconf.clear(); + match ss.globals { + Some(globals) => { + for (k,v) in globals { + self.proxyconf.insert(k, v); + } + } + None => {} + } + self.headers.clear(); + + for entry in ss.upstreams.iter() { + let global_key = entry.key().clone(); + let global_values = DashMap::new(); + let mut target_entry = ss.headers.entry(global_key).or_insert_with(DashMap::new); + target_entry.extend(global_values); + self.headers.insert(target_entry.key().to_owned(), target_entry.value().to_owned()); + } + + for path in ss.headers.iter() { + let path_key = path.key().clone(); + let path_headers = path.value().clone(); + self.headers.insert(path_key.clone(), path_headers); + if let Some(global_headers) = ss.headers.get("GLOBAL_HEADERS") { + if let Some(existing_headers) = self.headers.get_mut(&path_key) { + merge_headers(&existing_headers, &global_headers); + } + } + } + info!("Upstreams list is changed, updating to:"); + print_upstreams(&self.ump_full); + } + None => {} + } + } + } + } + } +} diff --git a/src/web/proxyhttp.rs b/src/web/proxyhttp.rs index eeb2d26..040f8d4 100644 --- a/src/web/proxyhttp.rs +++ b/src/web/proxyhttp.rs @@ -1,237 +1,81 @@ -use crate::utils::discovery::{APIUpstreamProvider, ConsulProvider, Discovery, FromFileProvider}; +use crate::utils::auth::authenticate; use crate::utils::tools::*; -use crate::utils::*; +use crate::web::gethosts::GetHost; use async_trait::async_trait; use dashmap::DashMap; -use futures::channel::mpsc; -use futures::StreamExt; -use log::{debug, error, info, warn}; +use log::{debug, warn}; use pingora::http::RequestHeader; use pingora::prelude::*; -use pingora_core::prelude::HttpPeer; -use pingora_core::server::ShutdownWatch; -use pingora_core::services::background::BackgroundService; -use pingora_http::ResponseHeader; - -use crate::utils::auth::authenticate; -use crate::utils::parceyaml::Configuration; use pingora_core::listeners::ALPN; +use pingora_core::prelude::HttpPeer; +use pingora_http::ResponseHeader; use pingora_proxy::{ProxyHttp, Session}; use std::ops::Deref; -use std::sync::atomic::Ordering; use std::sync::Arc; -// use http_auth_basic::Credentials; pub struct LB { pub ump_upst: Arc, pub ump_full: Arc, + pub ump_byid: Arc, pub headers: Arc, pub config: Arc>, pub local: Arc<(String, u16)>, pub proxyconf: Arc>>, } -#[async_trait] -impl BackgroundService for LB { - async fn start(&self, mut shutdown: ShutdownWatch) { - info!("Starting background service"); - let (tx, mut rx) = mpsc::channel::(0); - - let from_file = self.config.get("upstreams_conf"); - match from_file { - Some(from_file) => { - let tx_file = tx.clone(); - let tx_consul = tx.clone(); - - let file_load = FromFileProvider { path: from_file.to_string() }; - let consul_load = ConsulProvider { path: from_file.to_string() }; - - let _ = tokio::spawn(async move { file_load.start(tx_file).await }); - let _ = tokio::spawn(async move { consul_load.start(tx_consul).await }); - } - None => { - error!("Can't read config file"); - } - } - let config_address = self.config.get("config_address"); - let masterkey = self.config.get("master_key").unwrap(); - match config_address { - Some(config_address) => { - let api_load = APIUpstreamProvider { - address: config_address.to_string(), - masterkey: masterkey.value().to_string(), - }; - let tx_api = tx.clone(); - let _ = tokio::spawn(async move { api_load.start(tx_api).await }); - } - None => { - error!("Can't read config file"); - } - } - - let uu = self.ump_upst.clone(); - let ff = self.ump_full.clone(); - let (hc_method, hc_interval) = (self.config.get("hc_method").unwrap().clone(), self.config.get("hc_interval").unwrap().clone()); - let _ = tokio::spawn(async move { healthcheck::hc2(uu, ff, (&*hc_method.to_string(), hc_interval.to_string().parse().unwrap())).await }); - - loop { - tokio::select! { - _ = shutdown.changed() => { - break; - } - val = rx.next() => { - match val { - Some(ss) => { - clone_dashmap_into(&ss.upstreams, &self.ump_full); - clone_dashmap_into(&ss.upstreams, &self.ump_upst); - self.proxyconf.clear(); - match ss.globals { - Some(globals) => { - for (k,v) in globals { - self.proxyconf.insert(k, v); - } - } - None => {} - } - self.headers.clear(); - - for entry in ss.upstreams.iter() { - let global_key = entry.key().clone(); - let global_values = DashMap::new(); - let mut target_entry = ss.headers.entry(global_key).or_insert_with(DashMap::new); - target_entry.extend(global_values); - self.headers.insert(target_entry.key().to_owned(), target_entry.value().to_owned()); - } - - for path in ss.headers.iter() { - let path_key = path.key().clone(); - let path_headers = path.value().clone(); - self.headers.insert(path_key.clone(), path_headers); - if let Some(global_headers) = ss.headers.get("GLOBAL_HEADERS") { - if let Some(existing_headers) = self.headers.get_mut(&path_key) { - merge_headers(&existing_headers, &global_headers); - } - } - } - info!("Upstreams list is changed, updating to:"); - print_upstreams(&self.ump_full); - } - None => {} - } - } - } - } - } -} - -#[async_trait] -pub trait GetHost { - async fn get_host(&self, peer: &str, path: &str, upgrade: bool) -> Option<(String, u16, bool)>; - async fn get_header(&self, peer: &str, path: &str) -> Option>; -} -#[async_trait] -impl GetHost for LB { - /* - async fn get_host(&self, peer: &str, path: &str, _upgrade: bool) -> Option<(String, u16, bool)> { - let host_entry = self.ump_upst.get(peer); - match host_entry { - Some(host_entry) => { - let upstream = if let Some(entry) = host_entry.get(path) { - let (servers, index) = entry.value(); - if servers.is_empty() { - return None; - } - let idx = index.fetch_add(1, Ordering::Relaxed) % servers.len(); - Some(servers[idx].clone()) - } else { - None - }; - upstream - } - None => None, - } - } - */ - async fn get_host(&self, peer: &str, path: &str, _upgrade: bool) -> Option<(String, u16, bool)> { - // println!(" ==> {:?}", self.config); - let host_entry = self.ump_upst.get(peer)?; - let mut current_path = path.to_string(); - let mut best_match: Option<(String, u16, bool)> = None; - loop { - if let Some(entry) = host_entry.get(¤t_path) { - let (servers, index) = entry.value(); - if !servers.is_empty() { - let idx = index.fetch_add(1, Ordering::Relaxed) % servers.len(); - best_match = Some(servers[idx].clone()); - break; - } - } - if let Some(pos) = current_path.rfind('/') { - current_path.truncate(pos); - } else { - break; - } - } - if best_match.is_none() { - if let Some(entry) = host_entry.get("/") { - let (servers, index) = entry.value(); - if !servers.is_empty() { - let idx = index.fetch_add(1, Ordering::Relaxed) % servers.len(); - best_match = Some(servers[idx].clone()); - } - } - } - best_match - } - async fn get_header(&self, peer: &str, path: &str) -> Option> { - let host_entry = self.headers.get(peer)?; - let mut current_path = path.to_string(); - let mut best_match: Option> = None; - - loop { - if let Some(entry) = host_entry.get(¤t_path) { - if !entry.value().is_empty() { - best_match = Some(entry.value().clone()); - break; - } - } - if let Some(pos) = current_path.rfind('/') { - current_path.truncate(pos); - } else { - break; - } - } - if best_match.is_none() { - if let Some(entry) = host_entry.get("/") { - if !entry.value().is_empty() { - best_match = Some(entry.value().clone()); - } - } - } - best_match - } +pub struct MyCtx { + backend_id: String, } #[async_trait] impl ProxyHttp for LB { - type CTX = (); - fn new_ctx(&self) -> Self::CTX {} + // type CTX = (); + // fn new_ctx(&self) -> Self::CTX {} + type CTX = MyCtx; + fn new_ctx(&self) -> Self::CTX { + MyCtx { backend_id: String::new() } + } async fn upstream_peer(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result> { - // println!("upstream_peer called for {:?}", session.req_header()); - // println!("=============================================="); let host_name = return_header_host(&session); + match host_name { Some(host) => { // session.req_header_mut().headers.insert("X-Host-Name", host.to_string().parse().unwrap()); - let ddr = self.get_host(host, host, session.is_upgrade_req()); + // let mut backend_id = Option::<&str>::None; + // if let Some(_) = self.config.get("sticky_sessions") { + // if let Some(cookies) = session.req_header().headers.get("cookie") { + // let cookie_str = cookies.to_str().unwrap_or("").split(" ").collect::>()[1]; + // backend_id = Some(cookie_str); + // } + // } - match ddr.await { + let mut backend_id = None; + + if let Some(_) = self.config.get("sticky_sessions") { + if let Some(cookies) = session.req_header().headers.get("cookie") { + if let Ok(cookie_str) = cookies.to_str() { + for cookie in cookie_str.split(';') { + let trimmed = cookie.trim(); + if let Some(value) = trimmed.strip_prefix("backend_id=") { + backend_id = Some(value); + break; + } + } + } + } + } + + let ddr = self.get_host(host, host, backend_id); + + match ddr { Some((host, port, ssl)) => { - let mut peer = Box::new(HttpPeer::new((host, port), ssl, String::new())); + // let mut peer = Box::new(HttpPeer::new((host, port), ssl, String::new())); + let mut peer = Box::new(HttpPeer::new((host.clone(), port.clone()), ssl, String::new())); if session.is_http2() { peer.options.alpn = ALPN::H2; } + _ctx.backend_id = format!("{}:{}:{}", host.clone(), port.clone(), ssl); Ok(peer) } None => { @@ -248,10 +92,7 @@ impl ProxyHttp for LB { } } } - async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result - where - Self::CTX: Send + Sync, - { + async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result { if let Some(auth) = self.proxyconf.get("authorization") { let authenticated = authenticate(&auth.value(), &session); if !authenticated { @@ -267,10 +108,7 @@ impl ProxyHttp for LB { }; Ok(false) } - async fn upstream_request_filter(&self, _session: &mut Session, _upstream_request: &mut RequestHeader, _ctx: &mut Self::CTX) -> Result<()> - where - Self::CTX: Send + Sync, - { + async fn upstream_request_filter(&self, _session: &mut Session, _upstream_request: &mut RequestHeader, _ctx: &mut Self::CTX) -> Result<()> { let clientip = _session.client_addr(); match clientip { Some(ip) => { @@ -291,12 +129,17 @@ impl ProxyHttp for LB { Ok(()) } - async fn response_filter(&self, _session: &mut Session, _upstream_response: &mut ResponseHeader, _ctx: &mut Self::CTX) -> Result<()> - where - Self::CTX: Send + Sync, - { + async fn response_filter(&self, _session: &mut Session, _upstream_response: &mut ResponseHeader, _ctx: &mut Self::CTX) -> Result<()> { // _upstream_response.insert_header("X-Proxied-From", "Fooooooooooooooo").unwrap(); + if let Some(_) = self.config.get("sticky_sessions") { + let backend_id = _ctx.backend_id.clone(); + if let Some(bid) = self.ump_byid.get(&backend_id) { + // let _ = _upstream_response.insert_header("set-cookie", format!("backend {}", bid.0)); + let _ = _upstream_response.insert_header("set-cookie", format!("backend_id={}; Path=/; Max-Age=600; HttpOnly; SameSite=Lax", bid.0)); + } + } + let host_name = return_header_host(&_session); match host_name { Some(host) => { @@ -305,7 +148,7 @@ impl ProxyHttp for LB { let split_header = host_header.split_once(':'); match split_header { Some(sh) => { - let yoyo = self.get_header(sh.0, path).await; + let yoyo = self.get_header(sh.0, path); for k in yoyo.iter() { for t in k.iter() { _upstream_response.insert_header(t.0.clone(), t.1.clone()).unwrap(); @@ -313,7 +156,7 @@ impl ProxyHttp for LB { } } None => { - let yoyo = self.get_header(host_header, path).await; + let yoyo = self.get_header(host_header, path); for k in yoyo.iter() { for t in k.iter() { _upstream_response.insert_header(t.0.clone(), t.1.clone()).unwrap();