diff --git a/Cargo.lock b/Cargo.lock index 54d5bdc..f37d6fd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -27,7 +27,7 @@ dependencies = [ "getrandom 0.2.15", "once_cell", "version_check", - "zerocopy", + "zerocopy 0.7.35", ] [[package]] @@ -122,6 +122,17 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" +[[package]] +name = "async-recursion" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b43422f69d8ff38f95f1b2bb76517c91589a924d1559a0e935d7c8ce0274c11" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.100", +] + [[package]] name = "async-stream" version = "0.3.6" @@ -164,14 +175,17 @@ dependencies = [ "dashmap", "env_logger", "futures", + "hickory-client", "log", "notify", "pingora", "pingora-core", "pingora-http", "pingora-proxy", + "rand 0.8.5", "reqwest", "serde", + "serde_json", "serde_yaml", "tokio", ] @@ -448,6 +462,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "critical-section" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "790eea4361631c5e7d22598ecd5723ff611904e3344ce8720784c93e3d83d40b" + [[package]] name = "crossbeam-channel" version = "0.5.14" @@ -553,6 +573,24 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "endian-type" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" + +[[package]] +name = "enum-as-inner" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1e6a265c649f3f5979b601d26f1d05ada116434c87741c9493cb56218f76cbc" +dependencies = [ + "heck 0.5.0", + "proc-macro2", + "quote", + "syn 2.0.100", +] + [[package]] name = "env_filter" version = "0.1.3" @@ -863,6 +901,52 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "hickory-client" +version = "0.25.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bbd1b5def7a1b77783366577e86cb51172196f689823b0f8107da9391ba183f" +dependencies = [ + "cfg-if", + "data-encoding", + "futures-channel", + "futures-util", + "hickory-proto", + "once_cell", + "radix_trie", + "rand 0.9.0", + "thiserror 2.0.12", + "tokio", + "tracing", +] + +[[package]] +name = "hickory-proto" +version = "0.25.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d844af74f7b799e41c78221be863bade11c430d46042c3b49ca8ae0c6d27287" +dependencies = [ + "async-recursion", + "async-trait", + "cfg-if", + "critical-section", + "data-encoding", + "enum-as-inner", + "futures-channel", + "futures-io", + "futures-util", + "idna", + "ipnet", + "once_cell", + "rand 0.9.0", + "ring", + "thiserror 2.0.12", + "tinyvec", + "tokio", + "tracing", + "url", +] + [[package]] name = "hostname" version = "0.3.1" @@ -1400,6 +1484,15 @@ dependencies = [ "tempfile", ] +[[package]] +name = "nibble_vec" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a5d83df9f36fe23f0c3648c6bbb8b0298bb5f1939c8f2704431371f4b84d43" +dependencies = [ + "smallvec", +] + [[package]] name = "nix" version = "0.24.3" @@ -1460,6 +1553,10 @@ name = "once_cell" version = "1.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775" +dependencies = [ + "critical-section", + "portable-atomic", +] [[package]] name = "openssl" @@ -1651,7 +1748,7 @@ dependencies = [ "pingora-runtime", "pingora-timeout", "prometheus", - "rand", + "rand 0.8.5", "regex", "serde", "serde_yaml", @@ -1727,7 +1824,7 @@ dependencies = [ "pingora-http", "pingora-ketama", "pingora-runtime", - "rand", + "rand 0.8.5", "tokio", ] @@ -1740,7 +1837,7 @@ dependencies = [ "arrayvec", "hashbrown 0.15.2", "parking_lot", - "rand", + "rand 0.8.5", ] [[package]] @@ -1802,7 +1899,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "31a7c445ca224630961045684201e3cf8da9af0b01f286ed54ff8b2403aaabff" dependencies = [ "once_cell", - "rand", + "rand 0.8.5", "thread_local", "tokio", ] @@ -1847,7 +1944,7 @@ version = "0.2.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77957b295656769bb8ad2b6a6b09d897d94f05c41b069aede1fcdaa675eaea04" dependencies = [ - "zerocopy", + "zerocopy 0.7.35", ] [[package]] @@ -1895,7 +1992,7 @@ dependencies = [ "memchr", "parking_lot", "protobuf", - "thiserror", + "thiserror 1.0.69", ] [[package]] @@ -1913,6 +2010,16 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "radix_trie" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c069c179fcdc6a2fe24d8d18305cf085fdbd4f922c041943e203685d6a1c58fd" +dependencies = [ + "endian-type", + "nibble_vec", +] + [[package]] name = "rand" version = "0.8.5" @@ -1920,8 +2027,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", - "rand_chacha", - "rand_core", + "rand_chacha 0.3.1", + "rand_core 0.6.4", +] + +[[package]] +name = "rand" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94" +dependencies = [ + "rand_chacha 0.9.0", + "rand_core 0.9.3", + "zerocopy 0.8.24", ] [[package]] @@ -1931,7 +2049,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.6.4", +] + +[[package]] +name = "rand_chacha" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" +dependencies = [ + "ppv-lite86", + "rand_core 0.9.3", ] [[package]] @@ -1943,6 +2071,15 @@ dependencies = [ "getrandom 0.2.15", ] +[[package]] +name = "rand_core" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38" +dependencies = [ + "getrandom 0.3.1", +] + [[package]] name = "redox_syscall" version = "0.5.8" @@ -2137,7 +2274,7 @@ checksum = "a44822b10c095e574869de2b891e40c724fef42cadaea040d1cd3bdbb13d36a5" dependencies = [ "backtrace", "crossbeam-channel", - "rand", + "rand 0.8.5", "trackable 0.2.24", ] @@ -2150,7 +2287,7 @@ dependencies = [ "crossbeam-channel", "hostname", "percent-encoding", - "rand", + "rand 0.8.5", "rustracing", "thrift_codec", "trackable 0.2.24", @@ -2470,7 +2607,16 @@ version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" dependencies = [ - "thiserror-impl", + "thiserror-impl 1.0.69", +] + +[[package]] +name = "thiserror" +version = "2.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "567b8a2dae586314f7be2a752ec7474332959c6460e02bde30d702a66d488708" +dependencies = [ + "thiserror-impl 2.0.12", ] [[package]] @@ -2484,6 +2630,17 @@ dependencies = [ "syn 2.0.100", ] +[[package]] +name = "thiserror-impl" +version = "2.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f7cf42b4507d8ea322120659672cf1b9dbb93f8f2d4ecfd6e51350ff5b17a1d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.100", +] + [[package]] name = "thread_local" version = "1.1.8" @@ -2514,6 +2671,21 @@ dependencies = [ "zerovec", ] +[[package]] +name = "tinyvec" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09b3661f17e86524eccd4371ab0429194e0d7c008abb45f7a7495b1719463c71" +dependencies = [ + "tinyvec_macros", +] + +[[package]] +name = "tinyvec_macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" + [[package]] name = "tokio" version = "1.44.1" @@ -3147,7 +3319,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" dependencies = [ "byteorder", - "zerocopy-derive", + "zerocopy-derive 0.7.35", +] + +[[package]] +name = "zerocopy" +version = "0.8.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2586fea28e186957ef732a5f8b3be2da217d65c5969d4b1e17f973ebbe876879" +dependencies = [ + "zerocopy-derive 0.8.24", ] [[package]] @@ -3161,6 +3342,17 @@ dependencies = [ "syn 2.0.100", ] +[[package]] +name = "zerocopy-derive" +version = "0.8.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a996a8f63c5c4448cd959ac1bab0aaa3306ccfd060472f85943ee0750f0169be" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.100", +] + [[package]] name = "zerofrom" version = "0.1.5" diff --git a/Cargo.toml b/Cargo.toml index 747b1d8..17c5642 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,5 +17,9 @@ log = "0.4.26" futures = "0.3.31" notify = "8.0.0" axum = "0.8.1" -reqwest = "0.12.15" -serde_yaml = "0.8.26" \ No newline at end of file +reqwest = { version = "0.12.15", features = ["json"] } +serde_yaml = "0.8.26" +hickory-client = "0.25.1" +serde_json = "1.0.137" +rand = "0.8.5" + diff --git a/etc/upstreams.yaml b/etc/upstreams.yaml index 5a7dc5d..1152f08 100644 --- a/etc/upstreams.yaml +++ b/etc/upstreams.yaml @@ -1,14 +1,26 @@ -provider: "file" +provider: "consul" globals: headers: - "Access-Control-Allow-Origin:*" - "Access-Control-Allow-Methods:POST, GET, OPTIONS" - "Access-Control-Max-Age:86400" +consul: + servers: + - "master1:8500" + - "192.168.22.1:8500" + - "master1.digitai.local:8500" + services: + - "_frontend-dev-frontend-srv._tcp.service.consul." + - "_gateway-test-gateway-srv._tcp.service.consul" + - "_gateway-stage-gateway-srv._tcp.service.consul" upstreams: myip.netangels.net: paths: "/": ssl: false + headers: + - "X-Some-Thing:Yaaaaaaaaaaaaaaa" + - "X-Prox-From:Hopaaaaaaaaaaaar" servers: - "127.0.0.1:8000" - "127.0.0.2:8000" diff --git a/src/utils.rs b/src/utils.rs index db4f48b..ac416b0 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,3 +1,4 @@ +pub mod consul; pub mod discovery; mod filewatch; pub mod healthcheck; diff --git a/src/utils/consul.rs b/src/utils/consul.rs new file mode 100644 index 0000000..90ff394 --- /dev/null +++ b/src/utils/consul.rs @@ -0,0 +1,156 @@ +use crate::utils::tools::{Headers, UpstreamsDashMap}; +use futures::channel::mpsc::Sender; +use std::collections::HashMap; +use std::time::Duration; + +use crate::utils::parceyaml::load_configuration; +use dashmap::DashMap; +use futures::SinkExt; +use hickory_client::client::{Client, ClientHandle}; +use hickory_client::proto::rr::{DNSClass, Name, RecordType}; +use hickory_client::proto::runtime::TokioRuntimeProvider; +use hickory_client::proto::tcp::TcpClientStream; +use log::info; +use pingora::prelude::sleep; +use rand::Rng; +use serde::Deserialize; +use std::str::FromStr; +use std::sync::atomic::AtomicUsize; + +#[derive(Debug, Deserialize)] +struct Service { + // #[serde(rename = "ServiceName")] + // service_name: String, + #[serde(rename = "ServiceTaggedAddresses")] + tagged_addresses: HashMap, +} + +#[derive(Debug, Deserialize)] +struct TaggedAddress { + #[serde(rename = "Address")] + address: String, + #[serde(rename = "Port")] + port: u16, +} + +pub async fn start(fp: String, mut toreturn: Sender<(UpstreamsDashMap, Headers)>) { + let config = load_configuration(fp.as_str(), "filepath"); + let headers = DashMap::new(); + // println!("{:?}", config); + match config { + Some(config) => { + let conf: Vec<&str> = config.2.split_whitespace().collect(); + let y = conf.get(0).unwrap(); + if y.to_string() != "consul" { + info!("Not running Consul discovery, requested type is: {}", config.2); + return; + } + info!("Consul Discovery is enabled : {}", config.2); + let end = conf.len(); + loop { + let num = rand::thread_rng().gen_range(1..end); + sleep(Duration::from_secs(5)).await; + headers.clear(); + for (k, v) in config.1.clone() { + headers.insert(k.to_string(), v); + } + let consul = "http://".to_string() + conf.get(num).unwrap(); + let upstreams = http_request(consul, "GET"); + match upstreams.await { + Some(upstreams) => { + toreturn.send((upstreams, headers.clone())).await.unwrap(); + } + None => {} + } + } + } + None => {} + } +} + +async fn http_request(url: String, method: &str) -> Option { + let client = reqwest::Client::new(); + let to = Duration::from_secs(1); + let upstreams = UpstreamsDashMap::new(); + let excludes = vec!["consul", "nomad", "nomad-client"]; + match method { + "GET" => { + let ss = url.clone() + "/v1/catalog/service"; + let response = client.get(ss.clone() + "s").timeout(to).send().await; + match response { + Ok(r) => { + let json = r.json::>>().await; + match json { + Ok(_j) => { + for (k, _v) in _j { + if !excludes.iter().any(|&i| i == k) { + let mut pref: String = ss.clone() + "/"; + pref.push_str(&k); + let list = get_by_http(pref).await; + match list { + Some(list) => { + upstreams.insert(k.to_string(), list); + } + None => {} + } + } + } + // print_upstreams(&upstreams); + Some(upstreams) + } + Err(_) => None, + } + } + Err(e) => { + println!("Error: {:?}", e); + None + } + } + } + _ => None, + } +} + +async fn get_by_http(url: String) -> Option, AtomicUsize)>> { + let client = reqwest::Client::new(); + let to = Duration::from_secs(1); + let u = client.get(url.clone()).timeout(to).send(); + let mut values = Vec::new(); + let upstreams: DashMap, AtomicUsize)> = DashMap::new(); + match u.await { + Ok(r) => { + let jason = r.json::>().await; + match jason { + Ok(services) => { + for service in services { + let addr = service.tagged_addresses.get("lan_ipv4").unwrap().address.clone(); + let prt = service.tagged_addresses.get("lan_ipv4").unwrap().port.clone(); + let to_add = (addr, prt, false); + values.push(to_add); + } + } + Err(_) => return None, + } + } + Err(_) => return None, + } + upstreams.insert("/".to_string(), (values, AtomicUsize::new(0))); + Some(upstreams) +} + +#[allow(dead_code)] +async fn get_by_dns() { + let (stream, sender) = TcpClientStream::new(([192, 168, 22, 1], 53).into(), None, None, TokioRuntimeProvider::new()); + let client = Client::new(stream, sender, None); + let (mut client, bg) = client.await.expect("connection failed"); + tokio::spawn(bg); + let query = client.query(Name::from_str("_frontend-dev-frontend-srv._tcp.service.consul.").unwrap(), DNSClass::IN, RecordType::SRV); + // let query = client.query(Name::from_str("matyan.org.").unwrap(), DNSClass::IN, RecordType::A); + let response = query.await.unwrap(); + + for t in response.answers().iter() { + for y in t.data().as_srv().iter() { + println!(" DNS ==> {:?} : {:?}", y.target().to_utf8(), y.port()); + } + } +} diff --git a/src/utils/discovery.rs b/src/utils/discovery.rs index 6229871..2a37f9a 100644 --- a/src/utils/discovery.rs +++ b/src/utils/discovery.rs @@ -1,3 +1,4 @@ +use crate::utils::consul; use crate::utils::filewatch; use crate::utils::tools::*; use crate::web::webserver; @@ -11,6 +12,10 @@ pub struct APIUpstreamProvider { pub address: String, } +pub struct ConsulProvider { + pub path: String, +} + #[async_trait] pub trait Discovery { async fn start(&self, tx: Sender<(UpstreamsDashMap, Headers)>); @@ -29,3 +34,10 @@ impl Discovery for FromFileProvider { tokio::spawn(filewatch::start(self.path.clone(), tx.clone())); } } + +#[async_trait] +impl Discovery for ConsulProvider { + async fn start(&self, tx: Sender<(UpstreamsDashMap, Headers)>) { + tokio::spawn(consul::start(self.path.clone(), tx.clone())); + } +} diff --git a/src/utils/filewatch.rs b/src/utils/filewatch.rs index 57a26bd..92f4a2a 100644 --- a/src/utils/filewatch.rs +++ b/src/utils/filewatch.rs @@ -20,7 +20,7 @@ pub async fn start(fp: String, mut toreturn: Sender<(UpstreamsDashMap, Headers)> match snd { Some(snd) => { - toreturn.send(snd).await.unwrap(); + toreturn.send((snd.0, snd.1)).await.unwrap(); } None => {} } @@ -49,11 +49,11 @@ pub async fn start(fp: String, mut toreturn: Sender<(UpstreamsDashMap, Headers)> if e.paths[0].to_str().unwrap().ends_with("yaml") { if start.elapsed() > Duration::from_secs(2) { start = Instant::now(); - info!("Config File changed :=> {:?}", e); + // info!("Config File changed :=> {:?}", e); let snd = load_configuration(file_path, "filepath"); match snd { Some(snd) => { - toreturn.send(snd).await.unwrap(); + toreturn.send((snd.0, snd.1)).await.unwrap(); } None => {} } diff --git a/src/utils/healthcheck.rs b/src/utils/healthcheck.rs index 19b52fc..3160918 100644 --- a/src/utils/healthcheck.rs +++ b/src/utils/healthcheck.rs @@ -1,5 +1,6 @@ use crate::utils::tools::*; use dashmap::DashMap; +use log::warn; use std::sync::atomic::AtomicUsize; use std::sync::Arc; use std::time::Duration; @@ -33,7 +34,7 @@ pub async fn hc2(upslist: Arc, fullist: Arc) innervec.push(k.1.clone()); } false => { - println!("Dead Upstream {}, Link: {}",k.0, link); + warn!("Dead Upstream : {}", link); } } } @@ -42,7 +43,7 @@ pub async fn hc2(upslist: Arc, fullist: Arc) totest.insert(host.clone(), inner); } if ! compare_dashmaps(&totest, &upslist){ - print_upstreams(&totest); + // print_upstreams(&totest); clone_dashmap_into(&totest, &upslist); } } diff --git a/src/utils/parceyaml.rs b/src/utils/parceyaml.rs index ed2d8ea..e3701ca 100644 --- a/src/utils/parceyaml.rs +++ b/src/utils/parceyaml.rs @@ -7,11 +7,17 @@ use std::collections::HashMap; use std::fs; use std::sync::atomic::AtomicUsize; +#[derive(Debug, Serialize, Deserialize)] +struct Consul { + servers: Option>, + services: Option>, +} #[derive(Debug, Serialize, Deserialize)] struct Config { provider: String, upstreams: Option>, globals: Option>>, + consul: Option, } #[derive(Debug, Serialize, Deserialize)] @@ -26,9 +32,18 @@ struct PathConfig { headers: Option>, } -pub fn load_configuration(d: &str, kind: &str) -> Option<(UpstreamsDashMap, Headers)> { - let dashmap = UpstreamsDashMap::new(); - let headerm = DashMap::new(); +// #[derive(Debug, Serialize, Deserialize)] +// pub struct Allconfig { +// pub upstreams: Option, +// pub headers: Option, +// pub consul: Option, +// pub typecfg: String, +// } + +// pub fn load_configuration(d: &str, kind: &str) -> Option<(UpstreamsDashMap, Headers, String)> { +pub fn load_configuration(d: &str, kind: &str) -> Option<(UpstreamsDashMap, Headers, String)> { + let upstreamsmap = UpstreamsDashMap::new(); + let headersmap = DashMap::new(); let mut yaml_data = d.to_string(); match kind { "filepath" => { @@ -53,52 +68,91 @@ pub fn load_configuration(d: &str, kind: &str) -> Option<(UpstreamsDashMap, Head let p: Result = serde_yaml::from_str(&yaml_data); match p { Ok(parsed) => { + let global_headers = DashMap::new(); + let mut hl = Vec::new(); + if let Some(globals) = &parsed.globals { + for headers in globals.get("headers").iter().by_ref() { + for header in headers.iter() { + if let Some((key, val)) = header.split_once(':') { + hl.push((key.to_string(), val.to_string())); + } + } + } + global_headers.insert("/".to_string(), hl); + headersmap.insert("GLOBAL_HEADERS".to_string(), global_headers); + } + match parsed.provider.as_str() { - "file" => {} - "consul" => return None, - "kubernetes" => return None, - _ => warn!("Unknown provider {}", parsed.provider), - }; - if let Some(upstream) = parsed.upstreams { - for (hostname, host_config) in upstream { - let path_map = DashMap::new(); - let header_list = DashMap::new(); - for (path, path_config) in host_config.paths { - let mut server_list = Vec::new(); - let mut hl = Vec::new(); - // Set global headers - if let Some(globals) = &parsed.globals { - for headers in globals.get("headers").iter().by_ref() { - for header in headers.iter() { - if let Some((key, val)) = header.split_once(':') { - hl.push((key.to_string(), val.to_string())); + "file" => { + if let Some(upstream) = parsed.upstreams { + for (hostname, host_config) in upstream { + let path_map = DashMap::new(); + let header_list = DashMap::new(); + for (path, path_config) in host_config.paths { + let mut server_list = Vec::new(); + let mut hl = Vec::new(); + // Set global headers + // if let Some(globals) = &parsed.globals { + // for headers in globals.get("headers").iter().by_ref() { + // for header in headers.iter() { + // if let Some((key, val)) = header.split_once(':') { + // hl.push((key.to_string(), val.to_string())); + // } + // } + // } + // } + // Set per host/path headers + if let Some(headers) = &path_config.headers { + for header in headers.iter().by_ref() { + if let Some((key, val)) = header.split_once(':') { + hl.push((key.to_string(), val.to_string())); + } } } - } - } - // Set per host/path headers - if let Some(headers) = &path_config.headers { - for header in headers.iter().by_ref() { - if let Some((key, val)) = header.split_once(':') { - hl.push((key.to_string(), val.to_string())); + header_list.insert(path.clone(), hl); + for server in path_config.servers { + if let Some((ip, port_str)) = server.split_once(':') { + if let Ok(port) = port_str.parse::() { + server_list.push((ip.to_string(), port, path_config.ssl)); + } + } } + path_map.insert(path, (server_list, AtomicUsize::new(0))); } + headersmap.insert(hostname.clone(), header_list); + upstreamsmap.insert(hostname, path_map); } - header_list.insert(path.clone(), hl); - for server in path_config.servers { - if let Some((ip, port_str)) = server.split_once(':') { - if let Ok(port) = port_str.parse::() { - server_list.push((ip.to_string(), port, path_config.ssl)); - } - } - } - path_map.insert(path, (server_list, AtomicUsize::new(0))); } - headerm.insert(hostname.clone(), header_list); - dashmap.insert(hostname, path_map); + Some((upstreamsmap, headersmap, String::from("file"))) + } + "consul" => { + let consul = parsed.consul; + match consul { + Some(consul) => { + // println!("{:?}", consul.services); + if let Some(srv) = consul.servers { + let joined = srv.join(" "); + Some((upstreamsmap, headersmap, String::from("consul ") + &*joined)) + } else { + None + } + } + None => None, + } + // if let Some(srv) = parsed.consul?.servers { + // let joined = srv.join(" "); + // Some((upstreamsmap, headersmap, String::from("consul ") + &*joined)) + // } else { + // None + // } + // Some((upstreamsmap, headersmap, String::from("consul "))) + } + "kubernetes" => None, + _ => { + warn!("Unknown provider {}", parsed.provider); + None } } - Some((dashmap, headerm)) } Err(e) => { error!("Failed to parse upstreams file: {}", e); diff --git a/src/utils/tools.rs b/src/utils/tools.rs index 76e7092..08208c5 100644 --- a/src/utils/tools.rs +++ b/src/utils/tools.rs @@ -117,3 +117,13 @@ pub fn compare_dashmaps(map1: &UpstreamsDashMap, map2: &UpstreamsDashMap) -> boo } true } + +#[allow(dead_code)] +pub fn merge_headers(target: &DashMap>, source: &DashMap>) { + for entry in source.iter() { + let global_key = entry.key().clone(); + let global_values = entry.value().clone(); + let mut target_entry = target.entry(global_key).or_insert_with(Vec::new); + target_entry.extend(global_values); + } +} diff --git a/src/web/proxyhttp.rs b/src/web/proxyhttp.rs index b394b8c..6e56d5e 100644 --- a/src/web/proxyhttp.rs +++ b/src/web/proxyhttp.rs @@ -1,4 +1,4 @@ -use crate::utils::discovery::{APIUpstreamProvider, Discovery, FromFileProvider}; +use crate::utils::discovery::{APIUpstreamProvider, ConsulProvider, Discovery, FromFileProvider}; use crate::utils::tools::*; use crate::utils::*; use async_trait::async_trait; @@ -33,9 +33,14 @@ impl BackgroundService for LB { let from_file = self.config.get("upstreams_conf"); match from_file { Some(from_file) => { - let file_load = FromFileProvider { path: from_file.to_string() }; let tx_file = tx.clone(); + let tx_consul = tx.clone(); + + let file_load = FromFileProvider { path: from_file.to_string() }; + let consul_load = ConsulProvider { path: from_file.to_string() }; + let _ = tokio::spawn(async move { file_load.start(tx_file).await }); + let _ = tokio::spawn(async move { consul_load.start(tx_consul).await }); } None => { error!("Can't read config file"); @@ -68,16 +73,29 @@ impl BackgroundService for LB { val = rx.next() => { match val { Some(ss) => { - // let foo = compare_dashmaps(&*self.ump_full, &ss.0); - // println!("{:?}", ss.1); - // if !foo { clone_dashmap_into(&ss.0, &self.ump_full); clone_dashmap_into(&ss.0, &self.ump_upst); - for (k,v) in ss.1 { - self.headers.insert(k,v); + self.headers.clear(); + + for entry in ss.0.iter() { + let global_key = entry.key().clone(); + let global_values = DashMap::new(); + let mut target_entry = ss.1.entry(global_key).or_insert_with(DashMap::new); + target_entry.extend(global_values); + self.headers.insert(target_entry.key().to_owned(), target_entry.value().to_owned()); } - print_upstreams(&self.ump_full); - // } + + for path in ss.1.iter() { + let path_key = path.key().clone(); + let path_headers = path.value().clone(); + self.headers.insert(path_key.clone(), path_headers); + if let Some(global_headers) = ss.1.get("GLOBAL_HEADERS") { + if let Some(existing_headers) = self.headers.get_mut(&path_key) { + merge_headers(&existing_headers, &global_headers); + } + } + } + // print_upstreams(&self.ump_full); } None => {} } @@ -266,6 +284,6 @@ impl ProxyHttp for LB { async fn logging(&self, session: &mut Session, _e: Option<&pingora::Error>, ctx: &mut Self::CTX) { let response_code = session.response_written().map_or(0, |resp| resp.status.as_u16()); debug!("{}, response code: {response_code}", self.request_summary(session, ctx)); - info!("{}, response code: {response_code}", self.request_summary(session, ctx)); + // info!("{}, response code: {response_code}", self.request_summary(session, ctx)); } } diff --git a/src/web/start.rs b/src/web/start.rs index 30c9fdf..0bac466 100644 --- a/src/web/start.rs +++ b/src/web/start.rs @@ -79,6 +79,8 @@ pub fn run() { server.add_service(bg_srvc); // info!("Starting Gazan server on {}, port : {} !", args.address, args.port); - + // let mut prometheus_service_http = Service::prometheus_http_service(); + // prometheus_service_http.add_tcp("0.0.0.0:1234"); + // server.add_service(prometheus_service_http); server.run_forever(); } diff --git a/src/web/webserver.rs b/src/web/webserver.rs index 01c9d76..754bfd7 100644 --- a/src/web/webserver.rs +++ b/src/web/webserver.rs @@ -25,7 +25,7 @@ pub async fn run_server(bindaddress: String, mut toreturn: Sender<(UpstreamsDash match serverlist { Some(serverlist) => { - let _ = tr.send(serverlist).await.unwrap(); + let _ = tr.send((serverlist.0, serverlist.1)).await.unwrap(); Response::builder().status(StatusCode::CREATED).body(Body::from("Config, conf file, updated!\n")).unwrap() } None => Response::builder()