diff --git a/.gitignore b/.gitignore index 080988b..79f725c 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,8 @@ *.dll *.exe *.sh +/docs/ +/docs /target/ *.iml .idea/ diff --git a/Cargo.toml b/Cargo.toml index 1d9606b..26a6743 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,6 +48,7 @@ lazy_static = "1.5.0" #openssl = "0.10.73" x509-parser = "0.17.0" rustls-pemfile = "2.2.0" +#hickory-client = { version = "0.25.2" } tower-http = { version = "0.6.6", features = ["fs"] } once_cell = "1.20.2" #moka = { version = "0.12.10", features = ["sync"] } diff --git a/METRICS.md b/METRICS.md deleted file mode 100644 index 72d4579..0000000 --- a/METRICS.md +++ /dev/null @@ -1,120 +0,0 @@ -# πŸ“ˆ Aralez Prometheus Metrics Reference - -This document outlines Prometheus metrics for the [Aralez](https://github.com/sadoyan/aralez) reverse proxy. -These metrics can be used for monitoring, alerting and performance analysis. - -Exposed to `http://config_address/metrics` - -By default `http://127.0.0.1:3000/metrics` - -# πŸ“Š Example Grafana dashboard during stress test : - -![Aralez](https://netangels.net/utils/dash.png) - ---- - -## πŸ› οΈ Prometheus Metrics - -### 1. `aralez_requests_total` - -- **Type**: `Counter` -- **Purpose**: Total amount requests served by Aralez. - -**PromQL example:** - -```promql -rate(aralez_requests_total[5m]) -``` - ---- - -### 2. `aralez_errors_total` - -- **Type**: `Counter` -- **Purpose**: Count of requests that resulted in an error. - -**PromQL example:** - -```promql -rate(aralez_errors_total[5m]) -``` - ---- - -### 3. `aralez_responses_total{status="200"}` - -- **Type**: `CounterVec` -- **Purpose**: Count of responses by HTTP status code. - -**PromQL example:** - -```promql -rate(aralez_responses_total{status=~"5.."}[5m]) > 0 -``` - -> Useful for alerting on 5xx errors. - ---- - -### 4. `aralez_response_latency_seconds` - -- **Type**: `Histogram` -- **Purpose**: Tracks the latency of responses in seconds. - -**Example bucket output:** - -```prometheus -aralez_response_latency_seconds_bucket{le="0.01"} 15 -aralez_response_latency_seconds_bucket{le="0.1"} 120 -aralez_response_latency_seconds_bucket{le="0.25"} 245 -aralez_response_latency_seconds_bucket{le="0.5"} 500 -... -aralez_response_latency_seconds_count 1023 -aralez_response_latency_seconds_sum 42.6 -``` - -| Metric | Meaning | -|-------------------------|---------------------------------------------------------------| -| `bucket{le="0.1"} 120` | 120 requests were ≀ 100ms | -| `bucket{le="0.25"} 245` | 245 requests were ≀ 250ms | -| `count` | Total number of observations (i.e., total responses measured) | -| `sum` | Total time of all responses, in seconds | - -### πŸ” How to interpret: - -- `le` means β€œless than or equal to”. -- `count` is total amount of observations. -- `sum` is the total time (in seconds) of all responses. - -**PromQL examples:** - -πŸ”Ή **95th percentile latency** - -```promql -histogram_quantile(0.95, rate(aralez_response_latency_seconds_bucket[5m])) - -``` - -πŸ”Ή **Average latency** - -```promql -rate(aralez_response_latency_seconds_sum[5m]) / rate(aralez_response_latency_seconds_count[5m]) -``` - ---- - -## βœ… Notes - -- Metrics are registered after the first served request. - ---- -βœ… Summary of key metrics - -| Metric Name | Type | What it Tells You | -|---------------------------------------|------------|---------------------------| -| `aralez_requests_total` | Counter | Total requests served | -| `aralez_errors_total` | Counter | Number of failed requests | -| `aralez_responses_total{status="200"}` | CounterVec | Response status breakdown | -| `aralez_response_latency_seconds` | Histogram | How fast responses are | - -πŸ“˜ *Last updated: May 2025* diff --git a/etc/main.yaml b/etc/main.yaml index 2adc0c0..8a0a277 100644 --- a/etc/main.yaml +++ b/etc/main.yaml @@ -14,7 +14,7 @@ config_tls_certificate: /etc/server.crt # Mandatory if config_tls_address is set config_tls_key_file: /etc/key.pem # Mandatory if config_tls_address is set proxy_address_http: 0.0.0.0:6193 # Proxy HTTP bind address proxy_address_tls: 0.0.0.0:6194 # Optional, Proxy TLS bind address -proxy_certificates: /etc/yoyo # Mandatory if proxy_address_tls set, should contain a certificate and key files strictly in a format {NAME}.crt, {NAME}.key. +proxy_certificates: /etc/certs # Mandatory if proxy_address_tls set, should contain a certificate and key files strictly in a format {NAME}.crt, {NAME}.key. proxy_tls_grade: a+ # Grade of TLS suite for proxy (a+, a, b, c, unsafe), matching grades of Qualys SSL Labs upstreams_conf: /etc/upstreams.yaml # the location of upstreams file file_server_folder: /opt/storage # Optional, local folder to serve @@ -22,4 +22,4 @@ file_server_address: 127.0.0.1:3002 # Optional, Local address for file server. C log_level: info # info, warn, error, debug, trace, off hc_method: HEAD # Healthcheck method (HEAD, GET, POST are supported) UPPERCASE hc_interval: 2 #Interval for health checks in seconds -master_key: 910517d9-f9a1-48de-8826-dbadacbd84af-cb6f830e-ab16-47ec-9d8f-0090de732774 # Mater key for working with API server and JWT Secret +master_key: 910517d9-f9a1-48de-8826-dbadacbd84af-cb6f830e-ab16-47ec-9d8f-0090de732774 # Mater key for working with API server and JWT Secret \ No newline at end of file diff --git a/etc/upstreams.yaml b/etc/upstreams.yaml index c4bb6df..db38474 100644 --- a/etc/upstreams.yaml +++ b/etc/upstreams.yaml @@ -1,5 +1,5 @@ # The file under watch and hot reload, changes are applied immediately, no need to restart or reload. -provider: "file" # consul +provider: "file" # consul, kubernetes sticky_sessions: false to_ssl: false #rate_limit: 100 @@ -24,6 +24,17 @@ consul: # If the provider is consul. Otherwise, ignored. - proxy: "proxy-frontend-dev-frontend-srv" real: "frontend-dev-frontend-srv" token: "8e2db809-845b-45e1-8b47-2c8356a09da0-a4370955-18c2-4d6e-a8f8-ffcc0b47be81" # Consul server access token, If Consul auth is enabled +kubernetes: + servers: + - "172.16.0.11:5443" # KUBERNETES_SERVICE_HOST : KUBERNETES_SERVICE_PORT_HTTPS + services: + - proxy: "vt-api-service-v2" + real: "vt-api-service-v2" + - proxy: "vt-search-service" + real: "vt-search-service" + - proxy: "vt-websocket-service" + real: "vt-websocket-service" + tokenpath: "/tmp/token.txt" # /var/run/secrets/kubernetes.io/serviceaccount/token upstreams: myip.mydomain.com: paths: diff --git a/src/utils.rs b/src/utils.rs index 4b1bf6c..7c5d0ae 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,9 +1,11 @@ pub mod auth; pub mod consul; pub mod discovery; +pub mod dnsclient; mod filewatch; pub mod healthcheck; pub mod jwt; +pub mod kuber; pub mod metrics; pub mod parceyaml; pub mod structs; diff --git a/src/utils/consul.rs b/src/utils/consul.rs index 7176ec8..726c93e 100644 --- a/src/utils/consul.rs +++ b/src/utils/consul.rs @@ -1,6 +1,5 @@ -use crate::utils::parceyaml::load_configuration; use crate::utils::structs::{Configuration, InnerMap, ServiceMapping, UpstreamsDashMap}; -use crate::utils::tools::{clone_dashmap_into, compare_dashmaps}; +use crate::utils::tools::{clone_dashmap_into, compare_dashmaps, print_upstreams}; use dashmap::DashMap; use futures::channel::mpsc::Sender; use futures::SinkExt; @@ -11,6 +10,7 @@ use reqwest::header::{HeaderMap, HeaderValue}; use serde::Deserialize; use std::collections::HashMap; use std::sync::atomic::AtomicUsize; +use std::sync::Arc; use std::time::Duration; #[derive(Debug, Deserialize)] @@ -27,59 +27,53 @@ struct TaggedAddress { port: u16, } -pub async fn start(fp: String, mut toreturn: Sender) { - let config = load_configuration(fp.as_str(), "filepath").await; +pub async fn start(mut toreturn: Sender, config: Arc) { let headers = DashMap::new(); - match config { - Some(config) => { - if config.typecfg.to_string() != "consul" { - info!("Not running Consul discovery, requested type is: {}", config.typecfg); - return; - } + info!("Consul Discovery is enabled : {}", config.typecfg); + let consul = config.consul.clone(); + let prev_upstreams = UpstreamsDashMap::new(); + match consul { + Some(consul) => { + let servers = consul.servers.unwrap(); + info!("Consul Servers => {:?}", servers); + let end = servers.len() - 1; - info!("Consul Discovery is enabled : {}", config.typecfg); - let consul = config.consul.clone(); - let prev_upstreams = UpstreamsDashMap::new(); - match consul { - Some(consul) => { - let servers = consul.servers.unwrap(); - info!("Consul Servers => {:?}", servers); - let end = servers.len(); - - loop { - let num = rand::rng().random_range(1..end); - headers.clear(); - for (k, v) in config.headers.clone() { - headers.insert(k.to_string(), v); - } - let consul_data = servers.get(num).unwrap().to_string(); - let upstreams = consul_request(consul_data, consul.services.clone(), consul.token.clone()); - match upstreams.await { - Some(upstreams) => { - if !compare_dashmaps(&upstreams, &prev_upstreams) { - let mut tosend: Configuration = Configuration { - upstreams: Default::default(), - headers: Default::default(), - consul: None, - typecfg: "".to_string(), - extraparams: config.extraparams.clone(), - }; - - clone_dashmap_into(&upstreams, &prev_upstreams); - clone_dashmap_into(&upstreams, &tosend.upstreams); - tosend.headers = headers.clone(); - tosend.extraparams.authentication = config.extraparams.authentication.clone(); - tosend.typecfg = config.typecfg.clone(); - tosend.consul = config.consul.clone(); - toreturn.send(tosend).await.unwrap(); - } - } - None => {} - } - sleep(Duration::from_secs(5)).await; - } + loop { + let mut num = 0; + if end > 0 { + num = rand::rng().random_range(0..end); } - None => {} + headers.clear(); + for (k, v) in config.headers.clone() { + headers.insert(k.to_string(), v); + } + let consul_data = servers.get(num).unwrap().to_string(); + let upstreams = consul_request(consul_data, consul.services.clone(), consul.token.clone()); + match upstreams.await { + Some(upstreams) => { + if !compare_dashmaps(&upstreams, &prev_upstreams) { + let mut tosend: Configuration = Configuration { + upstreams: Default::default(), + headers: Default::default(), + consul: None, + kubernetes: None, + typecfg: "".to_string(), + extraparams: config.extraparams.clone(), + }; + + clone_dashmap_into(&upstreams, &prev_upstreams); + clone_dashmap_into(&upstreams, &tosend.upstreams); + tosend.headers = headers.clone(); + tosend.extraparams.authentication = config.extraparams.authentication.clone(); + tosend.typecfg = config.typecfg.clone(); + tosend.consul = config.consul.clone(); + print_upstreams(&tosend.upstreams); + toreturn.send(tosend).await.unwrap(); + } + } + None => {} + } + sleep(Duration::from_secs(5)).await; } } None => {} diff --git a/src/utils/discovery.rs b/src/utils/discovery.rs index ec5568f..826f87a 100644 --- a/src/utils/discovery.rs +++ b/src/utils/discovery.rs @@ -1,13 +1,11 @@ -use crate::utils::consul; use crate::utils::filewatch; use crate::utils::structs::Configuration; +use crate::utils::{consul, kuber}; use crate::web::webserver; use async_trait::async_trait; use futures::channel::mpsc::Sender; +use std::sync::Arc; -pub struct FromFileProvider { - pub path: String, -} pub struct APIUpstreamProvider { pub config_api_enabled: bool, pub address: String, @@ -19,15 +17,6 @@ pub struct APIUpstreamProvider { pub file_server_folder: Option, } -pub struct ConsulProvider { - pub path: String, -} - -#[async_trait] -pub trait Discovery { - async fn start(&self, tx: Sender); -} - #[async_trait] impl Discovery for APIUpstreamProvider { async fn start(&self, toreturn: Sender) { @@ -35,6 +24,23 @@ impl Discovery for APIUpstreamProvider { } } +pub struct FromFileProvider { + pub path: String, +} + +pub struct ConsulProvider { + pub config: Arc, +} + +pub struct KubernetesProvider { + pub config: Arc, +} + +#[async_trait] +pub trait Discovery { + async fn start(&self, tx: Sender); +} + #[async_trait] impl Discovery for FromFileProvider { async fn start(&self, tx: Sender) { @@ -45,6 +51,13 @@ impl Discovery for FromFileProvider { #[async_trait] impl Discovery for ConsulProvider { async fn start(&self, tx: Sender) { - tokio::spawn(consul::start(self.path.clone(), tx.clone())); + tokio::spawn(consul::start(tx.clone(), self.config.clone())); + } +} + +#[async_trait] +impl Discovery for KubernetesProvider { + async fn start(&self, tx: Sender) { + tokio::spawn(kuber::start(tx.clone(), self.config.clone())); } } diff --git a/src/utils/dnsclient.rs b/src/utils/dnsclient.rs new file mode 100644 index 0000000..10a6aa9 --- /dev/null +++ b/src/utils/dnsclient.rs @@ -0,0 +1,158 @@ +/* +use crate::utils::structs::InnerMap; +use dashmap::DashMap; +use hickory_client::client::{Client, ClientHandle}; +use hickory_client::proto::rr::{DNSClass, Name, RecordType}; +use hickory_client::proto::runtime::TokioRuntimeProvider; +use hickory_client::proto::udp::UdpClientStream; +use std::net::SocketAddr; +use std::str::FromStr; +use std::sync::atomic::AtomicUsize; +use std::time::Duration; +use tokio::sync::Mutex; + +type DnsError = Box; + +pub struct DnsClientPool { + clients: Vec>, +} + +struct DnsClient { + client: Client, +} + +pub async fn start2(mut toreturn: Sender, config: Arc) { + let k8s = config.kubernetes.clone(); + match k8s { + Some(k8s) => { + let dnserver = k8s.servers.unwrap_or(vec!["127.0.0.1:53".to_string()]); + let headers = DashMap::new(); + let end = dnserver.len() - 1; + let mut num = 0; + if end > 0 { + num = rand::rng().random_range(0..end); + } + let srv = dnserver.get(num).unwrap().to_string(); + let pool = DnsClientPool::new(5, srv.clone()).await; + let u = UpstreamsDashMap::new(); + if let Some(whitelist) = k8s.services { + loop { + let upstreams = UpstreamsDashMap::new(); + for service in whitelist.iter() { + let ret = pool.query_srv(service.real.as_str(), srv.clone()).await; + match ret { + Ok(r) => { + upstreams.insert(service.proxy.clone(), r); + } + Err(e) => eprintln!("DNS query failed for {:?}: {:?}", service, e), + } + } + if !compare_dashmaps(&u, &upstreams) { + headers.clear(); + for (k, v) in config.headers.clone() { + headers.insert(k.to_string(), v); + } + + let mut tosend: Configuration = Configuration { + upstreams: Default::default(), + headers: Default::default(), + consul: None, + kubernetes: None, + typecfg: "".to_string(), + extraparams: config.extraparams.clone(), + }; + + clone_dashmap_into(&upstreams, &u); + clone_dashmap_into(&upstreams, &tosend.upstreams); + tosend.headers = headers.clone(); + tosend.extraparams.authentication = config.extraparams.authentication.clone(); + tosend.typecfg = config.typecfg.clone(); + tosend.consul = config.consul.clone(); + print_upstreams(&tosend.upstreams); + toreturn.send(tosend).await.unwrap(); + } + + tokio::time::sleep(Duration::from_secs(1)).await; + } + } + } + None => {} + } +} + +impl DnsClient { + pub async fn new(server: String) -> Result { + let server_details = server; + let server: SocketAddr = server_details.parse().expect("Unable to parse socket address"); + let conn = UdpClientStream::builder(server, TokioRuntimeProvider::default()).build(); + let (client, bg) = Client::connect(conn).await.unwrap(); + tokio::spawn(bg); + Ok(Self { client }) + } + + pub async fn query_srv(&mut self, name: &str) -> Result, AtomicUsize)>, DnsError> { + let upstreams: DashMap, AtomicUsize)> = DashMap::new(); + let mut values = Vec::new(); + match tokio::time::timeout(Duration::from_secs(5), self.client.query(Name::from_str(name)?, DNSClass::IN, RecordType::SRV)).await { + Ok(Ok(response)) => { + for answer in response.answers() { + if let hickory_client::proto::rr::RData::SRV(srv) = answer.data() { + let to_add = InnerMap { + address: srv.target().to_string(), + port: srv.port(), + is_ssl: false, + is_http2: false, + to_https: false, + rate_limit: None, + }; + values.push(to_add); + } + } + upstreams.insert("/".to_string(), (values, AtomicUsize::new(0))); + Ok(upstreams) + } + Ok(Err(e)) => Err(Box::new(e)), + Err(_) => Err("DNS query timed out".into()), + } + } +} + +impl DnsClientPool { + pub async fn new(pool_size: usize, server: String) -> Self { + let mut clients = Vec::with_capacity(pool_size); + for _ in 0..pool_size { + if let Ok(client) = DnsClient::new(server.clone()).await { + clients.push(Mutex::new(client)); + } + } + Self { clients } + } + + pub async fn query_srv(&self, name: &str, server: String) -> Result, AtomicUsize)>, DnsError> { + // Try to get an available client + for client_mutex in &self.clients { + if let Ok(mut client) = client_mutex.try_lock() { + let vay = client.query_srv(name).await; + match vay { + Ok(_) => return vay, + Err(_) => { + // If query fails, drop this client and create a new one + *client = match DnsClient::new(server).await { + Ok(c) => c, + Err(e) => return Err(e), + }; + // Retry with the new client + return client.query_srv(name).await; + } + } + } + } + + // If all clients are busy, wait for the first one with a timeout + match tokio::time::timeout(Duration::from_secs(2), self.clients[0].lock()).await { + Ok(mut client) => client.query_srv(name).await, + Err(_) => Err("All DNS clients are busy and timeout reached".into()), + } + } +} +*/ diff --git a/src/utils/filewatch.rs b/src/utils/filewatch.rs index f246dc9..1e923e7 100644 --- a/src/utils/filewatch.rs +++ b/src/utils/filewatch.rs @@ -2,7 +2,7 @@ use crate::utils::parceyaml::load_configuration; use crate::utils::structs::Configuration; use futures::channel::mpsc::Sender; use futures::SinkExt; -use log::{error, info, warn}; +use log::error; use notify::event::ModifyKind; use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; use pingora::prelude::sleep; @@ -15,19 +15,7 @@ pub async fn start(fp: String, mut toreturn: Sender) { let file_path = fp.as_str(); let parent_dir = Path::new(file_path).parent().unwrap(); let (local_tx, mut local_rx) = tokio::sync::mpsc::channel::>(1); - let snd = load_configuration(file_path, "filepath").await; - match snd { - Some(snd) => { - if snd.typecfg != "file" { - warn!("Disabling file watcher, requested discovery type is: {}", snd.typecfg); - return; - } - info!("Watching for changes in {:?}", parent_dir); - toreturn.send(snd).await.unwrap(); - } - None => {} - } let _watcher_handle = task::spawn_blocking({ let parent_dir = parent_dir.to_path_buf(); // Move directory path into the closure move || { diff --git a/src/utils/kuber.rs b/src/utils/kuber.rs new file mode 100644 index 0000000..8dc18e8 --- /dev/null +++ b/src/utils/kuber.rs @@ -0,0 +1,133 @@ +// use crate::utils::dnsclient::DnsClientPool; +use crate::utils::structs::{Configuration, InnerMap, UpstreamsDashMap}; +use crate::utils::tools::{clone_dashmap_into, compare_dashmaps, print_upstreams}; +use dashmap::DashMap; +use futures::channel::mpsc::Sender; +use futures::SinkExt; +use pingora::prelude::sleep; +use rand::Rng; +use reqwest::Client; +use std::env; +use std::sync::atomic::AtomicUsize; +use std::sync::Arc; +use std::time::Duration; +use tokio::fs::File; +use tokio::io::AsyncReadExt; + +// static KUBERNETES_SERVICE_HOST: &str = "140.238.122.18:6443"; +// static TOKEN: &str = "eyJhbGciOiJSUzI1NiIsImtpZCI6InJXMzl2VFlHTDM4V0tBbWxNQnRhbnRVcDlvcS10MjRDVHNwc2p3d1ZXeDAifQ.eyJhdWQiOlsiaHR0cHM6Ly9rdWJlcm5ldGVzLmRlZmF1bHQuc3ZjLmNsdXN0ZXIubG9jYWwiXSwiZXhwIjoxNzg4MjY3OTgxLCJpYXQiOjE3NTY3MzE5ODEsImlzcyI6Imh0dHBzOi8va3ViZXJuZXRlcy5kZWZhdWx0LnN2Yy5jbHVzdGVyLmxvY2FsIiwianRpIjoiMDM4ZWVlODAtNGViZi00MjU5LWI0OTctYmYwNDgxMmYyNWJhIiwia3ViZXJuZXRlcy5pbyI6eyJuYW1lc3BhY2UiOiJzdGFnaW5nIiwibm9kZSI6eyJuYW1lIjoiMTcyLjE2LjAuMTU1IiwidWlkIjoiOGY2ZTk2ZjYtNDdjNS00YjZhLTkwMmUtYmZhZTUwNzcxMWFjIn0sInBvZCI6eyJuYW1lIjoiYXJhbGV6LTY3Njg5OGY5ZDUtaHpienEiLCJ1aWQiOiI2NjAyZDFjNy00ZWM2LTRiZDEtYTc3NS00NzI5OGYyMTc0N2QifSwic2VydmljZWFjY291bnQiOnsibmFtZSI6ImFyYWxlei1zYSIsInVpZCI6IjVjYmYxZTU2LTJjY2YtNDFlMS05OGU2LTc3NmY1ZWY1NGRkOCJ9LCJ3YXJuYWZ0ZXIiOjE3NTY3MzU1ODh9LCJuYmYiOjE3NTY3MzE5ODEsInN1YiI6InN5c3RlbTpzZXJ2aWNlYWNjb3VudDpzdGFnaW5nOmFyYWxlei1zYSJ9.fnlQoOMEztWx6aE4JNPCMVDVc4s8ygs1tvuU_4FoNTWnTipFivzV3duHAg5sHFnLexYEbeBzWqr1betk__ATfy5RB5UaDdg0kk2AdVjYhvUfW4FcIqPNzXBUd74BOUG8vhN6bhZTQC8Fh0eCLCn9XXwVGIO94LKi9LmLbFiDAhpQDGwbXYnI8kV4nNGRE3kf0fsb6SyHs_8bOGc-t2U6OPdAFqBsk4JliaqiXhJKsoc8JCfUkcxYkT7GqIZxFYpvgisbOdZL7_iVyLU1BiSPMHb0jFa4O60aZ8EzCR7Mio0F5A8eZjSCf_f90ecUCGFuW3eCTCDd02RutXeSyPqxhQ"; + +#[derive(Debug, serde::Deserialize)] +struct Endpoints { + subsets: Option>, +} + +#[derive(Debug, serde::Deserialize)] +struct Subset { + addresses: Option>, + ports: Option>, +} + +#[derive(Debug, serde::Deserialize)] +struct Address { + ip: String, +} + +#[derive(Debug, serde::Deserialize)] +struct Port { + // name: String, + port: u16, +} + +pub async fn start(mut toreturn: Sender, config: Arc) { + let upstreams = UpstreamsDashMap::new(); + let prev_upstreams = UpstreamsDashMap::new(); + loop { + if let Some(kuber) = config.kubernetes.clone() { + let path = kuber.tokenpath.unwrap_or("/var/run/secrets/kubernetes.io/serviceaccount/token".to_string()); + let token = read_token(path.as_str()).await; + let servers = kuber.servers.unwrap_or(vec![format!( + "{}:{}", + env::var("KUBERNETES_SERVICE_HOST").unwrap_or("0.0.0.0".to_string()), + env::var("KUBERNETES_SERVICE_PORT_HTTPS").unwrap_or("0".to_string()) + )]); + let end = servers.len() - 1; + let mut num = 0; + if end > 0 { + num = rand::rng().random_range(0..end); + } + let server = servers.get(num).unwrap().to_string(); + + if let Some(svc) = kuber.services { + for i in svc { + let url = format!("https://{}/api/v1/namespaces/staging/endpoints/{}", server, i.real); + let list = get_by_http(&*url, &*token).await; + if let Some(list) = list { + upstreams.insert(i.proxy.clone(), list); + } + } + } + } + + if !compare_dashmaps(&upstreams, &prev_upstreams) { + let tosend: Configuration = Configuration { + upstreams: Default::default(), + headers: config.headers.clone(), + consul: config.consul.clone(), + kubernetes: config.kubernetes.clone(), + typecfg: config.typecfg.clone(), + extraparams: config.extraparams.clone(), + }; + + clone_dashmap_into(&upstreams, &prev_upstreams); + clone_dashmap_into(&upstreams, &tosend.upstreams); + print_upstreams(&tosend.upstreams); + toreturn.send(tosend).await.unwrap(); + } + sleep(Duration::from_secs(5)).await; + } +} + +pub async fn get_by_http(url: &str, token: &str) -> Option, AtomicUsize)>> { + let client = Client::builder().timeout(Duration::from_secs(2)).danger_accept_invalid_certs(true).build().ok()?; + + let resp = client.get(url).bearer_auth(token).send().await.ok()?; + + if !resp.status().is_success() { + eprintln!("Kubernetes API returned status: {}", resp.status()); + return None; + } + + let endpoints: Endpoints = resp.json().await.ok()?; + let upstreams: DashMap, AtomicUsize)> = DashMap::new(); + + if let Some(subsets) = endpoints.subsets { + for subset in subsets { + if let (Some(addresses), Some(ports)) = (subset.addresses, subset.ports) { + for addr in addresses { + let mut inner_vec = Vec::new(); + for port in &ports { + let to_add = InnerMap { + address: addr.ip.clone(), + port: port.port.clone(), + is_ssl: false, + is_http2: false, + to_https: false, + rate_limit: None, + }; + inner_vec.push(to_add); + } + upstreams.insert("/".to_string(), (inner_vec, AtomicUsize::new(0))); + } + } + } + } + Some(upstreams) +} + +async fn read_token(path: &str) -> String { + let mut file = File::open(path).await.unwrap(); + let mut contents = String::new(); + file.read_to_string(&mut contents).await.unwrap(); + contents.trim().to_string() +} diff --git a/src/utils/parceyaml.rs b/src/utils/parceyaml.rs index a52214f..20ed119 100644 --- a/src/utils/parceyaml.rs +++ b/src/utils/parceyaml.rs @@ -52,13 +52,12 @@ pub async fn load_configuration(d: &str, kind: &str) -> Option { } "consul" => { toreturn.consul = parsed.consul; - if toreturn.consul.is_some() { - Some(toreturn) - } else { - None - } + toreturn.consul.is_some().then_some(toreturn) + } + "kubernetes" => { + toreturn.kubernetes = parsed.kubernetes; + toreturn.kubernetes.is_some().then_some(toreturn) } - "kubernetes" => None, _ => { warn!("Unknown provider {}", parsed.provider); None diff --git a/src/utils/structs.rs b/src/utils/structs.rs index fd4e346..966b5f0 100644 --- a/src/utils/structs.rs +++ b/src/utils/structs.rs @@ -21,6 +21,12 @@ pub struct Extraparams { pub authentication: DashMap>, pub rate_limit: Option, } +#[derive(Clone, Default, Debug, Serialize, Deserialize)] +pub struct Kubernetes { + pub servers: Option>, + pub services: Option>, + pub tokenpath: Option, +} #[derive(Clone, Default, Debug, Serialize, Deserialize)] pub struct Consul { @@ -44,6 +50,8 @@ pub struct Config { #[serde(default)] pub consul: Option, #[serde(default)] + pub kubernetes: Option, + #[serde(default)] pub rate_limit: Option, } @@ -65,6 +73,7 @@ pub struct Configuration { pub upstreams: UpstreamsDashMap, pub headers: Headers, pub consul: Option, + pub kubernetes: Option, pub typecfg: String, pub extraparams: Extraparams, } diff --git a/src/web/bgservice.rs b/src/web/bgservice.rs index 2ba55c2..a0a6444 100644 --- a/src/web/bgservice.rs +++ b/src/web/bgservice.rs @@ -1,4 +1,5 @@ -use crate::utils::discovery::{APIUpstreamProvider, ConsulProvider, Discovery, FromFileProvider}; +use crate::utils::discovery::{APIUpstreamProvider, ConsulProvider, Discovery, FromFileProvider, KubernetesProvider}; +use crate::utils::parceyaml::load_configuration; use crate::utils::structs::Configuration; use crate::utils::tools::*; use crate::utils::*; @@ -6,8 +7,8 @@ use crate::web::proxyhttp::LB; use async_trait::async_trait; use dashmap::DashMap; use futures::channel::mpsc; -use futures::StreamExt; -use log::info; +use futures::{SinkExt, StreamExt}; +use log::{error, info}; use pingora_core::server::ShutdownWatch; use pingora_core::services::background::BackgroundService; use std::sync::Arc; @@ -15,23 +16,38 @@ use std::sync::Arc; #[async_trait] impl BackgroundService for LB { async fn start(&self, mut shutdown: ShutdownWatch) { - info!("Starting background service"); - let (tx, mut rx) = mpsc::channel::(0); + info!("Starting background service"); // tx: Sender + let (mut tx, mut rx) = mpsc::channel::(1); + let tx_api = tx.clone(); + let config = load_configuration(self.config.upstreams_conf.clone().as_str(), "filepath") + .await + .expect("Failed to load configuration"); - let tx_file = tx.clone(); - let tx_consul = tx.clone(); - - let file_load = FromFileProvider { - path: self.config.upstreams_conf.clone(), - }; - let consul_load = ConsulProvider { - path: self.config.upstreams_conf.clone(), - }; - - let _ = tokio::spawn(async move { file_load.start(tx_file).await }); - let _ = tokio::spawn(async move { consul_load.start(tx_consul).await }); - // let _ = tokio::spawn(tls::watch_certs(self.config.proxy_certificates.clone().unwrap(), self.cert_tx.clone())); - // let _ = tokio::spawn(tls::watch_certs(self.config.proxy_certificates.clone().unwrap(), self.cert_tx.clone())).await; + match config.typecfg.as_str() { + "file" => { + info!("Running File discovery, requested type is: {}", config.typecfg); + tx.send(config).await.unwrap(); + let file_load = FromFileProvider { + path: self.config.upstreams_conf.clone(), + }; + let _ = tokio::spawn(async move { file_load.start(tx).await }); + } + "kubernetes" => { + info!("Running Kubernetes discovery, requested type is: {}", config.typecfg); + let cf = Arc::from(config); + let kuber_load = KubernetesProvider { config: cf.clone() }; + let _ = tokio::spawn(async move { kuber_load.start(tx).await }); + } + "consul" => { + info!("Running Consul discovery, requested type is: {}", config.typecfg); + let cf = Arc::from(config); + let consul_load = ConsulProvider { config: cf.clone() }; + let _ = tokio::spawn(async move { consul_load.start(tx).await }); + } + _ => { + error!("Unknown discovery type: {}", config.typecfg); + } + } let api_load = APIUpstreamProvider { address: self.config.config_address.clone(), @@ -43,7 +59,7 @@ impl BackgroundService for LB { file_server_address: self.config.file_server_address.clone(), file_server_folder: self.config.file_server_folder.clone(), }; - let tx_api = tx.clone(); + // let tx_api = tx.clone(); let _ = tokio::spawn(async move { api_load.start(tx_api).await }); let uu = self.ump_upst.clone();