From 79d952d30e796dbf099a22f4e3e6b090828b61e0 Mon Sep 17 00:00:00 2001 From: Ara Sadoyan Date: Sat, 8 Feb 2025 19:35:44 +0100 Subject: [PATCH] Something is ok --- src/main.rs | 1 + src/utils.rs | 1 + src/utils/compare.rs | 22 ++++++++++ src/web.rs | 3 -- src/web/proxyhttp.rs | 100 +++++++++++++++++++++++-------------------- src/web/start.rs | 31 ++++++-------- 6 files changed, 91 insertions(+), 67 deletions(-) create mode 100644 src/utils.rs create mode 100644 src/utils/compare.rs diff --git a/src/main.rs b/src/main.rs index 811df84..4869eca 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,4 @@ +mod utils; mod web; fn main() { diff --git a/src/utils.rs b/src/utils.rs new file mode 100644 index 0000000..f5d770c --- /dev/null +++ b/src/utils.rs @@ -0,0 +1 @@ +pub mod compare; diff --git a/src/utils/compare.rs b/src/utils/compare.rs new file mode 100644 index 0000000..79c150d --- /dev/null +++ b/src/utils/compare.rs @@ -0,0 +1,22 @@ +use dashmap::DashMap; +use std::sync::atomic::AtomicUsize; + +pub fn dashmaps(map1: &DashMap, AtomicUsize)>, map2: &DashMap, AtomicUsize)>) -> bool { + if map1.len() != map2.len() { + return false; // Different number of keys + } + for entry1 in map1.iter() { + let key = entry1.key(); + let (vec1, _) = entry1.value(); // Extract value + + if let Some(entry2) = map2.get(key) { + let (vec2, _) = entry2.value(); // Correctly extract value + if vec1 != vec2 { + return false; + } + } else { + return false; + } + } + true +} diff --git a/src/web.rs b/src/web.rs index 87da6ea..1148a78 100644 --- a/src/web.rs +++ b/src/web.rs @@ -1,5 +1,2 @@ pub mod proxyhttp; pub mod start; -// pub mod tcpcache; -// pub mod handler; -// pub mod peers; diff --git a/src/web/proxyhttp.rs b/src/web/proxyhttp.rs index 5c5640d..4cc3fda 100644 --- a/src/web/proxyhttp.rs +++ b/src/web/proxyhttp.rs @@ -1,3 +1,4 @@ +use crate::utils::compare; use async_trait::async_trait; use dashmap::DashMap; use log::{info, warn}; @@ -9,7 +10,9 @@ use pingora_http::{RequestHeader, ResponseHeader}; use pingora_proxy::{ProxyHttp, Session}; use std::any::type_name; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; use std::time::Duration; +use tokio::sync::RwLock; use tokio::time::interval; #[allow(dead_code)] @@ -18,80 +21,85 @@ pub fn typeoff(_: T) { println!("{:?}", to); } -#[allow(dead_code)] pub struct LB { - pub upstreams_map: DashMap, AtomicUsize)>, + pub upstreams_map: Arc, AtomicUsize)>>>, +} +pub struct BGService { + pub upstreams_map: Arc, AtomicUsize)>>>, } -pub struct BGService; #[async_trait] impl BackgroundService for BGService { async fn start(&self, mut shutdown: ShutdownWatch) { println!("Starting example background service"); - let mut period = interval(Duration::from_secs(1)); + let mut period = interval(Duration::from_secs(10)); loop { tokio::select! { _ = shutdown.changed() => { - // shutdown break; } _ = period.tick() => { - println!("= = = = = = = = = = = = = = = = = = = = = = = = = =") + let map_write = self.upstreams_map.write().await; + let newmap = discover(); + if compare::dashmaps(&map_write, &newmap) { + println!("DashMaps are equal. Chilling out."); + } else { + println!("DashMaps are different. Syncing !!!!!"); + for (k,v) in newmap { + println!("{} -> {:?}", k, v); + map_write.insert(k,v); + } + } + drop(map_write); // Important: Release the lock } } } } } +fn discover() -> DashMap, AtomicUsize)> { + let upstreams_map: DashMap, AtomicUsize)> = DashMap::new(); + let mut toreturn = vec![]; + toreturn.push(("192.168.1.1".to_string(), 8000.to_owned())); + toreturn.push(("192.168.1.10".to_string(), 8000.to_owned())); + toreturn.push(("127.0.0.1".to_string(), 8000.to_owned())); + toreturn.push(("127.0.0.2".to_string(), 8000.to_owned())); + toreturn.push(("127.0.0.3".to_string(), 8000.to_owned())); + toreturn.push(("127.0.0.4".to_string(), 8000.to_owned())); + toreturn.push(("127.0.0.5".to_string(), 8000.to_owned())); + toreturn.push(("127.0.0.6".to_string(), 8000.to_owned())); + upstreams_map.insert("myip.netangels.net".to_string(), (toreturn, AtomicUsize::new(0))); + let mut toreturn = vec![]; + toreturn.push(("192.168.1.1".to_string(), 8000.to_owned())); + toreturn.push(("192.168.1.10".to_string(), 8000.to_owned())); + upstreams_map.insert("polo.netangels.net".to_string(), (toreturn, AtomicUsize::new(0))); + let mut toreturn = vec![]; + toreturn.push(("192.168.1.20".to_string(), 8000.to_owned())); + upstreams_map.insert("glop.netangels.net".to_string(), (toreturn, AtomicUsize::new(0))); + upstreams_map +} + #[async_trait] pub trait GetHost { async fn get_host(&self, peer: &str) -> Option<(String, u16)>; - fn set_host(&mut self, peer: &str, host: &str, port: u16); - fn discover_hosts(&mut self); } #[async_trait] impl GetHost for LB { async fn get_host(&self, peer: &str) -> Option<(String, u16)> { - let entry = self.upstreams_map.get(peer)?; - let (servers, index) = entry.value(); + let map_read = self.upstreams_map.read().await; + let x = if let Some(entry) = map_read.get(peer) { + let (servers, index) = entry.value(); // No clone here - if servers.is_empty() { - return None; - } - - let idx = index.fetch_add(1, Ordering::Relaxed) % servers.len(); - println!("{} {:?} => len: {}, idx: {}", peer, servers[idx], servers.len(), idx); - Some(servers[idx].clone()) - } - - fn set_host(&mut self, peer: &str, host: &str, port: u16) { - let exists = self.upstreams_map.get(peer); - let mut toreturn = vec![]; - match exists { - Some(e) => { - let (ko, _) = e.value(); - let new_value = vec![(host.to_string(), port)]; - for (k, v) in ko.clone().iter() { - toreturn.push((k.to_string(), v.to_owned())); - } - toreturn.push(new_value[0].clone()); + if servers.is_empty() { + return None; } - None => { - toreturn.push((host.to_string(), port)); - } - } - - println!(" ==> Updating peer list: name => {} | value => {:?}", peer.to_string(), toreturn); - self.upstreams_map.insert(peer.to_string(), (toreturn, AtomicUsize::new(0))); - } - - fn discover_hosts(&mut self) { - self.set_host("myip.netangels.net", "192.168.1.1", 8000); - self.set_host("myip.netangels.net", "127.0.0.1", 8000); - self.set_host("myip.netangels.net", "127.0.0.2", 8000); - self.set_host("polo.netangels.net", "192.168.1.1", 8000); - self.set_host("polo.netangels.net", "192.168.1.10", 8000); - self.set_host("glop.netangels.net", "192.168.1.20", 8000); + let idx = index.fetch_add(1, Ordering::Relaxed) % servers.len(); + println!("{} {:?} => len: {}, idx: {}", peer, servers[idx], servers.len(), idx); + Some(servers[idx].clone()) // Clone the server address + } else { + None + }; + x } } diff --git a/src/web/start.rs b/src/web/start.rs index 71d9acb..80cf0f4 100644 --- a/src/web/start.rs +++ b/src/web/start.rs @@ -1,8 +1,11 @@ -use crate::web::proxyhttp::{BGService, GetHost, LB}; +use crate::web::proxyhttp::{BGService, LB}; use dashmap::DashMap; use pingora_core::prelude::background_service; use pingora_core::server::Server; use std::sync::atomic::AtomicUsize; +use std::sync::Arc; +use tokio::sync::RwLock; + /* pub fn run1() { let mut upstreams = LoadBalancer::try_from_iter(vec!["192.168.1.10:8000", "192.168.1.1:8000", "127.0.0.1:8000"]).unwrap(); @@ -31,27 +34,19 @@ pub fn run() { let mut server = Server::new(None).unwrap(); server.bootstrap(); - // let backends = Backends::new(Box::new(SD)); - // let load_balancer = LoadBalancer::from_backends(backends); - // load_balancer.set_health_check(TcpHealthCheck::new()); - // load_balancer.health_check_frequency = Some(Duration::from_secs(1)); - // load_balancer.update_frequency = Some(Duration::from_secs(1)); - - // let background = background_service("load balancer", load_balancer); - let upstreams_map: DashMap, AtomicUsize)> = DashMap::new(); + let config = Arc::new(RwLock::new(upstreams_map)); // Wrap in Arc> - let mut ll = LB { upstreams_map }; + let lb = LB { upstreams_map: config.clone() }; // Share the Arc> + let bg_service = BGService { upstreams_map: config.clone() }; // Share the Arc> - let bg_service = background_service("bgsrvc", BGService {}); - let background = background_service("load balancer", ll.discover_hosts()); - background.task(); + let bg_srvc = background_service("bgsrvc", bg_service); + bg_srvc.task(); - let mut lb = pingora_proxy::http_proxy_service(&server.configuration, ll); - - lb.add_tcp("0.0.0.0:6193"); - server.add_service(lb); - server.add_service(bg_service); + let mut proxy = pingora_proxy::http_proxy_service(&server.configuration, lb); + proxy.add_tcp("0.0.0.0:6193"); + server.add_service(proxy); + server.add_service(bg_srvc); server.run_forever(); }