From a2edbcfe73a893bfb1e8511ccd9570a1fb9c84bd Mon Sep 17 00:00:00 2001 From: Ara Sadoyan Date: Wed, 12 Feb 2025 12:17:15 +0100 Subject: [PATCH] Some structure --- rustfmt.toml | 1 + src/utils.rs | 2 ++ src/utils/compare.rs | 3 ++- src/utils/discovery.rs | 24 +++++++++++++++++ src/utils/healthcheck.rs | 31 ++++++++++++++++++++++ src/web/proxyhttp.rs | 57 ++++++++++++++++------------------------ src/web/start.rs | 9 +++++-- 7 files changed, 90 insertions(+), 37 deletions(-) create mode 100644 rustfmt.toml create mode 100644 src/utils/discovery.rs create mode 100644 src/utils/healthcheck.rs diff --git a/rustfmt.toml b/rustfmt.toml new file mode 100644 index 0000000..cd016f6 --- /dev/null +++ b/rustfmt.toml @@ -0,0 +1 @@ +max_width = 180 diff --git a/src/utils.rs b/src/utils.rs index d1380bf..c01a04d 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,2 +1,4 @@ pub mod compare; +pub mod discovery; +pub mod healthcheck; pub mod tools; diff --git a/src/utils/compare.rs b/src/utils/compare.rs index 93f7361..90fbbda 100644 --- a/src/utils/compare.rs +++ b/src/utils/compare.rs @@ -1,8 +1,9 @@ use dashmap::DashMap; use std::sync::atomic::AtomicUsize; +use tokio::sync::RwLockWriteGuard; #[allow(dead_code)] -pub fn dashmaps(map1: &DashMap, AtomicUsize)>, map2: &DashMap, AtomicUsize)>) -> bool { +pub fn dashmaps(map1: &RwLockWriteGuard, AtomicUsize)>>, map2: &DashMap, AtomicUsize)>) -> bool { if map1.len() != map2.len() { return false; // Different number of keys } diff --git a/src/utils/discovery.rs b/src/utils/discovery.rs new file mode 100644 index 0000000..aef8cce --- /dev/null +++ b/src/utils/discovery.rs @@ -0,0 +1,24 @@ +use dashmap::DashMap; +use std::sync::atomic::AtomicUsize; + +pub fn discover() -> DashMap, AtomicUsize)> { + let upstreams: DashMap, AtomicUsize)> = DashMap::new(); + let mut toreturn = vec![]; + toreturn.push(("192.168.1.1".to_string(), 8000.to_owned())); + toreturn.push(("192.168.1.10".to_string(), 8000.to_owned())); + toreturn.push(("127.0.0.1".to_string(), 8000.to_owned())); + toreturn.push(("127.0.0.2".to_string(), 8000.to_owned())); + toreturn.push(("127.0.0.3".to_string(), 8000.to_owned())); + toreturn.push(("127.0.0.4".to_string(), 8000.to_owned())); + toreturn.push(("127.0.0.5".to_string(), 8000.to_owned())); + toreturn.push(("127.0.0.6".to_string(), 8000.to_owned())); + upstreams.insert("myip.netangels.net".to_string(), (toreturn, AtomicUsize::new(0))); + let mut toreturn = vec![]; + toreturn.push(("192.168.1.1".to_string(), 8000.to_owned())); + toreturn.push(("192.168.1.10".to_string(), 8000.to_owned())); + upstreams.insert("polo.netangels.net".to_string(), (toreturn, AtomicUsize::new(0))); + let mut toreturn = vec![]; + toreturn.push(("192.168.1.20".to_string(), 8000.to_owned())); + upstreams.insert("glop.netangels.net".to_string(), (toreturn, AtomicUsize::new(0))); + upstreams +} diff --git a/src/utils/healthcheck.rs b/src/utils/healthcheck.rs new file mode 100644 index 0000000..210d763 --- /dev/null +++ b/src/utils/healthcheck.rs @@ -0,0 +1,31 @@ +use dashmap::DashMap; +use std::sync::atomic::AtomicUsize; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::RwLock; +use tokio::time::interval; + +pub async fn hc(upslist: Arc, AtomicUsize)>>>, fullist: Arc, AtomicUsize)>>>) { + let mut period = interval(Duration::from_secs(20)); + loop { + tokio::select! { + _ = period.tick() => { + let ups = upslist.write().await; + let full = fullist.write().await; + for val in full.iter_mut() { + // making some dummy ligic + match val.key().to_string().as_str() { + "polo.netangels.net" => ups.remove("polo.netangels.net"), + "glop.netangels.net" => ups.remove("glop.netangels.net"), + _ => ups.remove(""), + }; + // println!("Iter full: {} -> {:?}", val.key(), val.value()); + } + + println!("UPS: {:?}", ups); + drop(ups); + drop(full); + } + } + } +} diff --git a/src/web/proxyhttp.rs b/src/web/proxyhttp.rs index 2778fb6..622a87f 100644 --- a/src/web/proxyhttp.rs +++ b/src/web/proxyhttp.rs @@ -1,5 +1,6 @@ -use crate::utils::compare; -// use crate::utils::tools::*; +// use crate::utils::compare; +// use crate::utils::discovery; +use crate::utils::*; use async_trait::async_trait; use dashmap::DashMap; use log::{info, warn}; @@ -17,7 +18,7 @@ use tokio::time::interval; pub struct LB { pub upstreams: Arc, AtomicUsize)>>>, - // pub umap_full: Arc, AtomicUsize)>>>, + pub umap_full: Arc, AtomicUsize)>>>, } // pub struct BGService { // pub upstreams: Arc, AtomicUsize)>>>, @@ -27,6 +28,8 @@ pub struct LB { #[async_trait] impl BackgroundService for LB { async fn start(&self, mut shutdown: ShutdownWatch) { + tokio::spawn(healthcheck::hc(self.upstreams.clone(), self.umap_full.clone())); + println!("Starting example background service"); let mut period = interval(Duration::from_secs(10)); loop { @@ -35,47 +38,29 @@ impl BackgroundService for LB { break; } _ = period.tick() => { - let map_write = self.upstreams.write().await; - // let newups : DashMap, AtomicUsize)> = DashMap::new(); - let newmap = discover(); - if compare::dashmaps(&map_write, &newmap) { - println!("DashMaps are equal. Chilling out."); - } else { + let umap_work = self.upstreams.write().await; + let umap_full = self.umap_full.write().await; + let newmap = discovery::discover(); + if !compare::dashmaps(&umap_full, &newmap) { println!("DashMaps are different. Syncing !!!!!"); for (k,v) in newmap { + let mut o= Vec::new(); println!("{} -> {:?}", k, v); - map_write.insert(k,v); + for k in v.0.clone() { + o.push(k); + } + umap_work.insert(k.clone(),v); + umap_full.insert(k,(o,AtomicUsize::new(0))); } } - drop(map_write); + drop(umap_full); + drop(umap_work); } } } } } -fn discover() -> DashMap, AtomicUsize)> { - let upstreams: DashMap, AtomicUsize)> = DashMap::new(); - let mut toreturn = vec![]; - toreturn.push(("192.168.1.1".to_string(), 8000.to_owned())); - toreturn.push(("192.168.1.10".to_string(), 8000.to_owned())); - toreturn.push(("127.0.0.1".to_string(), 8000.to_owned())); - toreturn.push(("127.0.0.2".to_string(), 8000.to_owned())); - toreturn.push(("127.0.0.3".to_string(), 8000.to_owned())); - toreturn.push(("127.0.0.4".to_string(), 8000.to_owned())); - toreturn.push(("127.0.0.5".to_string(), 8000.to_owned())); - toreturn.push(("127.0.0.6".to_string(), 8000.to_owned())); - upstreams.insert("myip.netangels.net".to_string(), (toreturn, AtomicUsize::new(0))); - let mut toreturn = vec![]; - toreturn.push(("192.168.1.1".to_string(), 8000.to_owned())); - toreturn.push(("192.168.1.10".to_string(), 8000.to_owned())); - upstreams.insert("polo.netangels.net".to_string(), (toreturn, AtomicUsize::new(0))); - let mut toreturn = vec![]; - toreturn.push(("192.168.1.20".to_string(), 8000.to_owned())); - upstreams.insert("glop.netangels.net".to_string(), (toreturn, AtomicUsize::new(0))); - upstreams -} - #[async_trait] pub trait GetHost { async fn get_host(&self, peer: &str) -> Option<(String, u16)>; @@ -84,6 +69,9 @@ pub trait GetHost { impl GetHost for LB { async fn get_host(&self, peer: &str) -> Option<(String, u16)> { let map_read = self.upstreams.read().await; + // let ful_read = self.umap_full.read().await; + println!("DN ==> {:?}", map_read); + // println!("FU ==> {:?}", ful_read); let x = if let Some(entry) = map_read.get(peer) { let (servers, index) = entry.value(); // No clone here @@ -91,11 +79,12 @@ impl GetHost for LB { return None; } let idx = index.fetch_add(1, Ordering::Relaxed) % servers.len(); - println!("{} {:?} => len: {}, idx: {}", peer, servers[idx], servers.len(), idx); + // println!("{} {:?} => len: {}, idx: {}", peer, servers[idx], servers.len(), idx); Some(servers[idx].clone()) } else { None }; + drop(map_read); x } } diff --git a/src/web/start.rs b/src/web/start.rs index c56fde7..fbc3c22 100644 --- a/src/web/start.rs +++ b/src/web/start.rs @@ -15,11 +15,16 @@ pub fn run() { let upstreams_map: DashMap, AtomicUsize)> = DashMap::new(); let config = Arc::new(RwLock::new(upstreams_map)); + let umap_full: DashMap, AtomicUsize)> = DashMap::new(); + let fconfig = Arc::new(RwLock::new(umap_full)); + let lb = LB { - upstreams: config.clone(), // umap_full: config.clone() + upstreams: config.clone(), + umap_full: fconfig.clone(), }; let bg = LB { - upstreams: config.clone(), // umap_full: config.clone() + upstreams: config.clone(), + umap_full: fconfig.clone(), }; let bg_srvc = background_service("bgsrvc", bg);