diff --git a/etc/upstreams.yaml b/etc/upstreams.yaml index 69ecc74..7ffaf0d 100644 --- a/etc/upstreams.yaml +++ b/etc/upstreams.yaml @@ -1,57 +1,76 @@ # The file under watch and hot reload, changes are applied immediately, no need to restart or reload. -provider: "file" # consul, kubernetes +provider: "file" # "file" "consul" "kubernetes" sticky_sessions: false -to_ssl: false -#rate_limit: 100 +to_https: false +rate_limit: 100 headers: - "Access-Control-Allow-Origin:*" - "Access-Control-Allow-Methods:POST, GET, OPTIONS" - "Access-Control-Max-Age:86400" - - "X-Custom-Header:Something Special" -authorization: - type: "jwt" - creds: "910517d9-f9a1-48de-8826-dbadacbd84af-cb6f830e-ab16-47ec-9d8f-0090de732774" + - "Strict-Transport-Security:max-age=31536000; includeSubDomains; preload" +#authorization: +# type: "jwt" +# creds: "910517d9-f9a1-48de-8826-dbadacbd84af-cb6f830e-ab16-47ec-9d8f-0090de732774" # type: "basic" -# creds: "user:Passw0rd" +# creds: "username:Pa$$w0rd" # type: "apikey" # creds: "5ecbf799-1343-4e94-a9b5-e278af5cd313-56b45249-1839-4008-a450-a60dc76d2bae" -consul: # If the provider is consul. Otherwise, ignored. +consul: servers: - - "http://consul1:8500" - - "http://consul2:8500" - - "http://consul3:8500" - services: # proxy: The hostname to access the proxy server, real : The real service name in Consul database. - - proxy: "proxy-frontend-dev-frontend-srv" - real: "frontend-dev-frontend-srv" + - "http://192.168.1.199:8500" + - "http://192.168.1.200:8500" + - "http://192.168.1.201:8500" + services: # hostname: The hostname to access the proxy server, upstream : The real service name in Consul database. + - hostname: "vt-webapi-service" + upstream: "vt-webapi-service-health" + path: "/one" + headers: + - "X-Some-Thing:Yaaaaaaaaaaaaaaa" + - "X-Proxy-From:Aralez" + rate_limit: 1 + to_https: false + - hostname: "vt-webapi-service" + upstream: "vt-webapi-service-health" + path: "/" token: "8e2db809-845b-45e1-8b47-2c8356a09da0-a4370955-18c2-4d6e-a8f8-ffcc0b47be81" # Consul server access token, If Consul auth is enabled kubernetes: servers: - - "172.16.0.11:5443" # KUBERNETES_SERVICE_HOST : KUBERNETES_SERVICE_PORT_HTTPS + - "192.168.1.55:443" #For testing only, overrides with KUBERNETES_SERVICE_HOST : KUBERNETES_SERVICE_PORT_HTTPS env variables. services: - - proxy: "api-service" - real: "api-service" + - hostname: "vt-webapi-service" path: "/" - - proxy: "api-service" - real: "search-service" - path: "/search" - - proxy: "websocket-service" - real: "websocket-service" - tokenpath: "/tmp/token.txt" # /var/run/secrets/kubernetes.io/serviceaccount/token + upstream: "vt-webapi-service" + - hostname: "vt-webapi-service" + upstream: "vt-console-service" + path: "/one" + headers: + - "X-Some-Thing:Yaaaaaaaaaaaaaaa" + - "X-Proxy-From:Aralez" + rate_limit: 100 + to_https: false + - hostname: "vt-webapi-service" + upstream: "vt-rambulik-service" + path: "/two" + - hostname: "vt-websocket-service" + upstream: "vt-websocket-service" + path: "/" + tokenpath: "/path/to/kubetoken.txt" #If not set, will default to /var/run/secrets/kubernetes.io/serviceaccount/token upstreams: myip.mydomain.com: paths: - rate_limit: 10 # Per path rate limit have higher priority than global rate limit. If not set, the global rate limit will be used "/": + rate_limit: 200 to_https: false headers: - "X-Proxy-From:Aralez" - servers: # List of upstreams HOST:PORT + servers: - "127.0.0.1:8000" - "127.0.0.2:8000" - "127.0.0.3:8000" - "127.0.0.4:8000" + - "127.0.0.5:8000" "/ping": - to_https: true + to_https: false headers: - "X-Some-Thing:Yaaaaaaaaaaaaaaa" - "X-Proxy-From:Aralez" @@ -64,14 +83,28 @@ upstreams: polo.mydomain.com: paths: "/": + to_https: false headers: - "X-Some-Thing:Yaaaaaaaaaaaaaaa" servers: + - "192.168.1.1:8000" + - "192.168.1.10:8000" - "127.0.0.1:8000" - "127.0.0.2:8000" - "127.0.0.3:8000" - "127.0.0.4:8000" - "/.well-known/acme-challenge": - healthcheck: false - servers: - - "127.0.0.1:8001" \ No newline at end of file + apt.mydomain.com: + paths: + "/": + servers: + - "192.168.1.10:443" + "/.well-known/acme-challenge": + healthcheck: false + servers: + - "127.0.0.1:8001" + localpost: + paths: + "/": + to_https: false + servers: + - "127.0.0.1:9000" \ No newline at end of file diff --git a/src/utils.rs b/src/utils.rs index b7c9de1..61fd5b7 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -6,6 +6,7 @@ mod filewatch; pub mod healthcheck; pub mod jwt; pub mod kuber; +pub mod kuberconsul; pub mod metrics; pub mod parceyaml; pub mod state; diff --git a/src/utils/consul.rs b/src/utils/consul.rs index 491ef57..37526df 100644 --- a/src/utils/consul.rs +++ b/src/utils/consul.rs @@ -1,14 +1,17 @@ +use crate::utils::kuberconsul::{list_to_upstreams, match_path}; +use crate::utils::parceyaml::build_headers; use crate::utils::structs::{Configuration, InnerMap, ServiceMapping, UpstreamsDashMap}; use crate::utils::tools::{clone_dashmap_into, compare_dashmaps, print_upstreams}; use dashmap::DashMap; use futures::channel::mpsc::Sender; use futures::SinkExt; -use log::{info, warn}; use pingora::prelude::sleep; use rand::Rng; use reqwest::header::{HeaderMap, HeaderValue}; +use reqwest::Client; use serde::Deserialize; use std::collections::HashMap; +use std::env; use std::sync::atomic::AtomicUsize; use std::sync::Arc; use std::time::Duration; @@ -28,116 +31,89 @@ struct TaggedAddress { } pub async fn start(mut toreturn: Sender, config: Arc) { - let headers = DashMap::new(); - info!("Consul Discovery is enabled : {}", config.typecfg); - let consul = config.consul.clone(); let prev_upstreams = UpstreamsDashMap::new(); - match consul { - Some(consul) => { - let servers = consul.servers.unwrap(); - info!("Consul Servers => {:?}", servers); + loop { + if let Some(consul) = config.consul.clone() { + let servers = consul.servers.unwrap_or(vec![format!( + "{}:{}", + env::var("CONSUL_SERVICE_HOST").unwrap_or("0.0.0.0".to_string()), + env::var("CONSUL_SERVICE_PORT").unwrap_or("0".to_string()) + )]); let end = servers.len() - 1; - - loop { - let mut num = 0; - if end > 0 { - num = rand::rng().random_range(0..end); - } - headers.clear(); - for (k, v) in config.headers.clone() { - headers.insert(k.to_string(), v); - } - let consul_data = servers.get(num).unwrap().to_string(); - let upstreams = consul_request(consul_data, consul.services.clone(), consul.token.clone()); - match upstreams.await { - Some(upstreams) => { - if !compare_dashmaps(&upstreams, &prev_upstreams) { - let mut tosend: Configuration = Configuration { - upstreams: Default::default(), - headers: Default::default(), - consul: None, - kubernetes: None, - typecfg: "".to_string(), - extraparams: config.extraparams.clone(), - }; - - clone_dashmap_into(&upstreams, &prev_upstreams); - clone_dashmap_into(&upstreams, &tosend.upstreams); - tosend.headers = headers.clone(); - tosend.extraparams.authentication = config.extraparams.authentication.clone(); - tosend.typecfg = config.typecfg.clone(); - tosend.consul = config.consul.clone(); - print_upstreams(&tosend.upstreams); - toreturn.send(tosend).await.unwrap(); - } + let upstreams = UpstreamsDashMap::new(); + let mut num = 0; + if end > 0 { + num = rand::rng().random_range(0..end); + } + let consul_data = servers.get(num).unwrap().to_string(); + let ss = consul_data + "/v1/catalog/service/"; + if let Some(ref svc) = consul.services { + for i in svc { + let header_list = DashMap::new(); + let mut hl = Vec::new(); + build_headers(&i.headers, config.as_ref(), &mut hl); + if hl.len() > 0 { + header_list.insert(i.path.clone().unwrap_or("/".to_string()), hl); + config.headers.insert(i.hostname.clone(), header_list); } - None => {} + let pref: String = ss.clone() + &i.upstream; + let list = get_by_http(pref, consul.token.clone(), &i).await; + list_to_upstreams(list, &upstreams, &i); } - sleep(Duration::from_secs(5)).await; + } + if !compare_dashmaps(&upstreams, &prev_upstreams) { + let mut tosend: Configuration = Configuration { + upstreams: Default::default(), + headers: Default::default(), + consul: None, + kubernetes: None, + typecfg: "".to_string(), + extraparams: config.extraparams.clone(), + }; + + clone_dashmap_into(&upstreams, &prev_upstreams); + clone_dashmap_into(&upstreams, &tosend.upstreams); + tosend.headers = config.headers.clone(); + tosend.extraparams.authentication = config.extraparams.authentication.clone(); + tosend.typecfg = config.typecfg.clone(); + tosend.consul = config.consul.clone(); + print_upstreams(&tosend.upstreams); + toreturn.send(tosend).await.unwrap(); } } - None => {} + sleep(Duration::from_secs(5)).await; } } -async fn consul_request(url: String, whitelist: Option>, token: Option) -> Option { - let upstreams = UpstreamsDashMap::new(); - let ss = url.clone() + "/v1/catalog/service/"; - match whitelist { - Some(whitelist) => { - for k in whitelist.iter() { - let pref: String = ss.clone() + &k.hostname; - let list = get_by_http(pref.clone(), token.clone()).await; - match list { - Some(list) => { - upstreams.insert(k.upstream.clone(), list); - } - None => { - warn!("Whitelist not found for {}", k.upstream); - } - } - } - } - None => {} - } - Some(upstreams) -} - -async fn get_by_http(url: String, token: Option) -> Option, AtomicUsize)>> { - let client = reqwest::Client::new(); +async fn get_by_http(url: String, token: Option, conf: &ServiceMapping) -> Option, AtomicUsize)>> { + let client = Client::builder().timeout(Duration::from_secs(2)).danger_accept_invalid_certs(true).build().ok()?; let mut headers = HeaderMap::new(); if let Some(token) = token { headers.insert("X-Consul-Token", HeaderValue::from_str(&token).unwrap()); } let to = Duration::from_secs(1); - let u = client.get(url).timeout(to).send(); - let mut values = Vec::new(); - let upstreams: DashMap, AtomicUsize)> = DashMap::new(); - match u.await { - Ok(r) => { - let jason = r.json::>().await; - match jason { - Ok(whitelist) => { - for service in whitelist { - let addr = service.tagged_addresses.get("lan_ipv4").unwrap().address.clone(); - let prt = service.tagged_addresses.get("lan_ipv4").unwrap().port.clone(); - let to_add = InnerMap { - address: addr, - port: prt, - is_ssl: false, - is_http2: false, - to_https: false, - rate_limit: None, - healthcheck: None, - }; - values.push(to_add); - } - } - Err(_) => return None, - } - } - Err(_) => return None, + let resp = client.get(url).timeout(to).send().await.ok()?; + if !resp.status().is_success() { + eprintln!("Consul API returned status: {}", resp.status()); + return None; } - upstreams.insert("/".to_string(), (values, AtomicUsize::new(0))); + let mut inner_vec = Vec::new(); + let upstreams: DashMap, AtomicUsize)> = DashMap::new(); + let endpoints: Vec = resp.json().await.ok()?; + for subsets in endpoints { + let addr = subsets.tagged_addresses.get("lan_ipv4").unwrap().address.clone(); + let prt = subsets.tagged_addresses.get("lan_ipv4").unwrap().port.clone(); + let to_add = InnerMap { + address: addr, + port: prt, + is_ssl: false, + is_http2: false, + to_https: conf.to_https.unwrap_or(false), + rate_limit: conf.rate_limit, + healthcheck: None, + }; + inner_vec.push(to_add); + } + match_path(&conf, &upstreams, inner_vec.clone()); Some(upstreams) } diff --git a/src/utils/kuber.rs b/src/utils/kuber.rs index 4716652..a852a44 100644 --- a/src/utils/kuber.rs +++ b/src/utils/kuber.rs @@ -1,3 +1,4 @@ +use crate::utils::kuberconsul::{list_to_upstreams, match_path}; use crate::utils::parceyaml::build_headers; use crate::utils::structs::{Configuration, InnerMap, ServiceMapping, UpstreamsDashMap}; use crate::utils::tools::{clone_dashmap_into, compare_dashmaps, print_upstreams}; @@ -36,10 +37,10 @@ struct Port { } pub async fn start(mut toreturn: Sender, config: Arc) { - let upstreams = UpstreamsDashMap::new(); let prev_upstreams = UpstreamsDashMap::new(); loop { if let Some(kuber) = config.kubernetes.clone() { + let upstreams = UpstreamsDashMap::new(); let path = kuber.tokenpath.unwrap_or("/var/run/secrets/kubernetes.io/serviceaccount/token".to_string()); let token = read_token(path.as_str()).await; let servers = kuber.servers.unwrap_or(vec![format!( @@ -64,38 +65,26 @@ pub async fn start(mut toreturn: Sender, config: Arc { - for (k, v) in list { - upstr.value().insert(k, v); - } - } - None => { - upstreams.insert(i.upstream.clone(), list); - } - }; - } + list_to_upstreams(list, &upstreams, &i); } } - } - if !compare_dashmaps(&upstreams, &prev_upstreams) { - let tosend: Configuration = Configuration { - upstreams: Default::default(), - headers: config.headers.clone(), - consul: config.consul.clone(), - kubernetes: config.kubernetes.clone(), - typecfg: config.typecfg.clone(), - extraparams: config.extraparams.clone(), - }; - clone_dashmap_into(&upstreams, &prev_upstreams); - clone_dashmap_into(&upstreams, &tosend.upstreams); - print_upstreams(&tosend.upstreams); - toreturn.send(tosend).await.unwrap(); + if !compare_dashmaps(&upstreams, &prev_upstreams) { + let tosend: Configuration = Configuration { + upstreams: Default::default(), + headers: config.headers.clone(), + consul: config.consul.clone(), + kubernetes: config.kubernetes.clone(), + typecfg: config.typecfg.clone(), + extraparams: config.extraparams.clone(), + }; + clone_dashmap_into(&upstreams, &prev_upstreams); + clone_dashmap_into(&upstreams, &tosend.upstreams); + print_upstreams(&tosend.upstreams); + toreturn.send(tosend).await.unwrap(); + } } sleep(Duration::from_secs(5)).await; } @@ -103,17 +92,14 @@ pub async fn start(mut toreturn: Sender, config: Arc Option, AtomicUsize)>> { let client = Client::builder().timeout(Duration::from_secs(2)).danger_accept_invalid_certs(true).build().ok()?; - - let resp = client.get(url).bearer_auth(token).send().await.ok()?; - + let to = Duration::from_secs(1); + let resp = client.get(url).timeout(to).bearer_auth(token).send().await.ok()?; if !resp.status().is_success() { eprintln!("Kubernetes API returned status: {}", resp.status()); return None; } - let endpoints: Endpoints = resp.json().await.ok()?; let upstreams: DashMap, AtomicUsize)> = DashMap::new(); - // println!(" ===> {:?} : {:?}", conf.to_https.unwrap_or(false), conf.rate_limit); if let Some(subsets) = endpoints.subsets { for subset in subsets { if let (Some(addresses), Some(ports)) = (subset.addresses, subset.ports) { @@ -132,14 +118,7 @@ pub async fn get_by_http(url: &str, token: &str, conf: &ServiceMapping) -> Optio inner_vec.push(to_add); } } - match conf.path { - Some(ref p) => { - upstreams.insert(p.to_string(), (inner_vec, AtomicUsize::new(0))); - } - None => { - upstreams.insert("/".to_string(), (inner_vec, AtomicUsize::new(0))); - } - } + match_path(&conf, &upstreams, inner_vec.clone()); } } } diff --git a/src/utils/kuberconsul.rs b/src/utils/kuberconsul.rs new file mode 100644 index 0000000..7a9b460 --- /dev/null +++ b/src/utils/kuberconsul.rs @@ -0,0 +1,29 @@ +use crate::utils::structs::{InnerMap, ServiceMapping, UpstreamsDashMap}; +use dashmap::DashMap; +use std::sync::atomic::AtomicUsize; + +pub fn list_to_upstreams(lt: Option, AtomicUsize)>>, upstreams: &UpstreamsDashMap, i: &ServiceMapping) { + if let Some(list) = lt { + match upstreams.get(&i.hostname.clone()) { + Some(upstr) => { + for (k, v) in list { + upstr.value().insert(k, v); + } + } + None => { + upstreams.insert(i.hostname.clone(), list); + } + }; + } +} + +pub fn match_path(conf: &ServiceMapping, upstreams: &DashMap, AtomicUsize)>, values: Vec) { + match conf.path { + Some(ref p) => { + upstreams.insert(p.to_string(), (values, AtomicUsize::new(0))); + } + None => { + upstreams.insert("/".to_string(), (values, AtomicUsize::new(0))); + } + } +} diff --git a/src/utils/parceyaml.rs b/src/utils/parceyaml.rs index 2a3d7a0..35262d0 100644 --- a/src/utils/parceyaml.rs +++ b/src/utils/parceyaml.rs @@ -108,11 +108,11 @@ async fn populate_file_upstreams(config: &mut Configuration, parsed: &Config) { info!("Applied Rate Limit for {} : {} request per second", hostname, rate); } - let mut server_list = Vec::new(); let mut hl: Vec<(String, String)> = Vec::new(); build_headers(&path_config.headers, config, &mut hl); header_list.insert(path.clone(), hl); + let mut server_list = Vec::new(); for server in &path_config.servers { if let Some((ip, port_str)) = server.split_once(':') { if let Ok(port) = port_str.parse::() {