From a824912a8465a77b377dbee3c5e6143f856cde39 Mon Sep 17 00:00:00 2001 From: Ara Sadoyan Date: Mon, 10 Mar 2025 18:27:52 +0100 Subject: [PATCH] Intermediate v2 --- src/utils/compare.rs | 7 +++---- src/utils/discovery.rs | 45 ++++++++++++++++++++-------------------- src/utils/healthcheck.rs | 7 ++++--- src/utils/tools.rs | 24 +++++++++++++++------ src/web/proxyhttp.rs | 23 ++++++++++---------- src/web/start.rs | 6 +++--- src/web/webserver.rs | 3 ++- 7 files changed, 64 insertions(+), 51 deletions(-) diff --git a/src/utils/compare.rs b/src/utils/compare.rs index c931905..d2abbfd 100644 --- a/src/utils/compare.rs +++ b/src/utils/compare.rs @@ -1,10 +1,9 @@ -use dashmap::DashMap; -use std::sync::atomic::AtomicUsize; +use crate::utils::tools::*; use tokio::sync::RwLockReadGuard; /* #[allow(dead_code)] -pub fn dashmaps(map1: &RwLockWriteGuard, AtomicUsize)>>, map2: &DashMap, AtomicUsize)>) -> bool { +pub fn dashmaps(map1: &RwLockWriteGuard, map2: &UpstreamMap) -> bool { if map1.len() != map2.len() { return false; } @@ -26,7 +25,7 @@ pub fn dashmaps(map1: &RwLockWriteGuard, Ato */ #[allow(dead_code)] -pub fn dm(map1: &RwLockReadGuard, AtomicUsize)>>, map2: &DashMap, AtomicUsize)>) -> bool { +pub fn dm(map1: &RwLockReadGuard, map2: &UpstreamMap) -> bool { if map1.len() != map2.len() { return false; // Different number of keys } diff --git a/src/utils/discovery.rs b/src/utils/discovery.rs index 1d07e6c..f716b2b 100644 --- a/src/utils/discovery.rs +++ b/src/utils/discovery.rs @@ -20,23 +20,23 @@ pub struct APIUpstreamProvider; #[async_trait] pub trait Discovery { - async fn run(&self, tx: Sender, AtomicUsize)>>); + async fn run(&self, tx: Sender); } #[async_trait] impl Discovery for APIUpstreamProvider { - async fn run(&self, toreturn: Sender, AtomicUsize)>>) { + async fn run(&self, toreturn: Sender) { webserver::run_server(toreturn).await; } } #[async_trait] impl Discovery for FromFileProvider { - async fn run(&self, tx: Sender, AtomicUsize)>>) { + async fn run(&self, tx: Sender) { tokio::spawn(watch_file(self.path.clone(), tx.clone())); } } -pub async fn watch_file(fp: String, mut toreturn: Sender, AtomicUsize)>>) { +pub async fn watch_file(fp: String, mut toreturn: Sender) { let file_path = fp.as_str(); let parent_dir = Path::new(file_path).parent().unwrap(); // Watch directory, not file let (local_tx, mut local_rx) = tokio::sync::mpsc::channel::>(1); @@ -77,15 +77,16 @@ pub async fn watch_file(fp: String, mut toreturn: Sender {:?}", e); - let _sd = build_upstreams2("etc/upstreams-long.conf", "filepath"); + let upstreams = build_upstreams2("etc/upstreams-long.conf", "filepath"); - println!("\n\n"); - for t in _sd.iter() { - println!("{} ==>", t.key()); - for v in t.value().iter() { - println!(" {:?}", v) - } + print_upstreams(&upstreams); + + let host_entry = upstreams.get("myip.netangels.net").unwrap(); + let path_entry = host_entry.get("/").unwrap(); + for p in path_entry.value().0.clone() { + println!(" {:?}", p); } + println!("\n\n"); let snd = build_upstreams(file_path, "filepath"); @@ -99,7 +100,7 @@ pub async fn watch_file(fp: String, mut toreturn: Sender DashMap, AtomicUsize)> { +pub fn build_upstreams(d: &str, kind: &str) -> UpstreamMap { let upstreams = DashMap::new(); let mut contents = d.to_string(); match kind { @@ -150,8 +151,8 @@ pub fn build_upstreams(d: &str, kind: &str) -> DashMap DashMap> { - let upstreams: DashMap> = DashMap::new(); +pub fn build_upstreams2(d: &str, kind: &str) -> UpstresmDashMap { + let upstreams: UpstresmDashMap = DashMap::new(); let mut contents = d.to_string(); match kind { "filepath" => { @@ -176,7 +177,7 @@ pub fn build_upstreams2(d: &str, kind: &str) -> DashMap DashMap() else { continue; }; - let d = UpstreamsStruct { - proto: proto.to_string(), - path: path.to_string(), - address: (ip.to_string(), port, ssl), - atom: AtomicUsize::new(0), - }; - upstreams.entry(hostname.to_string()).or_insert_with(|| Vec::new()).push(d); + + let entry = upstreams.entry(hostname.to_string()).or_insert_with(DashMap::new); + entry + .entry(path.to_string()) + .or_insert_with(|| (Vec::new(), AtomicUsize::new(0))) + .0 + .push((ip.to_string(), port, ssl, proto.to_string())); } upstreams diff --git a/src/utils/healthcheck.rs b/src/utils/healthcheck.rs index 52859bc..34672e3 100644 --- a/src/utils/healthcheck.rs +++ b/src/utils/healthcheck.rs @@ -1,3 +1,4 @@ +use crate::utils::tools::*; use dashmap::DashMap; use std::sync::atomic::AtomicUsize; use std::sync::Arc; @@ -5,15 +6,15 @@ use std::time::Duration; use tokio::sync::RwLock; use tokio::time::interval; -pub async fn hc(upslist: Arc, AtomicUsize)>>>, fullist: Arc, AtomicUsize)>>>) { +pub async fn hc(upslist: Arc>, fullist: Arc>) { let mut period = interval(Duration::from_secs(2)); loop { tokio::select! { _ = period.tick() => { // let before = Instant::now(); - let totest: DashMap, AtomicUsize)> = DashMap::new(); - let fclone: DashMap, AtomicUsize)> = DashMap::new(); + let totest: UpstreamMap = DashMap::new(); + let fclone: UpstreamMap = DashMap::new(); // println!("\nElapsed dash: {:.2?}", before.elapsed()); // let before = Instant::now(); { diff --git a/src/utils/tools.rs b/src/utils/tools.rs index ced09a1..d498e34 100644 --- a/src/utils/tools.rs +++ b/src/utils/tools.rs @@ -1,15 +1,27 @@ +use dashmap::DashMap; use std::any::type_name; use std::sync::atomic::AtomicUsize; -#[derive(Debug)] #[allow(dead_code)] -pub struct UpstreamsStruct { - pub proto: String, - pub path: String, - pub address: (String, u16, bool), - pub atom: AtomicUsize, +pub fn print_upstreams(upstreams: &UpstresmDashMap) { + for host_entry in upstreams.iter() { + let hostname = host_entry.key(); + println!("Hostname: {}", hostname); + + for path_entry in host_entry.value().iter() { + let path = path_entry.key(); + println!(" Path: {}", path); + + for (ip, port, ssl, proto) in path_entry.value().0.clone() { + println!(" ===> IP: {}, Port: {}, SSL: {}, Proto: {}", ip, port, ssl, proto); + } + } + } } +pub type UpstresmDashMap = DashMap, AtomicUsize)>>; +pub type UpstreamMap = DashMap, AtomicUsize)>; + #[allow(dead_code)] pub fn typeoff(_: T) { let to = type_name::(); diff --git a/src/web/proxyhttp.rs b/src/web/proxyhttp.rs index 0a4e5d8..48dd69c 100644 --- a/src/web/proxyhttp.rs +++ b/src/web/proxyhttp.rs @@ -1,7 +1,7 @@ use crate::utils::discovery::{APIUpstreamProvider, Discovery, FromFileProvider}; +use crate::utils::tools::*; use crate::utils::*; use async_trait::async_trait; -use dashmap::DashMap; use futures::channel::mpsc; use futures::StreamExt; use log::{info, warn}; @@ -14,11 +14,11 @@ use pingora_proxy::{ProxyHttp, Session}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use tokio::sync::RwLock; -use tokio::time::Instant; +// use tokio::time::Instant; pub struct LB { - pub upstreams: Arc, AtomicUsize)>>>, - pub umap_full: Arc, AtomicUsize)>>>, + pub upstreams: Arc>, + pub umap_full: Arc>, } #[async_trait] @@ -26,7 +26,7 @@ impl BackgroundService for LB { async fn start(&self, mut shutdown: ShutdownWatch) { println!("Starting example background service"); - let (tx, mut rx) = mpsc::channel::, AtomicUsize)>>(0); + let (tx, mut rx) = mpsc::channel::(0); let file_load = FromFileProvider { path: "etc/upstreams.conf".to_string(), }; @@ -61,6 +61,7 @@ impl BackgroundService for LB { full.clear(); for (k,v) in newmap { println!("Host: {}", k); + // {:?}", vv); } @@ -98,7 +99,7 @@ impl GetHost for LB { return None; } let idx = index.fetch_add(1, Ordering::Relaxed) % servers.len(); - // println!("{} {:?} => len: {}, idx: {}", peer, servers[idx], servers.len(), idx); + println!("{} {:?} => len: {}, idx: {}", peer, servers[idx], servers.len(), idx); Some(servers[idx].clone()) } else { None @@ -113,10 +114,8 @@ impl ProxyHttp for LB { type CTX = (); fn new_ctx(&self) -> Self::CTX {} async fn upstream_peer(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result> { - let before = Instant::now(); + // let before = Instant::now(); let host_name = session.req_header().headers.get("host"); - // let fyu = session.req_header().uri.path(); - // info!("{:?} ==> {} ==> {:?}", host_name, fyu, session.request_summary()); match host_name { Some(host) => { let h = host.to_str().unwrap().split(':').collect::>(); @@ -124,13 +123,13 @@ impl ProxyHttp for LB { match ddr.await { Some((host, port)) => { let peer = Box::new(HttpPeer::new((host, port), false, String::new())); - info!("{:?}, Time => {:.2?}", session.request_summary(), before.elapsed()); + // info!("{:?}, Time => {:.2?}", session.request_summary(), before.elapsed()); Ok(peer) } None => { warn!("Returning default list => {:?}", ("127.0.0.1", 3000)); let peer = Box::new(HttpPeer::new(("127.0.0.1", 3000), false, String::new())); - info!("{:?}, Time => {:.2?}", session.request_summary(), before.elapsed()); + // info!("{:?}, Time => {:.2?}", session.request_summary(), before.elapsed()); Ok(peer) } } @@ -138,7 +137,7 @@ impl ProxyHttp for LB { None => { warn!("Returning default list => {:?}", ("127.0.0.1", 3000)); let peer = Box::new(HttpPeer::new(("127.0.0.1", 3000), false, String::new())); - info!("{:?}, Time => {:.2?}", session.request_summary(), before.elapsed()); + // info!("{:?}, Time => {:.2?}", session.request_summary(), before.elapsed()); Ok(peer) } } diff --git a/src/web/start.rs b/src/web/start.rs index fbc3c22..a456ca8 100644 --- a/src/web/start.rs +++ b/src/web/start.rs @@ -1,8 +1,8 @@ +use crate::utils::tools::*; use crate::web::proxyhttp::LB; use dashmap::DashMap; use pingora_core::prelude::background_service; use pingora_core::server::Server; -use std::sync::atomic::AtomicUsize; use std::sync::Arc; use tokio::sync::RwLock; @@ -12,10 +12,10 @@ pub fn run() { let mut server = Server::new(None).unwrap(); server.bootstrap(); - let upstreams_map: DashMap, AtomicUsize)> = DashMap::new(); + let upstreams_map: UpstreamMap = DashMap::new(); let config = Arc::new(RwLock::new(upstreams_map)); - let umap_full: DashMap, AtomicUsize)> = DashMap::new(); + let umap_full: UpstreamMap = DashMap::new(); let fconfig = Arc::new(RwLock::new(umap_full)); let lb = LB { diff --git a/src/web/webserver.rs b/src/web/webserver.rs index d049777..b65fc8b 100644 --- a/src/web/webserver.rs +++ b/src/web/webserver.rs @@ -1,3 +1,4 @@ +use crate::utils::tools::*; use axum::body::Body; use axum::http::{Response, StatusCode}; use axum::response::IntoResponse; @@ -17,7 +18,7 @@ struct UpstreamData { counter: usize, } -pub async fn run_server(mut toreturn: Sender, AtomicUsize)>>) { +pub async fn run_server(mut toreturn: Sender) { let mut tr = toreturn.clone(); let app = Router::new() .route("/{*wildcard}", get(getconfig))