diff --git a/Cargo.lock b/Cargo.lock index a84ac1b..b02d5f3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -161,6 +161,7 @@ version = "0.1.0" dependencies = [ "async-trait", "axum", + "base64", "dashmap", "env_logger", "futures", diff --git a/Cargo.toml b/Cargo.toml index 2dbcce5..419faea 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,4 +23,6 @@ reqwest = { version = "0.12.15", features = ["json"] } serde_yaml = "0.8.26" #hickory-client = "0.25.1" rand = "0.9.0" +base64 = "0.22.1" + diff --git a/etc/upstreams.yaml b/etc/upstreams.yaml index 0610b9b..2cd71d1 100644 --- a/etc/upstreams.yaml +++ b/etc/upstreams.yaml @@ -4,6 +4,11 @@ globals: - "Access-Control-Allow-Origin:*" - "Access-Control-Allow-Methods:POST, GET, OPTIONS" - "Access-Control-Max-Age:86400" +# authorization: +# - "basic" +# - "zangag:Anhnazand1234" +# - "apikey" +# - "qweasdqweadhbk" consul: servers: - "http://master1:8500" @@ -17,10 +22,6 @@ consul: - proxy: "proxy-backoffice-dev-backoffice-srv" real: "backoffice-dev-backoffice-srv" token: "8e2db809-845b-45e1-8b47-2c8356a09da0-a4370955-18c2-4d6e-a8f8-ffcc0b47be81" -# whitelist: -# - "frontend-dev-frontend-srv" -# - "gateway-test-gateway-srv" -# - "gateway-stage-gateway-srv" upstreams: myip.netangels.net: paths: diff --git a/src/utils.rs b/src/utils.rs index ac416b0..423fc58 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,3 +1,4 @@ +pub mod auth; pub mod consul; pub mod discovery; mod filewatch; diff --git a/src/utils/auth.rs b/src/utils/auth.rs new file mode 100644 index 0000000..aa72a27 --- /dev/null +++ b/src/utils/auth.rs @@ -0,0 +1,52 @@ +use base64::engine::general_purpose::STANDARD; +use base64::Engine; +use pingora_proxy::Session; + +trait AuthValidator { + fn validate(&self, session: &Session) -> bool; +} +struct BasicAuth<'a>(&'a str); +struct ApiKeyAuth<'a>(&'a str); + +impl AuthValidator for BasicAuth<'_> { + fn validate(&self, session: &Session) -> bool { + if let Some(header) = session.get_header("authorization") { + if let Some((_, val)) = header.to_str().ok().unwrap().split_once(' ') { + let decoded = STANDARD.decode(val).ok().unwrap(); + let decoded_str = String::from_utf8(decoded).ok().unwrap(); + return decoded_str == self.0; + } + } + false + } +} + +impl AuthValidator for ApiKeyAuth<'_> { + fn validate(&self, session: &Session) -> bool { + if let Some(header) = session.get_header("x-api-key") { + return header.to_str().ok().unwrap() == self.0; + } + false + } +} + +fn validate(auth: &dyn AuthValidator, session: &Session) -> bool { + auth.validate(session) +} + +pub fn authenticate(c: &[String], session: &Session) -> bool { + match c[0].as_str() { + "basic" => { + let auth = BasicAuth(c[1].as_str().into()); + validate(&auth, session) + } + "apikey" => { + let auth = ApiKeyAuth(c[1].as_str().into()); + validate(&auth, session) + } + _ => { + println!("Unsupported authentication mechanism : {}", c[0]); + false + } + } +} diff --git a/src/utils/consul.rs b/src/utils/consul.rs index 1fc29b0..db68536 100644 --- a/src/utils/consul.rs +++ b/src/utils/consul.rs @@ -1,5 +1,5 @@ -use crate::utils::parceyaml::{load_configuration, ServiceMapping}; -use crate::utils::tools::{clone_dashmap_into, compare_dashmaps, Headers, UpstreamsDashMap}; +use crate::utils::parceyaml::{load_configuration, Configuration, ServiceMapping}; +use crate::utils::tools::{clone_dashmap_into, compare_dashmaps, UpstreamsDashMap}; use dashmap::DashMap; use futures::channel::mpsc::Sender; use futures::SinkExt; @@ -33,7 +33,7 @@ struct TaggedAddress { port: u16, } -pub async fn start(fp: String, mut toreturn: Sender<(UpstreamsDashMap, Headers)>) { +pub async fn start(fp: String, mut toreturn: Sender) { let config = load_configuration(fp.as_str(), "filepath"); let headers = DashMap::new(); @@ -45,7 +45,7 @@ pub async fn start(fp: String, mut toreturn: Sender<(UpstreamsDashMap, Headers)> } info!("Consul Discovery is enabled : {}", config.typecfg); - let consul = config.consul; + let consul = config.consul.clone(); let prev_upstreams = UpstreamsDashMap::new(); match consul { Some(consul) => { @@ -54,20 +54,32 @@ pub async fn start(fp: String, mut toreturn: Sender<(UpstreamsDashMap, Headers)> let end = servers.len(); loop { - // println!(" ==> {:?}", consul.services); let num = rand::rng().random_range(1..end); headers.clear(); for (k, v) in config.headers.clone() { headers.insert(k.to_string(), v); } let consul_data = servers.get(num).unwrap().to_string(); - // let upstreams = http_request(consul_data, consul.whitelist.clone()); let upstreams = consul_request(consul_data, consul.services.clone(), consul.token.clone()); + match upstreams.await { Some(upstreams) => { if !compare_dashmaps(&upstreams, &prev_upstreams) { + let mut tosend: Configuration = Configuration { + upstreams: Default::default(), + headers: Default::default(), + consul: None, + typecfg: "".to_string(), + globals: Default::default(), + }; + clone_dashmap_into(&upstreams, &prev_upstreams); - toreturn.send((upstreams, headers.clone())).await.unwrap(); + clone_dashmap_into(&upstreams, &tosend.upstreams); + tosend.headers = headers.clone(); + tosend.globals = config.globals.clone(); + tosend.typecfg = config.typecfg.clone(); + tosend.consul = config.consul.clone(); + toreturn.send(tosend).await.unwrap(); } } None => {} diff --git a/src/utils/discovery.rs b/src/utils/discovery.rs index 2a37f9a..91b7c81 100644 --- a/src/utils/discovery.rs +++ b/src/utils/discovery.rs @@ -1,6 +1,6 @@ use crate::utils::consul; use crate::utils::filewatch; -use crate::utils::tools::*; +use crate::utils::parceyaml::Configuration; use crate::web::webserver; use async_trait::async_trait; use futures::channel::mpsc::Sender; @@ -18,26 +18,26 @@ pub struct ConsulProvider { #[async_trait] pub trait Discovery { - async fn start(&self, tx: Sender<(UpstreamsDashMap, Headers)>); + async fn start(&self, tx: Sender); } #[async_trait] impl Discovery for APIUpstreamProvider { - async fn start(&self, toreturn: Sender<(UpstreamsDashMap, Headers)>) { + async fn start(&self, toreturn: Sender) { webserver::run_server(self.address.clone(), toreturn).await; } } #[async_trait] impl Discovery for FromFileProvider { - async fn start(&self, tx: Sender<(UpstreamsDashMap, Headers)>) { + async fn start(&self, tx: Sender) { tokio::spawn(filewatch::start(self.path.clone(), tx.clone())); } } #[async_trait] impl Discovery for ConsulProvider { - async fn start(&self, tx: Sender<(UpstreamsDashMap, Headers)>) { + async fn start(&self, tx: Sender) { tokio::spawn(consul::start(self.path.clone(), tx.clone())); } } diff --git a/src/utils/filewatch.rs b/src/utils/filewatch.rs index 5b5c0ff..d389df1 100644 --- a/src/utils/filewatch.rs +++ b/src/utils/filewatch.rs @@ -1,5 +1,4 @@ -use crate::utils::parceyaml::load_configuration; -use crate::utils::tools::*; +use crate::utils::parceyaml::{load_configuration, Configuration}; use futures::channel::mpsc::Sender; use futures::SinkExt; use log::{error, info}; @@ -10,17 +9,16 @@ use std::path::Path; use std::time::{Duration, Instant}; use tokio::task; -pub async fn start(fp: String, mut toreturn: Sender<(UpstreamsDashMap, Headers)>) { +pub async fn start(fp: String, mut toreturn: Sender) { sleep(Duration::from_millis(50)).await; // For having nice logs :-) let file_path = fp.as_str(); let parent_dir = Path::new(file_path).parent().unwrap(); let (local_tx, mut local_rx) = tokio::sync::mpsc::channel::>(1); info!("Watching for changes in {:?}", parent_dir); let snd = load_configuration(file_path, "filepath"); - match snd { Some(snd) => { - toreturn.send((snd.upstreams, snd.headers)).await.unwrap(); + toreturn.send(snd).await.unwrap(); } None => {} } @@ -53,7 +51,7 @@ pub async fn start(fp: String, mut toreturn: Sender<(UpstreamsDashMap, Headers)> let snd = load_configuration(file_path, "filepath"); match snd { Some(snd) => { - toreturn.send((snd.upstreams, snd.headers)).await.unwrap(); + toreturn.send(snd).await.unwrap(); } None => {} } diff --git a/src/utils/parceyaml.rs b/src/utils/parceyaml.rs index c739008..f6fb708 100644 --- a/src/utils/parceyaml.rs +++ b/src/utils/parceyaml.rs @@ -38,12 +38,12 @@ struct PathConfig { servers: Vec, headers: Option>, } - pub struct Configuration { pub upstreams: UpstreamsDashMap, pub headers: Headers, pub consul: Option, pub typecfg: String, + pub globals: Option>>, } // pub fn load_configuration(d: &str, kind: &str) -> Option<(UpstreamsDashMap, Headers, String)> { @@ -53,6 +53,7 @@ pub fn load_configuration(d: &str, kind: &str) -> Option { headers: Default::default(), consul: None, typecfg: "".to_string(), + globals: Default::default(), }; toreturn.upstreams = UpstreamsDashMap::new(); toreturn.headers = Headers::new(); @@ -96,6 +97,14 @@ pub fn load_configuration(d: &str, kind: &str) -> Option { } global_headers.insert("/".to_string(), hl); toreturn.headers.insert("GLOBAL_HEADERS".to_string(), global_headers); + + let cfg = DashMap::new(); + if let Some(k) = globals.get("authorization") { + cfg.insert("authorization".to_string(), k.to_owned()); + toreturn.globals = Some(cfg); + } else { + toreturn.globals = None; + } } match parsed.provider.as_str() { "file" => { @@ -107,17 +116,6 @@ pub fn load_configuration(d: &str, kind: &str) -> Option { for (path, path_config) in host_config.paths { let mut server_list = Vec::new(); let mut hl = Vec::new(); - // Set global headers - // if let Some(globals) = &parsed.globals { - // for headers in globals.get("headers").iter().by_ref() { - // for header in headers.iter() { - // if let Some((key, val)) = header.split_once(':') { - // hl.push((key.to_string(), val.to_string())); - // } - // } - // } - // } - // Set per host/path headers if let Some(headers) = &path_config.headers { for header in headers.iter().by_ref() { if let Some((key, val)) = header.split_once(':') { diff --git a/src/web/proxyhttp.rs b/src/web/proxyhttp.rs index 1db6a2c..e605ee0 100644 --- a/src/web/proxyhttp.rs +++ b/src/web/proxyhttp.rs @@ -13,10 +13,13 @@ use pingora_core::server::ShutdownWatch; use pingora_core::services::background::BackgroundService; use pingora_http::ResponseHeader; +use crate::utils::auth::authenticate; +use crate::utils::parceyaml::Configuration; use pingora_proxy::{ProxyHttp, Session}; use std::ops::Deref; use std::sync::atomic::Ordering; use std::sync::Arc; +// use http_auth_basic::Credentials; pub struct LB { pub ump_upst: Arc, @@ -24,13 +27,14 @@ pub struct LB { pub headers: Arc, pub config: Arc>, pub local: Arc<(String, u16)>, + pub proxyconf: Arc>>, } #[async_trait] impl BackgroundService for LB { async fn start(&self, mut shutdown: ShutdownWatch) { info!("Starting background service"); - let (tx, mut rx) = mpsc::channel::<(UpstreamsDashMap, Headers)>(0); + let (tx, mut rx) = mpsc::channel::(0); let from_file = self.config.get("upstreams_conf"); match from_file { @@ -76,23 +80,32 @@ impl BackgroundService for LB { val = rx.next() => { match val { Some(ss) => { - clone_dashmap_into(&ss.0, &self.ump_full); - clone_dashmap_into(&ss.0, &self.ump_upst); + clone_dashmap_into(&ss.upstreams, &self.ump_full); + clone_dashmap_into(&ss.upstreams, &self.ump_upst); + self.proxyconf.clear(); + match ss.globals { + Some(globals) => { + for (k,v) in globals { + self.proxyconf.insert(k, v); + } + } + None => {} + } self.headers.clear(); - for entry in ss.0.iter() { + for entry in ss.upstreams.iter() { let global_key = entry.key().clone(); let global_values = DashMap::new(); - let mut target_entry = ss.1.entry(global_key).or_insert_with(DashMap::new); + let mut target_entry = ss.headers.entry(global_key).or_insert_with(DashMap::new); target_entry.extend(global_values); self.headers.insert(target_entry.key().to_owned(), target_entry.value().to_owned()); } - for path in ss.1.iter() { + for path in ss.headers.iter() { let path_key = path.key().clone(); let path_headers = path.value().clone(); self.headers.insert(path_key.clone(), path_headers); - if let Some(global_headers) = ss.1.get("GLOBAL_HEADERS") { + if let Some(global_headers) = ss.headers.get("GLOBAL_HEADERS") { if let Some(existing_headers) = self.headers.get_mut(&path_key) { merge_headers(&existing_headers, &global_headers); } @@ -138,6 +151,7 @@ impl GetHost for LB { } */ async fn get_host(&self, peer: &str, path: &str, _upgrade: bool) -> Option<(String, u16, bool)> { + // println!(" ==> {:?}", self.config); let host_entry = self.ump_upst.get(peer)?; let mut current_path = path.to_string(); let mut best_match: Option<(String, u16, bool)> = None; @@ -202,7 +216,6 @@ impl ProxyHttp for LB { fn new_ctx(&self) -> Self::CTX {} async fn upstream_peer(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result> { let host_name = return_header_host(&session); - match host_name { Some(host) => { // session.req_header_mut().headers.insert("X-Host-Name", host.to_string().parse().unwrap()); @@ -227,19 +240,26 @@ impl ProxyHttp for LB { } } } - async fn request_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> pingora_core::Result + async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result where Self::CTX: Send + Sync, { - // if !_session.req_header().uri.path().starts_with("/ping") { - if _session.req_header().uri.path().starts_with("/denied") { - let _ = _session.respond_error(403).await; - info!("Forbidded: {:?}, {}", _session.client_addr(), _session.req_header().uri.path().to_string()); + if let Some(auth) = self.proxyconf.get("authorization") { + let authenticated = authenticate(&auth.value(), &session); + if !authenticated { + let _ = session.respond_error(401).await; + info!("Forbidden: {:?}, {}", session.client_addr(), session.req_header().uri.path().to_string()); + return Ok(true); + } + }; + if session.req_header().uri.path().starts_with("/denied") { + let _ = session.respond_error(403).await; + info!("Forbidden: {:?}, {}", session.client_addr(), session.req_header().uri.path().to_string()); return Ok(true); }; Ok(false) } - async fn upstream_request_filter(&self, _session: &mut Session, _upstream_request: &mut RequestHeader, _ctx: &mut Self::CTX) -> pingora_core::Result<()> + async fn upstream_request_filter(&self, _session: &mut Session, _upstream_request: &mut RequestHeader, _ctx: &mut Self::CTX) -> Result<()> where Self::CTX: Send + Sync, { diff --git a/src/web/start.rs b/src/web/start.rs index a40c211..7d83d36 100644 --- a/src/web/start.rs +++ b/src/web/start.rs @@ -33,12 +33,16 @@ pub fn run() { let cfg = Arc::new(maincfg); let local = Arc::new(local_conf); + let proxyconf: DashMap> = Default::default(); + let pconf = Arc::new(proxyconf); + let lb = LB { ump_upst: uf_config.clone(), ump_full: ff_config.clone(), config: cfg.clone(), local: local.clone(), headers: hh_config.clone(), + proxyconf: pconf.clone(), }; let bg = LB { ump_upst: uf_config.clone(), @@ -46,6 +50,7 @@ pub fn run() { config: cfg.clone(), local: local.clone(), headers: hh_config.clone(), + proxyconf: pconf.clone(), }; // env_logger::Env::new(); diff --git a/src/web/webserver.rs b/src/web/webserver.rs index c14b4c9..306220d 100644 --- a/src/web/webserver.rs +++ b/src/web/webserver.rs @@ -1,4 +1,4 @@ -use crate::utils::tools::*; +use crate::utils::parceyaml::Configuration; use axum::body::Body; use axum::http::{Response, StatusCode}; use axum::response::IntoResponse; @@ -10,7 +10,7 @@ use log::info; use tokio::net::TcpListener; #[allow(unused_mut)] -pub async fn run_server(bindaddress: String, mut toreturn: Sender<(UpstreamsDashMap, Headers)>) { +pub async fn run_server(bindaddress: String, mut toreturn: Sender) { let mut tr = toreturn.clone(); let app = Router::new() .route("/{*wildcard}", get(getconfig)) @@ -25,7 +25,7 @@ pub async fn run_server(bindaddress: String, mut toreturn: Sender<(UpstreamsDash match serverlist { Some(serverlist) => { - let _ = tr.send((serverlist.upstreams, serverlist.headers)).await.unwrap(); + let _ = tr.send(serverlist).await.unwrap(); Response::builder().status(StatusCode::CREATED).body(Body::from("Config, conf file, updated!\n")).unwrap() } None => Response::builder()