From f3bca5a0011efca7af48d2d8f6172da422e94921 Mon Sep 17 00:00:00 2001 From: Ara Sadoyan Date: Mon, 24 Mar 2025 18:54:38 +0100 Subject: [PATCH] Added inject response header functionality --- etc/upstreams.yaml | 25 ++++++++++++++++++ src/utils/discovery.rs | 9 ++++--- src/utils/parceyaml.rs | 18 ++++++++++--- src/utils/tools.rs | 2 ++ src/web/proxyhttp.rs | 59 +++++++++++++++++++++++++++++++++++++----- src/web/start.rs | 6 +++++ src/web/webserver.rs | 2 +- 7 files changed, 106 insertions(+), 15 deletions(-) diff --git a/etc/upstreams.yaml b/etc/upstreams.yaml index 46924eb..f4595d8 100644 --- a/etc/upstreams.yaml +++ b/etc/upstreams.yaml @@ -3,6 +3,10 @@ upstreams: paths: "/": ssl: false + headers: + - "Access-Control-Allow-Origin:*" + - "Access-Control-Allow-Methods:POST, GET, OPTIONS" + - "Access-Control-Max-Age:86400" servers: - "127.0.0.1:8000" - "127.0.0.2:8000" @@ -10,27 +14,42 @@ upstreams: - "127.0.0.4:8000" "/ping": ssl: false + headers: + - "X-Some-Thing:Yaaaaaaaaaaaaaaa" + - "X-Prox-From:Hopaaaaaaaaaaaar" servers: - "127.0.0.1:8000" - "127.0.0.2:8000" "/draw": ssl: false + headers: + - "X-Some-Thing:Yaaaaaaaaaaaaaaa" + - "X-Prox-From:Hopaaaaaaaaaaaar" servers: - "192.168.1.1:8000" polo.netangels.net: paths: "/": ssl: false + headers: + - "X-Some-Thing:Yaaaaaaaaaaaaaaa" + - "X-Prox-From:Hopaaaaaaaaaaaar" servers: - "192.168.1.10:8000" "/ws": ssl: false + headers: + - "X-Some-Thing:Yaaaaaaaaaaaaaaa" + - "X-Prox-From:Hopaaaaaaaaaaaar" servers: - "192.168.1.1:8000" glop.netangels.net: paths: "/": ssl: false + headers: + - "X-Some-Thing:Yaaaaaaaaaaaaaaa" + - "X-Prox-From:Hopaaaaaaaaaaaar" servers: - "192.168.1.10:8000" - "192.168.1.10:8000" @@ -38,12 +57,18 @@ upstreams: paths: "/": ssl: true + headers: + - "X-Some-Thing:Yaaaaaaaaaaaaaaa" + - "X-Prox-From:Hopaaaaaaaaaaaar" servers: - "apt.netangels.net:443" 127.0.0.1: paths: "/camerastatus": ssl: false + headers: + - "X-Some-Thing:Yaaaaaaaaaaaaaaa" + - "X-Prox-From:Hopaaaaaaaaaaaar" servers: - "192.168.1.5:8080" # 127.0.0.1: diff --git a/src/utils/discovery.rs b/src/utils/discovery.rs index 5f8b5ba..271426f 100644 --- a/src/utils/discovery.rs +++ b/src/utils/discovery.rs @@ -21,29 +21,30 @@ pub struct APIUpstreamProvider { #[async_trait] pub trait Discovery { - async fn start(&self, tx: Sender); + async fn start(&self, tx: Sender<(UpstreamsDashMap, Headers)>); } #[async_trait] impl Discovery for APIUpstreamProvider { - async fn start(&self, toreturn: Sender) { + async fn start(&self, toreturn: Sender<(UpstreamsDashMap, Headers)>) { webserver::run_server(self.address.clone(), toreturn).await; } } #[async_trait] impl Discovery for FromFileProvider { - async fn start(&self, tx: Sender) { + async fn start(&self, tx: Sender<(UpstreamsDashMap, Headers)>) { tokio::spawn(watch_file(self.path.clone(), tx.clone())); } } -pub async fn watch_file(fp: String, mut toreturn: Sender) { +pub async fn watch_file(fp: String, mut toreturn: Sender<(UpstreamsDashMap, Headers)>) { sleep(Duration::from_millis(50)).await; // For having nice logs :-) let file_path = fp.as_str(); let parent_dir = Path::new(file_path).parent().unwrap(); let (local_tx, mut local_rx) = tokio::sync::mpsc::channel::>(1); info!("Watching for changes in {:?}", parent_dir); let snd = load_yaml_to_dashmap(file_path, "filepath"); + match snd { Some(snd) => { toreturn.send(snd).await.unwrap(); diff --git a/src/utils/parceyaml.rs b/src/utils/parceyaml.rs index 85f6edd..df55e12 100644 --- a/src/utils/parceyaml.rs +++ b/src/utils/parceyaml.rs @@ -21,10 +21,12 @@ struct HostConfig { struct PathConfig { ssl: bool, servers: Vec, + headers: Vec, } -pub fn load_yaml_to_dashmap(d: &str, kind: &str) -> Option { +pub fn load_yaml_to_dashmap(d: &str, kind: &str) -> Option<(UpstreamsDashMap, Headers)> { let dashmap = UpstreamsDashMap::new(); + let headers = DashMap::new(); let mut yaml_data = d.to_string(); match kind { "filepath" => { @@ -35,7 +37,7 @@ pub fn load_yaml_to_dashmap(d: &str, kind: &str) -> Option { } Err(e) => { error!("Reading: {}: {:?}", d, e.to_string()); - warn!("Running with empty upstreams list, chane it via API"); + warn!("Running with empty upstreams list, update it via API"); return None; } }; @@ -51,8 +53,17 @@ pub fn load_yaml_to_dashmap(d: &str, kind: &str) -> Option { Ok(parsed) => { for (hostname, host_config) in parsed.upstreams { let path_map = DashMap::new(); + let header_list = DashMap::new(); for (path, path_config) in host_config.paths { let mut server_list = Vec::new(); + let mut hl = Vec::new(); + for header in path_config.headers.iter().by_ref() { + if let Some((key, val)) = header.split_once(':') { + hl.push((key.to_string(), val.to_string())); + } + } + header_list.insert(path.clone(), hl); + // println!(" {:?} == {:?}", hostname, header_list); for server in path_config.servers { if let Some((ip, port_str)) = server.split_once(':') { if let Ok(port) = port_str.parse::() { @@ -62,9 +73,10 @@ pub fn load_yaml_to_dashmap(d: &str, kind: &str) -> Option { } path_map.insert(path, (server_list, AtomicUsize::new(0))); } + headers.insert(hostname.clone(), header_list); dashmap.insert(hostname, path_map); } - Some(dashmap) + Some((dashmap, headers)) } Err(e) => { error!("Failed to parse upstreams file: {}", e); diff --git a/src/utils/tools.rs b/src/utils/tools.rs index 47ee52a..364bbf3 100644 --- a/src/utils/tools.rs +++ b/src/utils/tools.rs @@ -21,6 +21,8 @@ pub fn print_upstreams(upstreams: &UpstreamsDashMap) { } pub type UpstreamsDashMap = DashMap, AtomicUsize)>>; +// pub type HeadersList = DashMap>; +pub type Headers = DashMap>>; // pub type UpstreamMap = DashMap, AtomicUsize)>; #[allow(dead_code)] diff --git a/src/web/proxyhttp.rs b/src/web/proxyhttp.rs index e0e6cbb..dbb22de 100644 --- a/src/web/proxyhttp.rs +++ b/src/web/proxyhttp.rs @@ -19,6 +19,7 @@ use std::sync::Arc; pub struct LB { pub ump_upst: Arc, pub ump_full: Arc, + pub headers: Arc, pub config: Arc>, pub local: Arc<(String, u16)>, } @@ -27,7 +28,7 @@ pub struct LB { impl BackgroundService for LB { async fn start(&self, mut shutdown: ShutdownWatch) { info!("Starting background service"); - let (tx, mut rx) = mpsc::channel::(0); + let (tx, mut rx) = mpsc::channel::<(UpstreamsDashMap, Headers)>(0); let from_file = self.config.get("upstreams_conf"); match from_file { @@ -67,10 +68,13 @@ impl BackgroundService for LB { val = rx.next() => { match val { Some(ss) => { - let foo = compare_dashmaps(&*self.ump_full, &ss); + let foo = compare_dashmaps(&*self.ump_full, &ss.0); if !foo { - clone_dashmap_into(&ss, &self.ump_full); - clone_dashmap_into(&ss, &self.ump_upst); + clone_dashmap_into(&ss.0, &self.ump_full); + clone_dashmap_into(&ss.0, &self.ump_upst); + for (k,v) in ss.1 { + self.headers.insert(k,v); + } print_upstreams(&self.ump_full); } } @@ -85,6 +89,7 @@ impl BackgroundService for LB { #[async_trait] pub trait GetHost { async fn get_host(&self, peer: &str, path: &str, upgrade: bool) -> Option<(String, u16, bool)>; + async fn get_header(&self, peer: &str, path: &str) -> Option>; } #[async_trait] impl GetHost for LB { @@ -111,8 +116,6 @@ impl GetHost for LB { */ async fn get_host(&self, peer: &str, path: &str, _upgrade: bool) -> Option<(String, u16, bool)> { let host_entry = self.ump_upst.get(peer)?; - - // Check if an exact match exists first let mut current_path = path.to_string(); let mut best_match: Option<(String, u16, bool)> = None; @@ -142,6 +145,33 @@ impl GetHost for LB { } best_match } + async fn get_header(&self, peer: &str, path: &str) -> Option> { + let host_entry = self.headers.get(peer)?; + let mut current_path = path.to_string(); + let mut best_match: Option> = None; + + loop { + if let Some(entry) = host_entry.get(¤t_path) { + if !entry.value().is_empty() { + best_match = Some(entry.value().clone()); + break; + } + } + if let Some(pos) = current_path.rfind('/') { + current_path.truncate(pos); + } else { + break; + } + } + if best_match.is_none() { + if let Some(entry) = host_entry.get("/") { + if !entry.value().is_empty() { + best_match = Some(entry.value().clone()); + } + } + } + best_match + } } #[async_trait] @@ -213,7 +243,22 @@ impl ProxyHttp for LB { where Self::CTX: Send + Sync, { - _upstream_response.insert_header("X-Proxied-From", "Fooooooooooooooo").unwrap(); + // _upstream_response.insert_header("X-Proxied-From", "Fooooooooooooooo").unwrap(); + + let host = _session.req_header().headers.get("Host"); + match host { + Some(host) => { + let path = _session.req_header().uri.path(); + let yoyo = self.get_header(host.to_str().unwrap(), path).await; + + for k in yoyo.iter() { + for t in k.iter() { + _upstream_response.insert_header(t.0.clone(), t.1.clone()).unwrap(); + } + } + } + None => {} + } Ok(()) } diff --git a/src/web/start.rs b/src/web/start.rs index 1294407..30c9fdf 100644 --- a/src/web/start.rs +++ b/src/web/start.rs @@ -23,8 +23,12 @@ pub fn run() { let uf: UpstreamsDashMap = DashMap::new(); let ff: UpstreamsDashMap = DashMap::new(); + let hh: Headers = DashMap::new(); + let uf_config = Arc::new(uf); let ff_config = Arc::new(ff); + let hh_config = Arc::new(hh); + let cfg = Arc::new(maincfg); let local = Arc::new(local_conf); @@ -33,12 +37,14 @@ pub fn run() { ump_full: ff_config.clone(), config: cfg.clone(), local: local.clone(), + headers: hh_config.clone(), }; let bg = LB { ump_upst: uf_config.clone(), ump_full: ff_config.clone(), config: cfg.clone(), local: local.clone(), + headers: hh_config.clone(), }; // env_logger::Env::new(); diff --git a/src/web/webserver.rs b/src/web/webserver.rs index a6f06c7..092c8cf 100644 --- a/src/web/webserver.rs +++ b/src/web/webserver.rs @@ -10,7 +10,7 @@ use log::info; use tokio::net::TcpListener; #[allow(unused_mut)] -pub async fn run_server(bindaddress: String, mut toreturn: Sender) { +pub async fn run_server(bindaddress: String, mut toreturn: Sender<(UpstreamsDashMap, Headers)>) { let mut tr = toreturn.clone(); let app = Router::new() .route("/{*wildcard}", get(getconfig))