Added inject response header functionality

This commit is contained in:
Ara Sadoyan
2025-03-24 18:54:38 +01:00
parent 2c87a330ef
commit f3bca5a001
7 changed files with 106 additions and 15 deletions

View File

@@ -3,6 +3,10 @@ upstreams:
paths: paths:
"/": "/":
ssl: false ssl: false
headers:
- "Access-Control-Allow-Origin:*"
- "Access-Control-Allow-Methods:POST, GET, OPTIONS"
- "Access-Control-Max-Age:86400"
servers: servers:
- "127.0.0.1:8000" - "127.0.0.1:8000"
- "127.0.0.2:8000" - "127.0.0.2:8000"
@@ -10,27 +14,42 @@ upstreams:
- "127.0.0.4:8000" - "127.0.0.4:8000"
"/ping": "/ping":
ssl: false ssl: false
headers:
- "X-Some-Thing:Yaaaaaaaaaaaaaaa"
- "X-Prox-From:Hopaaaaaaaaaaaar"
servers: servers:
- "127.0.0.1:8000" - "127.0.0.1:8000"
- "127.0.0.2:8000" - "127.0.0.2:8000"
"/draw": "/draw":
ssl: false ssl: false
headers:
- "X-Some-Thing:Yaaaaaaaaaaaaaaa"
- "X-Prox-From:Hopaaaaaaaaaaaar"
servers: servers:
- "192.168.1.1:8000" - "192.168.1.1:8000"
polo.netangels.net: polo.netangels.net:
paths: paths:
"/": "/":
ssl: false ssl: false
headers:
- "X-Some-Thing:Yaaaaaaaaaaaaaaa"
- "X-Prox-From:Hopaaaaaaaaaaaar"
servers: servers:
- "192.168.1.10:8000" - "192.168.1.10:8000"
"/ws": "/ws":
ssl: false ssl: false
headers:
- "X-Some-Thing:Yaaaaaaaaaaaaaaa"
- "X-Prox-From:Hopaaaaaaaaaaaar"
servers: servers:
- "192.168.1.1:8000" - "192.168.1.1:8000"
glop.netangels.net: glop.netangels.net:
paths: paths:
"/": "/":
ssl: false ssl: false
headers:
- "X-Some-Thing:Yaaaaaaaaaaaaaaa"
- "X-Prox-From:Hopaaaaaaaaaaaar"
servers: servers:
- "192.168.1.10:8000" - "192.168.1.10:8000"
- "192.168.1.10:8000" - "192.168.1.10:8000"
@@ -38,12 +57,18 @@ upstreams:
paths: paths:
"/": "/":
ssl: true ssl: true
headers:
- "X-Some-Thing:Yaaaaaaaaaaaaaaa"
- "X-Prox-From:Hopaaaaaaaaaaaar"
servers: servers:
- "apt.netangels.net:443" - "apt.netangels.net:443"
127.0.0.1: 127.0.0.1:
paths: paths:
"/camerastatus": "/camerastatus":
ssl: false ssl: false
headers:
- "X-Some-Thing:Yaaaaaaaaaaaaaaa"
- "X-Prox-From:Hopaaaaaaaaaaaar"
servers: servers:
- "192.168.1.5:8080" - "192.168.1.5:8080"
# 127.0.0.1: # 127.0.0.1:

View File

@@ -21,29 +21,30 @@ pub struct APIUpstreamProvider {
#[async_trait] #[async_trait]
pub trait Discovery { pub trait Discovery {
async fn start(&self, tx: Sender<UpstreamsDashMap>); async fn start(&self, tx: Sender<(UpstreamsDashMap, Headers)>);
} }
#[async_trait] #[async_trait]
impl Discovery for APIUpstreamProvider { impl Discovery for APIUpstreamProvider {
async fn start(&self, toreturn: Sender<UpstreamsDashMap>) { async fn start(&self, toreturn: Sender<(UpstreamsDashMap, Headers)>) {
webserver::run_server(self.address.clone(), toreturn).await; webserver::run_server(self.address.clone(), toreturn).await;
} }
} }
#[async_trait] #[async_trait]
impl Discovery for FromFileProvider { impl Discovery for FromFileProvider {
async fn start(&self, tx: Sender<UpstreamsDashMap>) { async fn start(&self, tx: Sender<(UpstreamsDashMap, Headers)>) {
tokio::spawn(watch_file(self.path.clone(), tx.clone())); tokio::spawn(watch_file(self.path.clone(), tx.clone()));
} }
} }
pub async fn watch_file(fp: String, mut toreturn: Sender<UpstreamsDashMap>) { pub async fn watch_file(fp: String, mut toreturn: Sender<(UpstreamsDashMap, Headers)>) {
sleep(Duration::from_millis(50)).await; // For having nice logs :-) sleep(Duration::from_millis(50)).await; // For having nice logs :-)
let file_path = fp.as_str(); let file_path = fp.as_str();
let parent_dir = Path::new(file_path).parent().unwrap(); let parent_dir = Path::new(file_path).parent().unwrap();
let (local_tx, mut local_rx) = tokio::sync::mpsc::channel::<notify::Result<Event>>(1); let (local_tx, mut local_rx) = tokio::sync::mpsc::channel::<notify::Result<Event>>(1);
info!("Watching for changes in {:?}", parent_dir); info!("Watching for changes in {:?}", parent_dir);
let snd = load_yaml_to_dashmap(file_path, "filepath"); let snd = load_yaml_to_dashmap(file_path, "filepath");
match snd { match snd {
Some(snd) => { Some(snd) => {
toreturn.send(snd).await.unwrap(); toreturn.send(snd).await.unwrap();

View File

@@ -21,10 +21,12 @@ struct HostConfig {
struct PathConfig { struct PathConfig {
ssl: bool, ssl: bool,
servers: Vec<String>, servers: Vec<String>,
headers: Vec<String>,
} }
pub fn load_yaml_to_dashmap(d: &str, kind: &str) -> Option<UpstreamsDashMap> { pub fn load_yaml_to_dashmap(d: &str, kind: &str) -> Option<(UpstreamsDashMap, Headers)> {
let dashmap = UpstreamsDashMap::new(); let dashmap = UpstreamsDashMap::new();
let headers = DashMap::new();
let mut yaml_data = d.to_string(); let mut yaml_data = d.to_string();
match kind { match kind {
"filepath" => { "filepath" => {
@@ -35,7 +37,7 @@ pub fn load_yaml_to_dashmap(d: &str, kind: &str) -> Option<UpstreamsDashMap> {
} }
Err(e) => { Err(e) => {
error!("Reading: {}: {:?}", d, e.to_string()); error!("Reading: {}: {:?}", d, e.to_string());
warn!("Running with empty upstreams list, chane it via API"); warn!("Running with empty upstreams list, update it via API");
return None; return None;
} }
}; };
@@ -51,8 +53,17 @@ pub fn load_yaml_to_dashmap(d: &str, kind: &str) -> Option<UpstreamsDashMap> {
Ok(parsed) => { Ok(parsed) => {
for (hostname, host_config) in parsed.upstreams { for (hostname, host_config) in parsed.upstreams {
let path_map = DashMap::new(); let path_map = DashMap::new();
let header_list = DashMap::new();
for (path, path_config) in host_config.paths { for (path, path_config) in host_config.paths {
let mut server_list = Vec::new(); let mut server_list = Vec::new();
let mut hl = Vec::new();
for header in path_config.headers.iter().by_ref() {
if let Some((key, val)) = header.split_once(':') {
hl.push((key.to_string(), val.to_string()));
}
}
header_list.insert(path.clone(), hl);
// println!(" {:?} == {:?}", hostname, header_list);
for server in path_config.servers { for server in path_config.servers {
if let Some((ip, port_str)) = server.split_once(':') { if let Some((ip, port_str)) = server.split_once(':') {
if let Ok(port) = port_str.parse::<u16>() { if let Ok(port) = port_str.parse::<u16>() {
@@ -62,9 +73,10 @@ pub fn load_yaml_to_dashmap(d: &str, kind: &str) -> Option<UpstreamsDashMap> {
} }
path_map.insert(path, (server_list, AtomicUsize::new(0))); path_map.insert(path, (server_list, AtomicUsize::new(0)));
} }
headers.insert(hostname.clone(), header_list);
dashmap.insert(hostname, path_map); dashmap.insert(hostname, path_map);
} }
Some(dashmap) Some((dashmap, headers))
} }
Err(e) => { Err(e) => {
error!("Failed to parse upstreams file: {}", e); error!("Failed to parse upstreams file: {}", e);

View File

@@ -21,6 +21,8 @@ pub fn print_upstreams(upstreams: &UpstreamsDashMap) {
} }
pub type UpstreamsDashMap = DashMap<String, DashMap<String, (Vec<(String, u16, bool)>, AtomicUsize)>>; pub type UpstreamsDashMap = DashMap<String, DashMap<String, (Vec<(String, u16, bool)>, AtomicUsize)>>;
// pub type HeadersList = DashMap<String, Vec<(String, String)>>;
pub type Headers = DashMap<String, DashMap<String, Vec<(String, String)>>>;
// pub type UpstreamMap = DashMap<String, (Vec<(String, u16)>, AtomicUsize)>; // pub type UpstreamMap = DashMap<String, (Vec<(String, u16)>, AtomicUsize)>;
#[allow(dead_code)] #[allow(dead_code)]

View File

@@ -19,6 +19,7 @@ use std::sync::Arc;
pub struct LB { pub struct LB {
pub ump_upst: Arc<UpstreamsDashMap>, pub ump_upst: Arc<UpstreamsDashMap>,
pub ump_full: Arc<UpstreamsDashMap>, pub ump_full: Arc<UpstreamsDashMap>,
pub headers: Arc<Headers>,
pub config: Arc<DashMap<String, String>>, pub config: Arc<DashMap<String, String>>,
pub local: Arc<(String, u16)>, pub local: Arc<(String, u16)>,
} }
@@ -27,7 +28,7 @@ pub struct LB {
impl BackgroundService for LB { impl BackgroundService for LB {
async fn start(&self, mut shutdown: ShutdownWatch) { async fn start(&self, mut shutdown: ShutdownWatch) {
info!("Starting background service"); info!("Starting background service");
let (tx, mut rx) = mpsc::channel::<UpstreamsDashMap>(0); let (tx, mut rx) = mpsc::channel::<(UpstreamsDashMap, Headers)>(0);
let from_file = self.config.get("upstreams_conf"); let from_file = self.config.get("upstreams_conf");
match from_file { match from_file {
@@ -67,10 +68,13 @@ impl BackgroundService for LB {
val = rx.next() => { val = rx.next() => {
match val { match val {
Some(ss) => { Some(ss) => {
let foo = compare_dashmaps(&*self.ump_full, &ss); let foo = compare_dashmaps(&*self.ump_full, &ss.0);
if !foo { if !foo {
clone_dashmap_into(&ss, &self.ump_full); clone_dashmap_into(&ss.0, &self.ump_full);
clone_dashmap_into(&ss, &self.ump_upst); clone_dashmap_into(&ss.0, &self.ump_upst);
for (k,v) in ss.1 {
self.headers.insert(k,v);
}
print_upstreams(&self.ump_full); print_upstreams(&self.ump_full);
} }
} }
@@ -85,6 +89,7 @@ impl BackgroundService for LB {
#[async_trait] #[async_trait]
pub trait GetHost { pub trait GetHost {
async fn get_host(&self, peer: &str, path: &str, upgrade: bool) -> Option<(String, u16, bool)>; async fn get_host(&self, peer: &str, path: &str, upgrade: bool) -> Option<(String, u16, bool)>;
async fn get_header(&self, peer: &str, path: &str) -> Option<Vec<(String, String)>>;
} }
#[async_trait] #[async_trait]
impl GetHost for LB { impl GetHost for LB {
@@ -111,8 +116,6 @@ impl GetHost for LB {
*/ */
async fn get_host(&self, peer: &str, path: &str, _upgrade: bool) -> Option<(String, u16, bool)> { async fn get_host(&self, peer: &str, path: &str, _upgrade: bool) -> Option<(String, u16, bool)> {
let host_entry = self.ump_upst.get(peer)?; let host_entry = self.ump_upst.get(peer)?;
// Check if an exact match exists first
let mut current_path = path.to_string(); let mut current_path = path.to_string();
let mut best_match: Option<(String, u16, bool)> = None; let mut best_match: Option<(String, u16, bool)> = None;
@@ -142,6 +145,33 @@ impl GetHost for LB {
} }
best_match best_match
} }
async fn get_header(&self, peer: &str, path: &str) -> Option<Vec<(String, String)>> {
let host_entry = self.headers.get(peer)?;
let mut current_path = path.to_string();
let mut best_match: Option<Vec<(String, String)>> = None;
loop {
if let Some(entry) = host_entry.get(&current_path) {
if !entry.value().is_empty() {
best_match = Some(entry.value().clone());
break;
}
}
if let Some(pos) = current_path.rfind('/') {
current_path.truncate(pos);
} else {
break;
}
}
if best_match.is_none() {
if let Some(entry) = host_entry.get("/") {
if !entry.value().is_empty() {
best_match = Some(entry.value().clone());
}
}
}
best_match
}
} }
#[async_trait] #[async_trait]
@@ -213,7 +243,22 @@ impl ProxyHttp for LB {
where where
Self::CTX: Send + Sync, Self::CTX: Send + Sync,
{ {
_upstream_response.insert_header("X-Proxied-From", "Fooooooooooooooo").unwrap(); // _upstream_response.insert_header("X-Proxied-From", "Fooooooooooooooo").unwrap();
let host = _session.req_header().headers.get("Host");
match host {
Some(host) => {
let path = _session.req_header().uri.path();
let yoyo = self.get_header(host.to_str().unwrap(), path).await;
for k in yoyo.iter() {
for t in k.iter() {
_upstream_response.insert_header(t.0.clone(), t.1.clone()).unwrap();
}
}
}
None => {}
}
Ok(()) Ok(())
} }

View File

@@ -23,8 +23,12 @@ pub fn run() {
let uf: UpstreamsDashMap = DashMap::new(); let uf: UpstreamsDashMap = DashMap::new();
let ff: UpstreamsDashMap = DashMap::new(); let ff: UpstreamsDashMap = DashMap::new();
let hh: Headers = DashMap::new();
let uf_config = Arc::new(uf); let uf_config = Arc::new(uf);
let ff_config = Arc::new(ff); let ff_config = Arc::new(ff);
let hh_config = Arc::new(hh);
let cfg = Arc::new(maincfg); let cfg = Arc::new(maincfg);
let local = Arc::new(local_conf); let local = Arc::new(local_conf);
@@ -33,12 +37,14 @@ pub fn run() {
ump_full: ff_config.clone(), ump_full: ff_config.clone(),
config: cfg.clone(), config: cfg.clone(),
local: local.clone(), local: local.clone(),
headers: hh_config.clone(),
}; };
let bg = LB { let bg = LB {
ump_upst: uf_config.clone(), ump_upst: uf_config.clone(),
ump_full: ff_config.clone(), ump_full: ff_config.clone(),
config: cfg.clone(), config: cfg.clone(),
local: local.clone(), local: local.clone(),
headers: hh_config.clone(),
}; };
// env_logger::Env::new(); // env_logger::Env::new();

View File

@@ -10,7 +10,7 @@ use log::info;
use tokio::net::TcpListener; use tokio::net::TcpListener;
#[allow(unused_mut)] #[allow(unused_mut)]
pub async fn run_server(bindaddress: String, mut toreturn: Sender<UpstreamsDashMap>) { pub async fn run_server(bindaddress: String, mut toreturn: Sender<(UpstreamsDashMap, Headers)>) {
let mut tr = toreturn.clone(); let mut tr = toreturn.clone();
let app = Router::new() let app = Router::new()
.route("/{*wildcard}", get(getconfig)) .route("/{*wildcard}", get(getconfig))