diff --git a/Cargo.lock b/Cargo.lock index 3cc6a94..d3e0a50 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -186,6 +186,7 @@ dependencies = [ "async-trait", "axum", "bytes", + "clap 4.5.32", "dashmap", "env_logger", "futures", @@ -201,6 +202,7 @@ dependencies = [ "reqwest", "serde", "serde_json", + "serde_yaml", "tokio", ] @@ -432,8 +434,8 @@ checksum = "4ea181bf566f71cb9a5d17a59e1871af638180a18fb0035c92ae62b705207123" dependencies = [ "atty", "bitflags 1.3.2", - "clap_derive", - "clap_lex", + "clap_derive 3.2.25", + "clap_lex 0.2.4", "indexmap 1.9.3", "once_cell", "strsim 0.10.0", @@ -441,6 +443,28 @@ dependencies = [ "textwrap 0.16.1", ] +[[package]] +name = "clap" +version = "4.5.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6088f3ae8c3608d19260cd7445411865a485688711b78b5be70d78cd96136f83" +dependencies = [ + "clap_builder", + "clap_derive 4.5.32", +] + +[[package]] +name = "clap_builder" +version = "4.5.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22a7ef7f676155edfb82daa97f99441f3ebf4a58d5e32f295a56259f1b6facc8" +dependencies = [ + "anstream", + "anstyle", + "clap_lex 0.7.4", + "strsim 0.11.1", +] + [[package]] name = "clap_derive" version = "3.2.25" @@ -454,6 +478,18 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "clap_derive" +version = "4.5.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09176aae279615badda0765c0c0b3f6ed53f4709118af73cf4655d85d1530cd7" +dependencies = [ + "heck 0.5.0", + "proc-macro2", + "quote", + "syn 2.0.96", +] + [[package]] name = "clap_lex" version = "0.2.4" @@ -463,6 +499,12 @@ dependencies = [ "os_str_bytes", ] +[[package]] +name = "clap_lex" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f46ad14479a25103f283c0f10005961cf086d8dc42205bb44c46ac563475dca6" + [[package]] name = "cmake" version = "0.1.52" @@ -1588,6 +1630,15 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" +[[package]] +name = "openssl-src" +version = "300.4.2+3.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "168ce4e058f975fe43e89d9ccf78ca668601887ae736090aacc23ae353c298e2" +dependencies = [ + "cc", +] + [[package]] name = "openssl-sys" version = "0.9.106" @@ -1596,6 +1647,7 @@ checksum = "8bb61ea9811cc39e3c2069f40b8b8e2e70d8569b361f879786cc7ed48b777cdd" dependencies = [ "cc", "libc", + "openssl-src", "pkg-config", "vcpkg", ] @@ -1679,6 +1731,7 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "79c9fc7098dc3e7d09d2d1647921005be9301cf68536826195dc5369e05124bd" dependencies = [ + "pingora-cache", "pingora-core", "pingora-http", "pingora-load-balancing", @@ -1750,6 +1803,7 @@ dependencies = [ "percent-encoding", "pingora-error", "pingora-http", + "pingora-openssl", "pingora-pool", "pingora-runtime", "pingora-timeout", @@ -1846,6 +1900,20 @@ dependencies = [ "rand", ] +[[package]] +name = "pingora-openssl" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f18158b901a02289f2a2a954a531c96e4d0703c94f7c9291981c9e53fddc6c1" +dependencies = [ + "foreign-types", + "libc", + "openssl", + "openssl-src", + "openssl-sys", + "tokio-openssl", +] + [[package]] name = "pingora-pool" version = "0.4.0" @@ -2448,6 +2516,12 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + [[package]] name = "structopt" version = "0.3.26" @@ -2690,6 +2764,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-openssl" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59df6849caa43bb7567f9a36f863c447d95a11d5903c9cc334ba32576a27eadd" +dependencies = [ + "openssl", + "openssl-sys", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.26.1" diff --git a/Cargo.toml b/Cargo.toml index b00869e..44a0bf8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,7 @@ rand = "0.8.5" dashmap = "6.1.0" tokio = { version = "1", features = ["full"] } #pingora = { version = "0.4", features = ["full"] } -pingora = { version = "0.4.0", features = ["lb"] } +pingora = { version = "0.4.0", features = ["lb", "openssl"] } pingora-core = { version = "0.4.0" } pingora-proxy = { version = "0.4.0" } pingora-load-balancing = { version = "0.4.0" } @@ -24,3 +24,5 @@ axum = "0.8.1" serde = { version = "1.0.217", features = ["derive"] } serde_json = "1.0.137" reqwest = { version = "0.12" } +serde_yaml = "0.8.26" +clap = { version = "4.5.32", features = ["derive"] } diff --git a/etc/upstreams.yaml b/etc/upstreams.yaml new file mode 100644 index 0000000..f13efdf --- /dev/null +++ b/etc/upstreams.yaml @@ -0,0 +1,47 @@ +upstreams: + myip.netangels.net: + paths: + "/draw": + protocol: "wsoc" + ssl: false + servers: + - "192.168.1.1:8000" + "/": + protocol: "http" + ssl: false + servers: + - "127.0.0.1:8000" + - "127.0.0.3:8000" + "/ping": + protocol: "http" + ssl: false + servers: + - "127.0.0.1:8000" + - "127.0.0.4:8000" + polo.netangels.net: + paths: + "/": + protocol: "http" + ssl: false + servers: + - "192.168.1.10:8000" + "/ws": + protocol: "wsoc" + ssl: false + servers: + - "192.168.1.1:8000" + glop.netangels.net: + paths: + "/": + protocol: "http" + ssl: false + servers: + - "192.168.1.10:8000" + - "192.168.1.10:8000" +# apt.netangels.net: +# paths: +# "/": +# protocol: "http" +# ssl: true +# servers: +# - "apt.netangels.net:443" \ No newline at end of file diff --git a/src/utils.rs b/src/utils.rs index c01a04d..a188bcf 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,4 +1,4 @@ -pub mod compare; pub mod discovery; pub mod healthcheck; +pub mod parceyaml; pub mod tools; diff --git a/src/utils/compare.rs b/src/utils/compare.rs deleted file mode 100644 index 86f6644..0000000 --- a/src/utils/compare.rs +++ /dev/null @@ -1,73 +0,0 @@ -use crate::utils::tools::*; -use std::collections::HashSet; - -// #[allow(dead_code)] -pub fn dm(map1: &UpstreamMap, map2: &UpstreamMap) -> bool { - if map1.len() != map2.len() { - return false; // Different number of keys - } - for entry1 in map1.iter() { - let key = entry1.key(); - let (vec1, _) = entry1.value(); // Extract value - - if let Some(entry2) = map2.get(key) { - let (vec2, _) = entry2.value(); // Correctly extract value - if vec1 != vec2 { - return false; - } - } else { - return false; - } - } - true -} -#[allow(dead_code)] -pub fn dam(map1: &UpstresmDashMap, map2: &UpstresmDashMap) -> bool { - // Step 1: Check if both maps have the same keys - let keys1: HashSet<_> = map1.iter().map(|entry| entry.key().clone()).collect(); - let keys2: HashSet<_> = map2.iter().map(|entry| entry.key().clone()).collect(); - if keys1 != keys2 { - return false; - } - - // Step 2: Check if the inner maps have the same keys - for entry1 in map1.iter() { - let hostname = entry1.key(); - let inner_map1 = entry1.value(); - - let Some(inner_map2) = map2.get(hostname) else { - return false; // Key exists in map1 but not in map2 - }; - - let inner_keys1: HashSet<_> = inner_map1.iter().map(|e| e.key().clone()).collect(); - let inner_keys2: HashSet<_> = inner_map2.iter().map(|e| e.key().clone()).collect(); - if inner_keys1 != inner_keys2 { - return false; - } - - // Step 3: Compare values (ignore order) - for path_entry in inner_map1.iter() { - let path = path_entry.key(); - let (vec1, _counter1) = path_entry.value(); - - let Some(entry2) = inner_map2.get(path) else { - return false; // Path exists in map1 but not in map2 - }; - let (vec2, _counter2) = entry2.value(); // ✅ Correctly extract values - - // Compare AtomicUsize values - // if counter1.load(Ordering::Relaxed) != counter2.load(Ordering::Relaxed) { - // return false; - // } - - // Convert Vec to HashSet to compare unordered values - let set1: HashSet<_> = vec1.iter().collect(); - let set2: HashSet<_> = vec2.iter().collect(); - if set1 != set2 { - return false; - } - } - } - - true -} diff --git a/src/utils/discovery.rs b/src/utils/discovery.rs index 4235e8a..0646bd2 100644 --- a/src/utils/discovery.rs +++ b/src/utils/discovery.rs @@ -1,16 +1,16 @@ -use dashmap::DashMap; -use futures::channel::mpsc::Sender; -use futures::SinkExt; -use std::fs; -use std::sync::atomic::AtomicUsize; -use std::time::{Duration, Instant}; - +use crate::utils::parceyaml::load_yaml_to_dashmap; use crate::utils::tools::*; use crate::web::webserver; use async_trait::async_trait; +use dashmap::DashMap; +use futures::channel::mpsc::Sender; +use futures::SinkExt; use notify::event::ModifyKind; use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; +use std::fs; use std::path::Path; +use std::sync::atomic::AtomicUsize; +use std::time::{Duration, Instant}; use tokio::task; pub struct FromFileProvider { @@ -20,34 +20,32 @@ pub struct APIUpstreamProvider; #[async_trait] pub trait Discovery { - async fn run(&self, tx: Sender); + async fn start(&self, tx: Sender); } #[async_trait] impl Discovery for APIUpstreamProvider { - async fn run(&self, toreturn: Sender) { + async fn start(&self, toreturn: Sender) { webserver::run_server(toreturn).await; } } #[async_trait] impl Discovery for FromFileProvider { - async fn run(&self, tx: Sender) { + async fn start(&self, tx: Sender) { tokio::spawn(watch_file(self.path.clone(), tx.clone())); } } -pub async fn watch_file(fp: String, mut toreturn: Sender) { +pub async fn watch_file(fp: String, mut toreturn: Sender) { let file_path = fp.as_str(); - let parent_dir = Path::new(file_path).parent().unwrap(); // Watch directory, not file + let parent_dir = Path::new(file_path).parent().unwrap(); let (local_tx, mut local_rx) = tokio::sync::mpsc::channel::>(1); - println!("Watching for changes in {:?}", parent_dir); let paths = fs::read_dir(parent_dir).unwrap(); for path in paths { println!(" {}", path.unwrap().path().display()) } - - let snd = build_upstreams(file_path, "filepath"); + let snd = load_yaml_to_dashmap(file_path, "filepath"); let _ = toreturn.send(snd).await.unwrap(); let _watcher_handle = task::spawn_blocking({ @@ -71,18 +69,11 @@ pub async fn watch_file(fp: String, mut toreturn: Sender) { match event { Ok(e) => match e.kind { EventKind::Modify(ModifyKind::Data(_)) | EventKind::Create(..) | EventKind::Remove(..) => { - if e.paths[0].to_str().unwrap().ends_with("conf") { - // if start.elapsed() > Duration::from_secs(10) { + if e.paths[0].to_str().unwrap().ends_with("yaml") { if start.elapsed() > Duration::from_secs(2) { start = Instant::now(); println!("Config File changed :=> {:?}", e); - - let upstreams = build_upstreams2("etc/upstreams-long.conf", "filepath"); - print_upstreams(&upstreams); - - println!("\n\n"); - - let snd = build_upstreams(file_path, "filepath"); + let snd = load_yaml_to_dashmap(file_path, "filepath"); let _ = toreturn.send(snd).await.unwrap(); } } @@ -93,59 +84,9 @@ pub async fn watch_file(fp: String, mut toreturn: Sender) { } } } -pub fn build_upstreams(d: &str, kind: &str) -> UpstreamMap { - let upstreams = DashMap::new(); - let mut contents = d.to_string(); - match kind { - "filepath" => { - println!("Reading upstreams from {}", d); - let _ = match fs::read_to_string(d) { - Ok(data) => contents = data, - Err(e) => { - eprintln!("Error reading file: {:?}", e); - return upstreams; - } - }; - } - "content" => { - println!("Reading upstreams from API post body"); - } - _ => println!("*******************> nothing <*******************"), - } - - for line in contents.lines().filter(|line| !line.trim().is_empty()) { - let mut parts = line.split_whitespace(); - - let Some(hostname) = parts.next() else { - continue; - }; - let Some(address) = parts.next() else { - continue; - }; - - let mut addr_parts = address.split(':'); - let Some(ip) = addr_parts.next() else { - continue; - }; - let Some(port_str) = addr_parts.next() else { - continue; - }; - - let Ok(port) = port_str.parse::() else { - continue; - }; - upstreams - .entry(hostname.to_string()) // Step 1: Find or create entry - .or_insert_with(|| (Vec::new(), AtomicUsize::new(0))) // Step 2: Insert if missing - .0 // Step 3: Access the Vec<(String, u16)> - .push((ip.to_string(), port)); // Step 4: Append new data - } - - upstreams -} - -pub fn build_upstreams2(d: &str, kind: &str) -> UpstresmDashMap { - let upstreams: UpstresmDashMap = DashMap::new(); +#[allow(dead_code)] +pub fn build_upstreams(d: &str, kind: &str) -> UpstreamsDashMap { + let upstreams: UpstreamsDashMap = DashMap::new(); let mut contents = d.to_string(); match kind { "filepath" => { @@ -203,7 +144,5 @@ pub fn build_upstreams2(d: &str, kind: &str) -> UpstresmDashMap { .0 .push((ip.to_string(), port, ssl, proto.to_string())); } - // println!("\n\nResult ===> {} <===\n\n", dam(&hopar, &upstreams)); - // println!("{:?}", hopar); upstreams } diff --git a/src/utils/healthcheck.rs b/src/utils/healthcheck.rs index 7ba62b6..3227f74 100644 --- a/src/utils/healthcheck.rs +++ b/src/utils/healthcheck.rs @@ -5,60 +5,52 @@ use std::sync::Arc; use std::time::Duration; use tokio::time::interval; -pub async fn hc(upslist: Arc, fullist: Arc) { +pub async fn hc2(upslist: Arc, fullist: Arc) { let mut period = interval(Duration::from_secs(2)); - loop { tokio::select! { _ = period.tick() => { - // let before = Instant::now(); - let totest: UpstreamMap = DashMap::new(); - let fclone: UpstreamMap = DashMap::new(); - // println!("\nElapsed dash: {:.2?}", before.elapsed()); - // let before = Instant::now(); - { - for v in fullist.iter() { - fclone.insert(v.key().clone(), (v.value().0.clone(), AtomicUsize::new(0))); - } - } // lock releases when scope ends - // println!("Elapsed full: {:.2?}", before.elapsed()); + let totest : UpstreamsDashMap = DashMap::new(); + let fclone : UpstreamsDashMap = clone_dashmap(&fullist); for val in fclone.iter() { - let mut newvec = vec![]; - for hostport in val.value().0.clone(){ - let hostpart = hostport.0.split('/').last().unwrap(); // For later use - let url = format!("http://{}:{}", hostpart, hostport.1); - let resp = http_request(url.as_str(), "GET", "").await; - match resp{ - true => { - newvec.push((hostpart.to_string(), hostport.1)); - }, - false => { - println!("Dead upstream. Host: {}, Upstream: {}:{} ",val.key(), hostpart.to_string(), hostport.1 ); + let host = val.key(); + let inner = DashMap::new(); + for path_entry in val.value().iter() { + // let inner = DashMap::new(); + let path = path_entry.key(); + let mut innervec= Vec::new(); + for k in path_entry.value().0.iter().enumerate() { + let (ip, port, ssl, _proto) = k.1; + let mut _pref = ""; + match ssl { + true => _pref = "https://", + false => _pref = "http://", + } + let link = format!("{}{}:{}{}", _pref, ip, port, path); + let resp = http_request(link.as_str(), "HEAD", "").await; + match resp { + true => { + innervec.push(k.1.clone()); + } + false => { + println!("Dead Upstream {}, Link: {}",k.0, link); + } } } + inner.insert(path.clone().to_owned(), (innervec, AtomicUsize::new(0))); } - totest.insert(val.key().clone(), (newvec, AtomicUsize::new(0))); + totest.insert(host.clone(), inner); } - // let before = Instant::now(); - { - if !crate::utils::compare::dm(&upslist, &totest) { - println!("Dashmaps not matched, synchronizing"); - upslist.clear(); - for (k, v) in totest { // loop takes the ownership - println!("Host: {}", k); - for vv in &v.0 { - println!(" :===> {:?}", vv); - } - upslist.insert(k, v); - } - } + if ! compare_dashmaps(&totest, &upslist){ + print_upstreams(&totest); + clone_dashmap_into(&totest, &upslist); } - // println!("Elapsed upsl: {:.2?}", before.elapsed()); } } } } +#[allow(dead_code)] async fn http_request(url: &str, method: &str, payload: &str) -> bool { let client = reqwest::Client::new(); let to = Duration::from_secs(1); @@ -83,6 +75,13 @@ async fn http_request(url: &str, method: &str, payload: &str) -> bool { } } } + "HEAD" => { + let response = client.head(url).timeout(to).send().await; + match response { + Ok(r) => 100 <= r.status().as_u16() && r.status().as_u16() < 500, + Err(_) => false, + } + } _ => false, } } diff --git a/src/utils/parceyaml.rs b/src/utils/parceyaml.rs new file mode 100644 index 0000000..f877af7 --- /dev/null +++ b/src/utils/parceyaml.rs @@ -0,0 +1,61 @@ +use crate::utils::tools::*; +use dashmap::DashMap; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::fs; +use std::sync::atomic::AtomicUsize; + +#[derive(Debug, Serialize, Deserialize)] +struct Config { + upstreams: HashMap, +} + +#[derive(Debug, Serialize, Deserialize)] +struct HostConfig { + paths: HashMap, +} + +#[derive(Debug, Serialize, Deserialize)] +struct PathConfig { + protocol: String, + ssl: bool, + servers: Vec, +} + +pub fn load_yaml_to_dashmap(d: &str, kind: &str) -> UpstreamsDashMap { + let dashmap = UpstreamsDashMap::new(); + let mut yaml_data = d.to_string(); + match kind { + "filepath" => { + println!("Reading upstreams from {}", d); + let _ = match fs::read_to_string(d) { + Ok(data) => yaml_data = data, + Err(e) => { + eprintln!("Error reading file: {:?}", e); + return dashmap; + } + }; + } + "content" => { + println!("Reading upstreams from API post body"); + } + _ => println!("*******************> nothing <*******************"), + } + let parsed: Config = serde_yaml::from_str(&yaml_data).expect("Failed to parse YAML"); + for (hostname, host_config) in parsed.upstreams { + let path_map = DashMap::new(); + for (path, path_config) in host_config.paths { + let mut server_list = Vec::new(); + for server in path_config.servers { + if let Some((ip, port_str)) = server.split_once(':') { + if let Ok(port) = port_str.parse::() { + server_list.push((ip.to_string(), port, path_config.ssl, path_config.protocol.clone())); + } + } + } + path_map.insert(path, (server_list, AtomicUsize::new(0))); + } + dashmap.insert(hostname, path_map); + } + dashmap +} diff --git a/src/utils/tools.rs b/src/utils/tools.rs index d498e34..0ba9d02 100644 --- a/src/utils/tools.rs +++ b/src/utils/tools.rs @@ -1,9 +1,10 @@ use dashmap::DashMap; use std::any::type_name; +use std::collections::HashSet; use std::sync::atomic::AtomicUsize; #[allow(dead_code)] -pub fn print_upstreams(upstreams: &UpstresmDashMap) { +pub fn print_upstreams(upstreams: &UpstreamsDashMap) { for host_entry in upstreams.iter() { let hostname = host_entry.key(); println!("Hostname: {}", hostname); @@ -19,8 +20,8 @@ pub fn print_upstreams(upstreams: &UpstresmDashMap) { } } -pub type UpstresmDashMap = DashMap, AtomicUsize)>>; -pub type UpstreamMap = DashMap, AtomicUsize)>; +pub type UpstreamsDashMap = DashMap, AtomicUsize)>>; +// pub type UpstreamMap = DashMap, AtomicUsize)>; #[allow(dead_code)] pub fn typeoff(_: T) { @@ -28,6 +29,7 @@ pub fn typeoff(_: T) { println!("{:?}", to); } +#[allow(dead_code)] pub fn string_to_bool(val: Option<&str>) -> Option { match val { Some(v) => match v { @@ -38,3 +40,78 @@ pub fn string_to_bool(val: Option<&str>) -> Option { None => Some(false), } } + +#[allow(dead_code)] +pub fn clone_dashmap(original: &UpstreamsDashMap) -> UpstreamsDashMap { + let new_map: UpstreamsDashMap = DashMap::new(); + + for outer_entry in original.iter() { + let hostname = outer_entry.key(); + let inner_map = outer_entry.value(); + + let new_inner_map = DashMap::new(); + + for inner_entry in inner_map.iter() { + let path = inner_entry.key(); + let (vec, _) = inner_entry.value(); + let new_vec = vec.clone(); + let new_counter = AtomicUsize::new(0); + new_inner_map.insert(path.clone(), (new_vec, new_counter)); + } + new_map.insert(hostname.clone(), new_inner_map); + } + new_map +} + +pub fn clone_dashmap_into(original: &UpstreamsDashMap, cloned: &UpstreamsDashMap) { + cloned.clear(); + for outer_entry in original.iter() { + let hostname = outer_entry.key(); + let inner_map = outer_entry.value(); + + let new_inner_map = DashMap::new(); + + for inner_entry in inner_map.iter() { + let path = inner_entry.key(); + let (vec, _) = inner_entry.value(); + let new_vec = vec.clone(); + let new_counter = AtomicUsize::new(0); + new_inner_map.insert(path.clone(), (new_vec, new_counter)); + } + cloned.insert(hostname.clone(), new_inner_map); + } +} + +pub fn compare_dashmaps(map1: &UpstreamsDashMap, map2: &UpstreamsDashMap) -> bool { + let keys1: HashSet<_> = map1.iter().map(|entry| entry.key().clone()).collect(); + let keys2: HashSet<_> = map2.iter().map(|entry| entry.key().clone()).collect(); + if keys1 != keys2 { + return false; + } + for entry1 in map1.iter() { + let hostname = entry1.key(); + let inner_map1 = entry1.value(); + let Some(inner_map2) = map2.get(hostname) else { + return false; + }; + let inner_keys1: HashSet<_> = inner_map1.iter().map(|e| e.key().clone()).collect(); + let inner_keys2: HashSet<_> = inner_map2.iter().map(|e| e.key().clone()).collect(); + if inner_keys1 != inner_keys2 { + return false; + } + for path_entry in inner_map1.iter() { + let path = path_entry.key(); + let (vec1, _counter1) = path_entry.value(); + let Some(entry2) = inner_map2.get(path) else { + return false; // Path exists in map1 but not in map2 + }; + let (vec2, _counter2) = entry2.value(); + let set1: HashSet<_> = vec1.iter().collect(); + let set2: HashSet<_> = vec2.iter().collect(); + if set1 != set2 { + return false; + } + } + } + true +} diff --git a/src/web/proxyhttp.rs b/src/web/proxyhttp.rs index a03ed2f..70ba378 100644 --- a/src/web/proxyhttp.rs +++ b/src/web/proxyhttp.rs @@ -11,59 +11,46 @@ use pingora_core::server::ShutdownWatch; use pingora_core::services::background::BackgroundService; use pingora_http::{RequestHeader, ResponseHeader}; use pingora_proxy::{ProxyHttp, Session}; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::atomic::Ordering; use std::sync::Arc; -// use tokio::time::Instant; pub struct LB { - pub upstreams: Arc, - pub umap_full: Arc, + pub ump_upst: Arc, + pub ump_full: Arc, } #[async_trait] impl BackgroundService for LB { async fn start(&self, mut shutdown: ShutdownWatch) { println!("Starting example background service"); - - let (tx, mut rx) = mpsc::channel::(0); - let file_load = FromFileProvider { - path: "etc/upstreams.conf".to_string(), + let (tux, mut rux) = mpsc::channel::(0); + let file_load2 = FromFileProvider { + path: "etc/upstreams.yaml".to_string(), }; let api_load = APIUpstreamProvider; - let tx_file = tx.clone(); - let tx_api = tx.clone(); - let _ = tokio::spawn(async move { api_load.run(tx_api).await }); - let _ = tokio::spawn(async move { file_load.run(tx_file).await }); - let up = self.upstreams.clone(); - let fu = self.umap_full.clone(); - let _ = tokio::spawn(async move { healthcheck::hc(up, fu).await }); + let tux_file = tux.clone(); + let tux_api = tux.clone(); + let _ = tokio::spawn(async move { file_load2.start(tux_file).await }); + let _ = tokio::spawn(async move { api_load.start(tux_api).await }); + let uu = self.ump_upst.clone(); + let ff = self.ump_full.clone(); + let _ = tokio::spawn(async move { healthcheck::hc2(uu, ff).await }); loop { tokio::select! { _ = shutdown.changed() => { break; } - val = rx.next() => { + val = rux.next() => { match val { - Some(newmap) => { - match compare::dm(&self.umap_full, &newmap) { - false => { - self.upstreams.clear(); - self.umap_full.clear(); - for (k,v) in newmap { - println!("Host: {}", k); - // {:?}", vv); - } - self.upstreams.insert(k.clone(), (v.0.clone(), AtomicUsize::new(0))); // No need for extra vec! - self.umap_full.insert(k, (v.0, AtomicUsize::new(0))); // Use `value.0` directly - } - } - true => { - } + Some(ss) => { + let foo = compare_dashmaps(&*self.ump_full, &ss); + if !foo { + clone_dashmap_into(&ss, &self.ump_full); + clone_dashmap_into(&ss, &self.ump_upst); + print_upstreams(&self.ump_full); } } None => {} @@ -76,12 +63,19 @@ impl BackgroundService for LB { #[async_trait] pub trait GetHost { - async fn get_host(&self, peer: &str) -> Option<(String, u16)>; + async fn get_host(&self, peer: &str, path: &str, upgrade: bool) -> Option<(String, u16, bool, String)>; } #[async_trait] impl GetHost for LB { - async fn get_host(&self, peer: &str) -> Option<(String, u16)> { - let x = if let Some(entry) = self.upstreams.get(peer) { + async fn get_host(&self, peer: &str, path: &str, upgrade: bool) -> Option<(String, u16, bool, String)> { + let mut _proto = ""; + if upgrade { + _proto = "wsoc"; + } else { + _proto = "http" + } + let host_entry = self.ump_upst.get(peer).unwrap(); + let x = if let Some(entry) = host_entry.get(path) { let (servers, index) = entry.value(); if servers.is_empty() { return None; @@ -105,16 +99,17 @@ impl ProxyHttp for LB { let host_name = session.req_header().headers.get("host"); match host_name { Some(host) => { - let h = host.to_str().unwrap().split(':').collect::>(); - let ddr = self.get_host(h[0]); + let header_host = host.to_str().unwrap().split(':').collect::>(); + + let ddr = self.get_host(header_host[0], session.req_header().uri.path(), session.is_upgrade_req()); match ddr.await { - Some((host, port)) => { - let peer = Box::new(HttpPeer::new((host, port), false, String::new())); + Some((host, port, ssl, _proto)) => { + let peer = Box::new(HttpPeer::new((host, port), ssl, String::new())); // info!("{:?}, Time => {:.2?}", session.request_summary(), before.elapsed()); Ok(peer) } None => { - warn!("Returning default list => {:?}", ("127.0.0.1", 3000)); + warn!("Returning default list => {:?}, {:?}", host_name, session.req_header().uri); let peer = Box::new(HttpPeer::new(("127.0.0.1", 3000), false, String::new())); // info!("{:?}, Time => {:.2?}", session.request_summary(), before.elapsed()); Ok(peer) @@ -122,26 +117,12 @@ impl ProxyHttp for LB { } } None => { - warn!("Returning default list => {:?}", ("127.0.0.1", 3000)); + warn!("Returning default list => {:?}, {:?}", host_name, session.req_header().uri); let peer = Box::new(HttpPeer::new(("127.0.0.1", 3000), false, String::new())); // info!("{:?}, Time => {:.2?}", session.request_summary(), before.elapsed()); Ok(peer) } } - /* - let ddr = self.get_host(host_name.unwrap().to_str().unwrap()); - match ddr.await { - Some((host, port)) => { - let peer = Box::new(HttpPeer::new((host, port), false, String::new())); - Ok(peer) - } - None => { - println!("Returning default list => {:?}", ("127.0.0.1", 3000)); - let peer = Box::new(HttpPeer::new(("127.0.0.1", 3000), false, String::new())); - Ok(peer) - } - } - */ } async fn request_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> pingora_core::Result where diff --git a/src/web/start.rs b/src/web/start.rs index aa45df0..c0fa092 100644 --- a/src/web/start.rs +++ b/src/web/start.rs @@ -1,37 +1,51 @@ use crate::utils::tools::*; use crate::web::proxyhttp::LB; +use clap::{arg, Parser}; use dashmap::DashMap; +use log::info; use pingora_core::prelude::background_service; use pingora_core::server::Server; use std::sync::Arc; +#[derive(Parser, Debug)] +#[command(version, about, long_about = None)] +struct Args { + #[arg(short, long)] + address: String, + #[arg(short, long)] + port: String, +} + pub fn run() { env_logger::init(); let mut server = Server::new(None).unwrap(); server.bootstrap(); - let upstreams_map: UpstreamMap = DashMap::new(); - let config = Arc::new(upstreams_map); - - let umap_full: UpstreamMap = DashMap::new(); - let fconfig = Arc::new(umap_full); + let uf: UpstreamsDashMap = DashMap::new(); + let ff: UpstreamsDashMap = DashMap::new(); + let uf_config = Arc::new(uf); + let ff_config = Arc::new(ff); let lb = LB { - upstreams: config.clone(), - umap_full: fconfig.clone(), + ump_upst: uf_config.clone(), + ump_full: ff_config.clone(), }; let bg = LB { - upstreams: config.clone(), - umap_full: fconfig.clone(), + ump_upst: uf_config.clone(), + ump_full: ff_config.clone(), }; let bg_srvc = background_service("bgsrvc", bg); let mut proxy = pingora_proxy::http_proxy_service(&server.configuration, lb); - proxy.add_tcp("0.0.0.0:6193"); + let args = Args::parse(); + let addr = format!("{}:{}", args.address, args.port); + proxy.add_tcp(&addr); server.add_service(proxy); server.add_service(bg_srvc); + info!("Starting Gazan server on {}, port : {} !", args.address, args.port); + server.run_forever(); } diff --git a/src/web/webserver.rs b/src/web/webserver.rs index b65fc8b..7636872 100644 --- a/src/web/webserver.rs +++ b/src/web/webserver.rs @@ -3,22 +3,17 @@ use axum::body::Body; use axum::http::{Response, StatusCode}; use axum::response::IntoResponse; use axum::routing::{delete, get, head, post, put}; -use axum::{Json, Router}; -use dashmap::DashMap; +use axum::Router; use futures::channel::mpsc::Sender; use futures::SinkExt; -use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use std::sync::atomic::AtomicUsize; use tokio::net::TcpListener; -#[derive(Debug, Serialize, Deserialize)] -struct UpstreamData { - servers: Vec<(String, u16)>, - counter: usize, -} +// struct UpstreamData { +// servers: UpstreamsDashMap, +// } -pub async fn run_server(mut toreturn: Sender) { +#[allow(unused_mut)] +pub async fn run_server(mut toreturn: Sender) { let mut tr = toreturn.clone(); let app = Router::new() .route("/{*wildcard}", get(getconfig)) @@ -29,28 +24,18 @@ pub async fn run_server(mut toreturn: Sender) { .route( "/conf", post(|up: String| async move { - let serverlist = crate::utils::discovery::build_upstreams(up.as_str(), "content"); + let serverlist = crate::utils::parceyaml::load_yaml_to_dashmap(up.as_str(), "content"); let _ = tr.send(serverlist).await.unwrap(); Response::builder().status(StatusCode::CREATED).body(Body::from("Config, conf file, updated!\n")).unwrap() }) .with_state("state"), - ) - .route( - "/json", - post(|Json(payload): Json>| async move { - let upstreams = DashMap::new(); - for (key, value) in payload { - upstreams.insert(key, (value.servers, AtomicUsize::new(value.counter))); - } - let _ = toreturn.send(upstreams).await.unwrap(); - Response::builder().status(StatusCode::CREATED).body(Body::from("Config, json, updated!\n")).unwrap() - }), ); let listener = TcpListener::bind("0.0.0.0:3000").await.unwrap(); println!("Axum API server running on port 3000"); axum::serve(listener, app).await.unwrap(); } +#[allow(dead_code)] async fn getconfig() -> impl IntoResponse { "Hello from Axum API inside Pingora!\n".to_string(); Response::builder().status(StatusCode::BAD_GATEWAY).body(Body::from("No live upstream found!\n")).unwrap() diff --git a/start.sh b/start.sh index 15b3939..5cfd3f4 100755 --- a/start.sh +++ b/start.sh @@ -1,4 +1,4 @@ #!/bin/bash export RUST_LOG=INFO -reflex -d none -r 'src/' -s -- sh -c 'reset && cargo run ' +reflex -d none -r 'src/' -s -- sh -c 'reset && cargo run -- --address 0.0.0.0 --port 6193'