New implementations, big commit:

1. Nested upstreams with params
2. SSL upstream support
3. Upstreams move to yaml format
4. Command line start arguments
This commit is contained in:
Ara Sadoyan
2025-03-16 14:06:29 +01:00
parent 3901b246b3
commit 6cc72c8b48
13 changed files with 405 additions and 288 deletions

89
Cargo.lock generated
View File

@@ -186,6 +186,7 @@ dependencies = [
"async-trait", "async-trait",
"axum", "axum",
"bytes", "bytes",
"clap 4.5.32",
"dashmap", "dashmap",
"env_logger", "env_logger",
"futures", "futures",
@@ -201,6 +202,7 @@ dependencies = [
"reqwest", "reqwest",
"serde", "serde",
"serde_json", "serde_json",
"serde_yaml",
"tokio", "tokio",
] ]
@@ -432,8 +434,8 @@ checksum = "4ea181bf566f71cb9a5d17a59e1871af638180a18fb0035c92ae62b705207123"
dependencies = [ dependencies = [
"atty", "atty",
"bitflags 1.3.2", "bitflags 1.3.2",
"clap_derive", "clap_derive 3.2.25",
"clap_lex", "clap_lex 0.2.4",
"indexmap 1.9.3", "indexmap 1.9.3",
"once_cell", "once_cell",
"strsim 0.10.0", "strsim 0.10.0",
@@ -441,6 +443,28 @@ dependencies = [
"textwrap 0.16.1", "textwrap 0.16.1",
] ]
[[package]]
name = "clap"
version = "4.5.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6088f3ae8c3608d19260cd7445411865a485688711b78b5be70d78cd96136f83"
dependencies = [
"clap_builder",
"clap_derive 4.5.32",
]
[[package]]
name = "clap_builder"
version = "4.5.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22a7ef7f676155edfb82daa97f99441f3ebf4a58d5e32f295a56259f1b6facc8"
dependencies = [
"anstream",
"anstyle",
"clap_lex 0.7.4",
"strsim 0.11.1",
]
[[package]] [[package]]
name = "clap_derive" name = "clap_derive"
version = "3.2.25" version = "3.2.25"
@@ -454,6 +478,18 @@ dependencies = [
"syn 1.0.109", "syn 1.0.109",
] ]
[[package]]
name = "clap_derive"
version = "4.5.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09176aae279615badda0765c0c0b3f6ed53f4709118af73cf4655d85d1530cd7"
dependencies = [
"heck 0.5.0",
"proc-macro2",
"quote",
"syn 2.0.96",
]
[[package]] [[package]]
name = "clap_lex" name = "clap_lex"
version = "0.2.4" version = "0.2.4"
@@ -463,6 +499,12 @@ dependencies = [
"os_str_bytes", "os_str_bytes",
] ]
[[package]]
name = "clap_lex"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f46ad14479a25103f283c0f10005961cf086d8dc42205bb44c46ac563475dca6"
[[package]] [[package]]
name = "cmake" name = "cmake"
version = "0.1.52" version = "0.1.52"
@@ -1588,6 +1630,15 @@ version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e"
[[package]]
name = "openssl-src"
version = "300.4.2+3.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "168ce4e058f975fe43e89d9ccf78ca668601887ae736090aacc23ae353c298e2"
dependencies = [
"cc",
]
[[package]] [[package]]
name = "openssl-sys" name = "openssl-sys"
version = "0.9.106" version = "0.9.106"
@@ -1596,6 +1647,7 @@ checksum = "8bb61ea9811cc39e3c2069f40b8b8e2e70d8569b361f879786cc7ed48b777cdd"
dependencies = [ dependencies = [
"cc", "cc",
"libc", "libc",
"openssl-src",
"pkg-config", "pkg-config",
"vcpkg", "vcpkg",
] ]
@@ -1679,6 +1731,7 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79c9fc7098dc3e7d09d2d1647921005be9301cf68536826195dc5369e05124bd" checksum = "79c9fc7098dc3e7d09d2d1647921005be9301cf68536826195dc5369e05124bd"
dependencies = [ dependencies = [
"pingora-cache",
"pingora-core", "pingora-core",
"pingora-http", "pingora-http",
"pingora-load-balancing", "pingora-load-balancing",
@@ -1750,6 +1803,7 @@ dependencies = [
"percent-encoding", "percent-encoding",
"pingora-error", "pingora-error",
"pingora-http", "pingora-http",
"pingora-openssl",
"pingora-pool", "pingora-pool",
"pingora-runtime", "pingora-runtime",
"pingora-timeout", "pingora-timeout",
@@ -1846,6 +1900,20 @@ dependencies = [
"rand", "rand",
] ]
[[package]]
name = "pingora-openssl"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4f18158b901a02289f2a2a954a531c96e4d0703c94f7c9291981c9e53fddc6c1"
dependencies = [
"foreign-types",
"libc",
"openssl",
"openssl-src",
"openssl-sys",
"tokio-openssl",
]
[[package]] [[package]]
name = "pingora-pool" name = "pingora-pool"
version = "0.4.0" version = "0.4.0"
@@ -2448,6 +2516,12 @@ version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
[[package]]
name = "strsim"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f"
[[package]] [[package]]
name = "structopt" name = "structopt"
version = "0.3.26" version = "0.3.26"
@@ -2690,6 +2764,17 @@ dependencies = [
"tokio", "tokio",
] ]
[[package]]
name = "tokio-openssl"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59df6849caa43bb7567f9a36f863c447d95a11d5903c9cc334ba32576a27eadd"
dependencies = [
"openssl",
"openssl-sys",
"tokio",
]
[[package]] [[package]]
name = "tokio-rustls" name = "tokio-rustls"
version = "0.26.1" version = "0.26.1"

View File

@@ -10,7 +10,7 @@ rand = "0.8.5"
dashmap = "6.1.0" dashmap = "6.1.0"
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }
#pingora = { version = "0.4", features = ["full"] } #pingora = { version = "0.4", features = ["full"] }
pingora = { version = "0.4.0", features = ["lb"] } pingora = { version = "0.4.0", features = ["lb", "openssl"] }
pingora-core = { version = "0.4.0" } pingora-core = { version = "0.4.0" }
pingora-proxy = { version = "0.4.0" } pingora-proxy = { version = "0.4.0" }
pingora-load-balancing = { version = "0.4.0" } pingora-load-balancing = { version = "0.4.0" }
@@ -24,3 +24,5 @@ axum = "0.8.1"
serde = { version = "1.0.217", features = ["derive"] } serde = { version = "1.0.217", features = ["derive"] }
serde_json = "1.0.137" serde_json = "1.0.137"
reqwest = { version = "0.12" } reqwest = { version = "0.12" }
serde_yaml = "0.8.26"
clap = { version = "4.5.32", features = ["derive"] }

47
etc/upstreams.yaml Normal file
View File

@@ -0,0 +1,47 @@
upstreams:
myip.netangels.net:
paths:
"/draw":
protocol: "wsoc"
ssl: false
servers:
- "192.168.1.1:8000"
"/":
protocol: "http"
ssl: false
servers:
- "127.0.0.1:8000"
- "127.0.0.3:8000"
"/ping":
protocol: "http"
ssl: false
servers:
- "127.0.0.1:8000"
- "127.0.0.4:8000"
polo.netangels.net:
paths:
"/":
protocol: "http"
ssl: false
servers:
- "192.168.1.10:8000"
"/ws":
protocol: "wsoc"
ssl: false
servers:
- "192.168.1.1:8000"
glop.netangels.net:
paths:
"/":
protocol: "http"
ssl: false
servers:
- "192.168.1.10:8000"
- "192.168.1.10:8000"
# apt.netangels.net:
# paths:
# "/":
# protocol: "http"
# ssl: true
# servers:
# - "apt.netangels.net:443"

View File

@@ -1,4 +1,4 @@
pub mod compare;
pub mod discovery; pub mod discovery;
pub mod healthcheck; pub mod healthcheck;
pub mod parceyaml;
pub mod tools; pub mod tools;

View File

@@ -1,73 +0,0 @@
use crate::utils::tools::*;
use std::collections::HashSet;
// #[allow(dead_code)]
pub fn dm(map1: &UpstreamMap, map2: &UpstreamMap) -> bool {
if map1.len() != map2.len() {
return false; // Different number of keys
}
for entry1 in map1.iter() {
let key = entry1.key();
let (vec1, _) = entry1.value(); // Extract value
if let Some(entry2) = map2.get(key) {
let (vec2, _) = entry2.value(); // Correctly extract value
if vec1 != vec2 {
return false;
}
} else {
return false;
}
}
true
}
#[allow(dead_code)]
pub fn dam(map1: &UpstresmDashMap, map2: &UpstresmDashMap) -> bool {
// Step 1: Check if both maps have the same keys
let keys1: HashSet<_> = map1.iter().map(|entry| entry.key().clone()).collect();
let keys2: HashSet<_> = map2.iter().map(|entry| entry.key().clone()).collect();
if keys1 != keys2 {
return false;
}
// Step 2: Check if the inner maps have the same keys
for entry1 in map1.iter() {
let hostname = entry1.key();
let inner_map1 = entry1.value();
let Some(inner_map2) = map2.get(hostname) else {
return false; // Key exists in map1 but not in map2
};
let inner_keys1: HashSet<_> = inner_map1.iter().map(|e| e.key().clone()).collect();
let inner_keys2: HashSet<_> = inner_map2.iter().map(|e| e.key().clone()).collect();
if inner_keys1 != inner_keys2 {
return false;
}
// Step 3: Compare values (ignore order)
for path_entry in inner_map1.iter() {
let path = path_entry.key();
let (vec1, _counter1) = path_entry.value();
let Some(entry2) = inner_map2.get(path) else {
return false; // Path exists in map1 but not in map2
};
let (vec2, _counter2) = entry2.value(); // ✅ Correctly extract values
// Compare AtomicUsize values
// if counter1.load(Ordering::Relaxed) != counter2.load(Ordering::Relaxed) {
// return false;
// }
// Convert Vec to HashSet to compare unordered values
let set1: HashSet<_> = vec1.iter().collect();
let set2: HashSet<_> = vec2.iter().collect();
if set1 != set2 {
return false;
}
}
}
true
}

View File

@@ -1,16 +1,16 @@
use dashmap::DashMap; use crate::utils::parceyaml::load_yaml_to_dashmap;
use futures::channel::mpsc::Sender;
use futures::SinkExt;
use std::fs;
use std::sync::atomic::AtomicUsize;
use std::time::{Duration, Instant};
use crate::utils::tools::*; use crate::utils::tools::*;
use crate::web::webserver; use crate::web::webserver;
use async_trait::async_trait; use async_trait::async_trait;
use dashmap::DashMap;
use futures::channel::mpsc::Sender;
use futures::SinkExt;
use notify::event::ModifyKind; use notify::event::ModifyKind;
use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use std::fs;
use std::path::Path; use std::path::Path;
use std::sync::atomic::AtomicUsize;
use std::time::{Duration, Instant};
use tokio::task; use tokio::task;
pub struct FromFileProvider { pub struct FromFileProvider {
@@ -20,34 +20,32 @@ pub struct APIUpstreamProvider;
#[async_trait] #[async_trait]
pub trait Discovery { pub trait Discovery {
async fn run(&self, tx: Sender<UpstreamMap>); async fn start(&self, tx: Sender<UpstreamsDashMap>);
} }
#[async_trait] #[async_trait]
impl Discovery for APIUpstreamProvider { impl Discovery for APIUpstreamProvider {
async fn run(&self, toreturn: Sender<UpstreamMap>) { async fn start(&self, toreturn: Sender<UpstreamsDashMap>) {
webserver::run_server(toreturn).await; webserver::run_server(toreturn).await;
} }
} }
#[async_trait] #[async_trait]
impl Discovery for FromFileProvider { impl Discovery for FromFileProvider {
async fn run(&self, tx: Sender<UpstreamMap>) { async fn start(&self, tx: Sender<UpstreamsDashMap>) {
tokio::spawn(watch_file(self.path.clone(), tx.clone())); tokio::spawn(watch_file(self.path.clone(), tx.clone()));
} }
} }
pub async fn watch_file(fp: String, mut toreturn: Sender<UpstreamMap>) { pub async fn watch_file(fp: String, mut toreturn: Sender<UpstreamsDashMap>) {
let file_path = fp.as_str(); let file_path = fp.as_str();
let parent_dir = Path::new(file_path).parent().unwrap(); // Watch directory, not file let parent_dir = Path::new(file_path).parent().unwrap();
let (local_tx, mut local_rx) = tokio::sync::mpsc::channel::<notify::Result<Event>>(1); let (local_tx, mut local_rx) = tokio::sync::mpsc::channel::<notify::Result<Event>>(1);
println!("Watching for changes in {:?}", parent_dir); println!("Watching for changes in {:?}", parent_dir);
let paths = fs::read_dir(parent_dir).unwrap(); let paths = fs::read_dir(parent_dir).unwrap();
for path in paths { for path in paths {
println!(" {}", path.unwrap().path().display()) println!(" {}", path.unwrap().path().display())
} }
let snd = load_yaml_to_dashmap(file_path, "filepath");
let snd = build_upstreams(file_path, "filepath");
let _ = toreturn.send(snd).await.unwrap(); let _ = toreturn.send(snd).await.unwrap();
let _watcher_handle = task::spawn_blocking({ let _watcher_handle = task::spawn_blocking({
@@ -71,18 +69,11 @@ pub async fn watch_file(fp: String, mut toreturn: Sender<UpstreamMap>) {
match event { match event {
Ok(e) => match e.kind { Ok(e) => match e.kind {
EventKind::Modify(ModifyKind::Data(_)) | EventKind::Create(..) | EventKind::Remove(..) => { EventKind::Modify(ModifyKind::Data(_)) | EventKind::Create(..) | EventKind::Remove(..) => {
if e.paths[0].to_str().unwrap().ends_with("conf") { if e.paths[0].to_str().unwrap().ends_with("yaml") {
// if start.elapsed() > Duration::from_secs(10) {
if start.elapsed() > Duration::from_secs(2) { if start.elapsed() > Duration::from_secs(2) {
start = Instant::now(); start = Instant::now();
println!("Config File changed :=> {:?}", e); println!("Config File changed :=> {:?}", e);
let snd = load_yaml_to_dashmap(file_path, "filepath");
let upstreams = build_upstreams2("etc/upstreams-long.conf", "filepath");
print_upstreams(&upstreams);
println!("\n\n");
let snd = build_upstreams(file_path, "filepath");
let _ = toreturn.send(snd).await.unwrap(); let _ = toreturn.send(snd).await.unwrap();
} }
} }
@@ -93,59 +84,9 @@ pub async fn watch_file(fp: String, mut toreturn: Sender<UpstreamMap>) {
} }
} }
} }
pub fn build_upstreams(d: &str, kind: &str) -> UpstreamMap { #[allow(dead_code)]
let upstreams = DashMap::new(); pub fn build_upstreams(d: &str, kind: &str) -> UpstreamsDashMap {
let mut contents = d.to_string(); let upstreams: UpstreamsDashMap = DashMap::new();
match kind {
"filepath" => {
println!("Reading upstreams from {}", d);
let _ = match fs::read_to_string(d) {
Ok(data) => contents = data,
Err(e) => {
eprintln!("Error reading file: {:?}", e);
return upstreams;
}
};
}
"content" => {
println!("Reading upstreams from API post body");
}
_ => println!("*******************> nothing <*******************"),
}
for line in contents.lines().filter(|line| !line.trim().is_empty()) {
let mut parts = line.split_whitespace();
let Some(hostname) = parts.next() else {
continue;
};
let Some(address) = parts.next() else {
continue;
};
let mut addr_parts = address.split(':');
let Some(ip) = addr_parts.next() else {
continue;
};
let Some(port_str) = addr_parts.next() else {
continue;
};
let Ok(port) = port_str.parse::<u16>() else {
continue;
};
upstreams
.entry(hostname.to_string()) // Step 1: Find or create entry
.or_insert_with(|| (Vec::new(), AtomicUsize::new(0))) // Step 2: Insert if missing
.0 // Step 3: Access the Vec<(String, u16)>
.push((ip.to_string(), port)); // Step 4: Append new data
}
upstreams
}
pub fn build_upstreams2(d: &str, kind: &str) -> UpstresmDashMap {
let upstreams: UpstresmDashMap = DashMap::new();
let mut contents = d.to_string(); let mut contents = d.to_string();
match kind { match kind {
"filepath" => { "filepath" => {
@@ -203,7 +144,5 @@ pub fn build_upstreams2(d: &str, kind: &str) -> UpstresmDashMap {
.0 .0
.push((ip.to_string(), port, ssl, proto.to_string())); .push((ip.to_string(), port, ssl, proto.to_string()));
} }
// println!("\n\nResult ===> {} <===\n\n", dam(&hopar, &upstreams));
// println!("{:?}", hopar);
upstreams upstreams
} }

View File

@@ -5,60 +5,52 @@ use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::time::interval; use tokio::time::interval;
pub async fn hc(upslist: Arc<UpstreamMap>, fullist: Arc<UpstreamMap>) { pub async fn hc2(upslist: Arc<UpstreamsDashMap>, fullist: Arc<UpstreamsDashMap>) {
let mut period = interval(Duration::from_secs(2)); let mut period = interval(Duration::from_secs(2));
loop { loop {
tokio::select! { tokio::select! {
_ = period.tick() => { _ = period.tick() => {
// let before = Instant::now(); let totest : UpstreamsDashMap = DashMap::new();
let totest: UpstreamMap = DashMap::new(); let fclone : UpstreamsDashMap = clone_dashmap(&fullist);
let fclone: UpstreamMap = DashMap::new();
// println!("\nElapsed dash: {:.2?}", before.elapsed());
// let before = Instant::now();
{
for v in fullist.iter() {
fclone.insert(v.key().clone(), (v.value().0.clone(), AtomicUsize::new(0)));
}
} // lock releases when scope ends
// println!("Elapsed full: {:.2?}", before.elapsed());
for val in fclone.iter() { for val in fclone.iter() {
let mut newvec = vec![]; let host = val.key();
for hostport in val.value().0.clone(){ let inner = DashMap::new();
let hostpart = hostport.0.split('/').last().unwrap(); // For later use for path_entry in val.value().iter() {
let url = format!("http://{}:{}", hostpart, hostport.1); // let inner = DashMap::new();
let resp = http_request(url.as_str(), "GET", "").await; let path = path_entry.key();
match resp{ let mut innervec= Vec::new();
true => { for k in path_entry.value().0.iter().enumerate() {
newvec.push((hostpart.to_string(), hostport.1)); let (ip, port, ssl, _proto) = k.1;
}, let mut _pref = "";
false => { match ssl {
println!("Dead upstream. Host: {}, Upstream: {}:{} ",val.key(), hostpart.to_string(), hostport.1 ); true => _pref = "https://",
false => _pref = "http://",
}
let link = format!("{}{}:{}{}", _pref, ip, port, path);
let resp = http_request(link.as_str(), "HEAD", "").await;
match resp {
true => {
innervec.push(k.1.clone());
}
false => {
println!("Dead Upstream {}, Link: {}",k.0, link);
}
} }
} }
inner.insert(path.clone().to_owned(), (innervec, AtomicUsize::new(0)));
} }
totest.insert(val.key().clone(), (newvec, AtomicUsize::new(0))); totest.insert(host.clone(), inner);
} }
// let before = Instant::now(); if ! compare_dashmaps(&totest, &upslist){
{ print_upstreams(&totest);
if !crate::utils::compare::dm(&upslist, &totest) { clone_dashmap_into(&totest, &upslist);
println!("Dashmaps not matched, synchronizing");
upslist.clear();
for (k, v) in totest { // loop takes the ownership
println!("Host: {}", k);
for vv in &v.0 {
println!(" :===> {:?}", vv);
}
upslist.insert(k, v);
}
}
} }
// println!("Elapsed upsl: {:.2?}", before.elapsed());
} }
} }
} }
} }
#[allow(dead_code)]
async fn http_request(url: &str, method: &str, payload: &str) -> bool { async fn http_request(url: &str, method: &str, payload: &str) -> bool {
let client = reqwest::Client::new(); let client = reqwest::Client::new();
let to = Duration::from_secs(1); let to = Duration::from_secs(1);
@@ -83,6 +75,13 @@ async fn http_request(url: &str, method: &str, payload: &str) -> bool {
} }
} }
} }
"HEAD" => {
let response = client.head(url).timeout(to).send().await;
match response {
Ok(r) => 100 <= r.status().as_u16() && r.status().as_u16() < 500,
Err(_) => false,
}
}
_ => false, _ => false,
} }
} }

61
src/utils/parceyaml.rs Normal file
View File

@@ -0,0 +1,61 @@
use crate::utils::tools::*;
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fs;
use std::sync::atomic::AtomicUsize;
#[derive(Debug, Serialize, Deserialize)]
struct Config {
upstreams: HashMap<String, HostConfig>,
}
#[derive(Debug, Serialize, Deserialize)]
struct HostConfig {
paths: HashMap<String, PathConfig>,
}
#[derive(Debug, Serialize, Deserialize)]
struct PathConfig {
protocol: String,
ssl: bool,
servers: Vec<String>,
}
pub fn load_yaml_to_dashmap(d: &str, kind: &str) -> UpstreamsDashMap {
let dashmap = UpstreamsDashMap::new();
let mut yaml_data = d.to_string();
match kind {
"filepath" => {
println!("Reading upstreams from {}", d);
let _ = match fs::read_to_string(d) {
Ok(data) => yaml_data = data,
Err(e) => {
eprintln!("Error reading file: {:?}", e);
return dashmap;
}
};
}
"content" => {
println!("Reading upstreams from API post body");
}
_ => println!("*******************> nothing <*******************"),
}
let parsed: Config = serde_yaml::from_str(&yaml_data).expect("Failed to parse YAML");
for (hostname, host_config) in parsed.upstreams {
let path_map = DashMap::new();
for (path, path_config) in host_config.paths {
let mut server_list = Vec::new();
for server in path_config.servers {
if let Some((ip, port_str)) = server.split_once(':') {
if let Ok(port) = port_str.parse::<u16>() {
server_list.push((ip.to_string(), port, path_config.ssl, path_config.protocol.clone()));
}
}
}
path_map.insert(path, (server_list, AtomicUsize::new(0)));
}
dashmap.insert(hostname, path_map);
}
dashmap
}

View File

@@ -1,9 +1,10 @@
use dashmap::DashMap; use dashmap::DashMap;
use std::any::type_name; use std::any::type_name;
use std::collections::HashSet;
use std::sync::atomic::AtomicUsize; use std::sync::atomic::AtomicUsize;
#[allow(dead_code)] #[allow(dead_code)]
pub fn print_upstreams(upstreams: &UpstresmDashMap) { pub fn print_upstreams(upstreams: &UpstreamsDashMap) {
for host_entry in upstreams.iter() { for host_entry in upstreams.iter() {
let hostname = host_entry.key(); let hostname = host_entry.key();
println!("Hostname: {}", hostname); println!("Hostname: {}", hostname);
@@ -19,8 +20,8 @@ pub fn print_upstreams(upstreams: &UpstresmDashMap) {
} }
} }
pub type UpstresmDashMap = DashMap<String, DashMap<String, (Vec<(String, u16, bool, String)>, AtomicUsize)>>; pub type UpstreamsDashMap = DashMap<String, DashMap<String, (Vec<(String, u16, bool, String)>, AtomicUsize)>>;
pub type UpstreamMap = DashMap<String, (Vec<(String, u16)>, AtomicUsize)>; // pub type UpstreamMap = DashMap<String, (Vec<(String, u16)>, AtomicUsize)>;
#[allow(dead_code)] #[allow(dead_code)]
pub fn typeoff<T>(_: T) { pub fn typeoff<T>(_: T) {
@@ -28,6 +29,7 @@ pub fn typeoff<T>(_: T) {
println!("{:?}", to); println!("{:?}", to);
} }
#[allow(dead_code)]
pub fn string_to_bool(val: Option<&str>) -> Option<bool> { pub fn string_to_bool(val: Option<&str>) -> Option<bool> {
match val { match val {
Some(v) => match v { Some(v) => match v {
@@ -38,3 +40,78 @@ pub fn string_to_bool(val: Option<&str>) -> Option<bool> {
None => Some(false), None => Some(false),
} }
} }
#[allow(dead_code)]
pub fn clone_dashmap(original: &UpstreamsDashMap) -> UpstreamsDashMap {
let new_map: UpstreamsDashMap = DashMap::new();
for outer_entry in original.iter() {
let hostname = outer_entry.key();
let inner_map = outer_entry.value();
let new_inner_map = DashMap::new();
for inner_entry in inner_map.iter() {
let path = inner_entry.key();
let (vec, _) = inner_entry.value();
let new_vec = vec.clone();
let new_counter = AtomicUsize::new(0);
new_inner_map.insert(path.clone(), (new_vec, new_counter));
}
new_map.insert(hostname.clone(), new_inner_map);
}
new_map
}
pub fn clone_dashmap_into(original: &UpstreamsDashMap, cloned: &UpstreamsDashMap) {
cloned.clear();
for outer_entry in original.iter() {
let hostname = outer_entry.key();
let inner_map = outer_entry.value();
let new_inner_map = DashMap::new();
for inner_entry in inner_map.iter() {
let path = inner_entry.key();
let (vec, _) = inner_entry.value();
let new_vec = vec.clone();
let new_counter = AtomicUsize::new(0);
new_inner_map.insert(path.clone(), (new_vec, new_counter));
}
cloned.insert(hostname.clone(), new_inner_map);
}
}
pub fn compare_dashmaps(map1: &UpstreamsDashMap, map2: &UpstreamsDashMap) -> bool {
let keys1: HashSet<_> = map1.iter().map(|entry| entry.key().clone()).collect();
let keys2: HashSet<_> = map2.iter().map(|entry| entry.key().clone()).collect();
if keys1 != keys2 {
return false;
}
for entry1 in map1.iter() {
let hostname = entry1.key();
let inner_map1 = entry1.value();
let Some(inner_map2) = map2.get(hostname) else {
return false;
};
let inner_keys1: HashSet<_> = inner_map1.iter().map(|e| e.key().clone()).collect();
let inner_keys2: HashSet<_> = inner_map2.iter().map(|e| e.key().clone()).collect();
if inner_keys1 != inner_keys2 {
return false;
}
for path_entry in inner_map1.iter() {
let path = path_entry.key();
let (vec1, _counter1) = path_entry.value();
let Some(entry2) = inner_map2.get(path) else {
return false; // Path exists in map1 but not in map2
};
let (vec2, _counter2) = entry2.value();
let set1: HashSet<_> = vec1.iter().collect();
let set2: HashSet<_> = vec2.iter().collect();
if set1 != set2 {
return false;
}
}
}
true
}

View File

@@ -11,59 +11,46 @@ use pingora_core::server::ShutdownWatch;
use pingora_core::services::background::BackgroundService; use pingora_core::services::background::BackgroundService;
use pingora_http::{RequestHeader, ResponseHeader}; use pingora_http::{RequestHeader, ResponseHeader};
use pingora_proxy::{ProxyHttp, Session}; use pingora_proxy::{ProxyHttp, Session};
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::Ordering;
use std::sync::Arc; use std::sync::Arc;
// use tokio::time::Instant;
pub struct LB { pub struct LB {
pub upstreams: Arc<UpstreamMap>, pub ump_upst: Arc<UpstreamsDashMap>,
pub umap_full: Arc<UpstreamMap>, pub ump_full: Arc<UpstreamsDashMap>,
} }
#[async_trait] #[async_trait]
impl BackgroundService for LB { impl BackgroundService for LB {
async fn start(&self, mut shutdown: ShutdownWatch) { async fn start(&self, mut shutdown: ShutdownWatch) {
println!("Starting example background service"); println!("Starting example background service");
let (tux, mut rux) = mpsc::channel::<UpstreamsDashMap>(0);
let (tx, mut rx) = mpsc::channel::<UpstreamMap>(0); let file_load2 = FromFileProvider {
let file_load = FromFileProvider { path: "etc/upstreams.yaml".to_string(),
path: "etc/upstreams.conf".to_string(),
}; };
let api_load = APIUpstreamProvider; let api_load = APIUpstreamProvider;
let tx_file = tx.clone(); let tux_file = tux.clone();
let tx_api = tx.clone(); let tux_api = tux.clone();
let _ = tokio::spawn(async move { api_load.run(tx_api).await }); let _ = tokio::spawn(async move { file_load2.start(tux_file).await });
let _ = tokio::spawn(async move { file_load.run(tx_file).await }); let _ = tokio::spawn(async move { api_load.start(tux_api).await });
let up = self.upstreams.clone(); let uu = self.ump_upst.clone();
let fu = self.umap_full.clone(); let ff = self.ump_full.clone();
let _ = tokio::spawn(async move { healthcheck::hc(up, fu).await }); let _ = tokio::spawn(async move { healthcheck::hc2(uu, ff).await });
loop { loop {
tokio::select! { tokio::select! {
_ = shutdown.changed() => { _ = shutdown.changed() => {
break; break;
} }
val = rx.next() => { val = rux.next() => {
match val { match val {
Some(newmap) => { Some(ss) => {
match compare::dm(&self.umap_full, &newmap) { let foo = compare_dashmaps(&*self.ump_full, &ss);
false => { if !foo {
self.upstreams.clear(); clone_dashmap_into(&ss, &self.ump_full);
self.umap_full.clear(); clone_dashmap_into(&ss, &self.ump_upst);
for (k,v) in newmap { print_upstreams(&self.ump_full);
println!("Host: {}", k);
// <UpstreamMap
for vv in v.0.clone() {
println!(" ===> {:?}", vv);
}
self.upstreams.insert(k.clone(), (v.0.clone(), AtomicUsize::new(0))); // No need for extra vec!
self.umap_full.insert(k, (v.0, AtomicUsize::new(0))); // Use `value.0` directly
}
}
true => {
}
} }
} }
None => {} None => {}
@@ -76,12 +63,19 @@ impl BackgroundService for LB {
#[async_trait] #[async_trait]
pub trait GetHost { pub trait GetHost {
async fn get_host(&self, peer: &str) -> Option<(String, u16)>; async fn get_host(&self, peer: &str, path: &str, upgrade: bool) -> Option<(String, u16, bool, String)>;
} }
#[async_trait] #[async_trait]
impl GetHost for LB { impl GetHost for LB {
async fn get_host(&self, peer: &str) -> Option<(String, u16)> { async fn get_host(&self, peer: &str, path: &str, upgrade: bool) -> Option<(String, u16, bool, String)> {
let x = if let Some(entry) = self.upstreams.get(peer) { let mut _proto = "";
if upgrade {
_proto = "wsoc";
} else {
_proto = "http"
}
let host_entry = self.ump_upst.get(peer).unwrap();
let x = if let Some(entry) = host_entry.get(path) {
let (servers, index) = entry.value(); let (servers, index) = entry.value();
if servers.is_empty() { if servers.is_empty() {
return None; return None;
@@ -105,16 +99,17 @@ impl ProxyHttp for LB {
let host_name = session.req_header().headers.get("host"); let host_name = session.req_header().headers.get("host");
match host_name { match host_name {
Some(host) => { Some(host) => {
let h = host.to_str().unwrap().split(':').collect::<Vec<&str>>(); let header_host = host.to_str().unwrap().split(':').collect::<Vec<&str>>();
let ddr = self.get_host(h[0]);
let ddr = self.get_host(header_host[0], session.req_header().uri.path(), session.is_upgrade_req());
match ddr.await { match ddr.await {
Some((host, port)) => { Some((host, port, ssl, _proto)) => {
let peer = Box::new(HttpPeer::new((host, port), false, String::new())); let peer = Box::new(HttpPeer::new((host, port), ssl, String::new()));
// info!("{:?}, Time => {:.2?}", session.request_summary(), before.elapsed()); // info!("{:?}, Time => {:.2?}", session.request_summary(), before.elapsed());
Ok(peer) Ok(peer)
} }
None => { None => {
warn!("Returning default list => {:?}", ("127.0.0.1", 3000)); warn!("Returning default list => {:?}, {:?}", host_name, session.req_header().uri);
let peer = Box::new(HttpPeer::new(("127.0.0.1", 3000), false, String::new())); let peer = Box::new(HttpPeer::new(("127.0.0.1", 3000), false, String::new()));
// info!("{:?}, Time => {:.2?}", session.request_summary(), before.elapsed()); // info!("{:?}, Time => {:.2?}", session.request_summary(), before.elapsed());
Ok(peer) Ok(peer)
@@ -122,26 +117,12 @@ impl ProxyHttp for LB {
} }
} }
None => { None => {
warn!("Returning default list => {:?}", ("127.0.0.1", 3000)); warn!("Returning default list => {:?}, {:?}", host_name, session.req_header().uri);
let peer = Box::new(HttpPeer::new(("127.0.0.1", 3000), false, String::new())); let peer = Box::new(HttpPeer::new(("127.0.0.1", 3000), false, String::new()));
// info!("{:?}, Time => {:.2?}", session.request_summary(), before.elapsed()); // info!("{:?}, Time => {:.2?}", session.request_summary(), before.elapsed());
Ok(peer) Ok(peer)
} }
} }
/*
let ddr = self.get_host(host_name.unwrap().to_str().unwrap());
match ddr.await {
Some((host, port)) => {
let peer = Box::new(HttpPeer::new((host, port), false, String::new()));
Ok(peer)
}
None => {
println!("Returning default list => {:?}", ("127.0.0.1", 3000));
let peer = Box::new(HttpPeer::new(("127.0.0.1", 3000), false, String::new()));
Ok(peer)
}
}
*/
} }
async fn request_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> pingora_core::Result<bool> async fn request_filter(&self, _session: &mut Session, _ctx: &mut Self::CTX) -> pingora_core::Result<bool>
where where

View File

@@ -1,37 +1,51 @@
use crate::utils::tools::*; use crate::utils::tools::*;
use crate::web::proxyhttp::LB; use crate::web::proxyhttp::LB;
use clap::{arg, Parser};
use dashmap::DashMap; use dashmap::DashMap;
use log::info;
use pingora_core::prelude::background_service; use pingora_core::prelude::background_service;
use pingora_core::server::Server; use pingora_core::server::Server;
use std::sync::Arc; use std::sync::Arc;
#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
struct Args {
#[arg(short, long)]
address: String,
#[arg(short, long)]
port: String,
}
pub fn run() { pub fn run() {
env_logger::init(); env_logger::init();
let mut server = Server::new(None).unwrap(); let mut server = Server::new(None).unwrap();
server.bootstrap(); server.bootstrap();
let upstreams_map: UpstreamMap = DashMap::new(); let uf: UpstreamsDashMap = DashMap::new();
let config = Arc::new(upstreams_map); let ff: UpstreamsDashMap = DashMap::new();
let uf_config = Arc::new(uf);
let umap_full: UpstreamMap = DashMap::new(); let ff_config = Arc::new(ff);
let fconfig = Arc::new(umap_full);
let lb = LB { let lb = LB {
upstreams: config.clone(), ump_upst: uf_config.clone(),
umap_full: fconfig.clone(), ump_full: ff_config.clone(),
}; };
let bg = LB { let bg = LB {
upstreams: config.clone(), ump_upst: uf_config.clone(),
umap_full: fconfig.clone(), ump_full: ff_config.clone(),
}; };
let bg_srvc = background_service("bgsrvc", bg); let bg_srvc = background_service("bgsrvc", bg);
let mut proxy = pingora_proxy::http_proxy_service(&server.configuration, lb); let mut proxy = pingora_proxy::http_proxy_service(&server.configuration, lb);
proxy.add_tcp("0.0.0.0:6193"); let args = Args::parse();
let addr = format!("{}:{}", args.address, args.port);
proxy.add_tcp(&addr);
server.add_service(proxy); server.add_service(proxy);
server.add_service(bg_srvc); server.add_service(bg_srvc);
info!("Starting Gazan server on {}, port : {} !", args.address, args.port);
server.run_forever(); server.run_forever();
} }

View File

@@ -3,22 +3,17 @@ use axum::body::Body;
use axum::http::{Response, StatusCode}; use axum::http::{Response, StatusCode};
use axum::response::IntoResponse; use axum::response::IntoResponse;
use axum::routing::{delete, get, head, post, put}; use axum::routing::{delete, get, head, post, put};
use axum::{Json, Router}; use axum::Router;
use dashmap::DashMap;
use futures::channel::mpsc::Sender; use futures::channel::mpsc::Sender;
use futures::SinkExt; use futures::SinkExt;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::atomic::AtomicUsize;
use tokio::net::TcpListener; use tokio::net::TcpListener;
#[derive(Debug, Serialize, Deserialize)] // struct UpstreamData {
struct UpstreamData { // servers: UpstreamsDashMap,
servers: Vec<(String, u16)>, // }
counter: usize,
}
pub async fn run_server(mut toreturn: Sender<UpstreamMap>) { #[allow(unused_mut)]
pub async fn run_server(mut toreturn: Sender<UpstreamsDashMap>) {
let mut tr = toreturn.clone(); let mut tr = toreturn.clone();
let app = Router::new() let app = Router::new()
.route("/{*wildcard}", get(getconfig)) .route("/{*wildcard}", get(getconfig))
@@ -29,28 +24,18 @@ pub async fn run_server(mut toreturn: Sender<UpstreamMap>) {
.route( .route(
"/conf", "/conf",
post(|up: String| async move { post(|up: String| async move {
let serverlist = crate::utils::discovery::build_upstreams(up.as_str(), "content"); let serverlist = crate::utils::parceyaml::load_yaml_to_dashmap(up.as_str(), "content");
let _ = tr.send(serverlist).await.unwrap(); let _ = tr.send(serverlist).await.unwrap();
Response::builder().status(StatusCode::CREATED).body(Body::from("Config, conf file, updated!\n")).unwrap() Response::builder().status(StatusCode::CREATED).body(Body::from("Config, conf file, updated!\n")).unwrap()
}) })
.with_state("state"), .with_state("state"),
)
.route(
"/json",
post(|Json(payload): Json<HashMap<String, UpstreamData>>| async move {
let upstreams = DashMap::new();
for (key, value) in payload {
upstreams.insert(key, (value.servers, AtomicUsize::new(value.counter)));
}
let _ = toreturn.send(upstreams).await.unwrap();
Response::builder().status(StatusCode::CREATED).body(Body::from("Config, json, updated!\n")).unwrap()
}),
); );
let listener = TcpListener::bind("0.0.0.0:3000").await.unwrap(); let listener = TcpListener::bind("0.0.0.0:3000").await.unwrap();
println!("Axum API server running on port 3000"); println!("Axum API server running on port 3000");
axum::serve(listener, app).await.unwrap(); axum::serve(listener, app).await.unwrap();
} }
#[allow(dead_code)]
async fn getconfig() -> impl IntoResponse { async fn getconfig() -> impl IntoResponse {
"Hello from Axum API inside Pingora!\n".to_string(); "Hello from Axum API inside Pingora!\n".to_string();
Response::builder().status(StatusCode::BAD_GATEWAY).body(Body::from("No live upstream found!\n")).unwrap() Response::builder().status(StatusCode::BAD_GATEWAY).body(Body::from("No live upstream found!\n")).unwrap()

View File

@@ -1,4 +1,4 @@
#!/bin/bash #!/bin/bash
export RUST_LOG=INFO export RUST_LOG=INFO
reflex -d none -r 'src/' -s -- sh -c 'reset && cargo run ' reflex -d none -r 'src/' -s -- sh -c 'reset && cargo run -- --address 0.0.0.0 --port 6193'