Something is ok (cleanup)

This commit is contained in:
Ara Sadoyan
2025-02-10 18:33:38 +01:00
parent 5b65a6dc4e
commit e4fb2ad74f
2 changed files with 23 additions and 37 deletions

View File

@@ -16,10 +16,12 @@ use tokio::sync::RwLock;
use tokio::time::interval; use tokio::time::interval;
pub struct LB { pub struct LB {
pub upstreams_map: Arc<RwLock<DashMap<String, (Vec<(String, u16)>, AtomicUsize)>>>, pub upstreams: Arc<RwLock<DashMap<String, (Vec<(String, u16)>, AtomicUsize)>>>,
// pub umap_full: Arc<RwLock<DashMap<String, (Vec<(String, u16)>, AtomicUsize)>>>,
} }
pub struct BGService { pub struct BGService {
pub upstreams_map: Arc<RwLock<DashMap<String, (Vec<(String, u16)>, AtomicUsize)>>>, pub upstreams: Arc<RwLock<DashMap<String, (Vec<(String, u16)>, AtomicUsize)>>>,
// pub umap_full: Arc<RwLock<DashMap<String, (Vec<(String, u16)>, AtomicUsize)>>>,
} }
#[async_trait] #[async_trait]
@@ -33,7 +35,8 @@ impl BackgroundService for BGService {
break; break;
} }
_ = period.tick() => { _ = period.tick() => {
let map_write = self.upstreams_map.write().await; let map_write = self.upstreams.write().await;
// let newups : DashMap<String, (Vec<(String, u16)>, AtomicUsize)> = DashMap::new();
let newmap = discover(); let newmap = discover();
if compare::dashmaps(&map_write, &newmap) { if compare::dashmaps(&map_write, &newmap) {
println!("DashMaps are equal. Chilling out."); println!("DashMaps are equal. Chilling out.");
@@ -44,7 +47,7 @@ impl BackgroundService for BGService {
map_write.insert(k,v); map_write.insert(k,v);
} }
} }
drop(map_write); // Important: Release the lock drop(map_write);
} }
} }
} }
@@ -52,7 +55,7 @@ impl BackgroundService for BGService {
} }
fn discover() -> DashMap<String, (Vec<(String, u16)>, AtomicUsize)> { fn discover() -> DashMap<String, (Vec<(String, u16)>, AtomicUsize)> {
let upstreams_map: DashMap<String, (Vec<(String, u16)>, AtomicUsize)> = DashMap::new(); let upstreams: DashMap<String, (Vec<(String, u16)>, AtomicUsize)> = DashMap::new();
let mut toreturn = vec![]; let mut toreturn = vec![];
toreturn.push(("192.168.1.1".to_string(), 8000.to_owned())); toreturn.push(("192.168.1.1".to_string(), 8000.to_owned()));
toreturn.push(("192.168.1.10".to_string(), 8000.to_owned())); toreturn.push(("192.168.1.10".to_string(), 8000.to_owned()));
@@ -62,15 +65,15 @@ fn discover() -> DashMap<String, (Vec<(String, u16)>, AtomicUsize)> {
toreturn.push(("127.0.0.4".to_string(), 8000.to_owned())); toreturn.push(("127.0.0.4".to_string(), 8000.to_owned()));
toreturn.push(("127.0.0.5".to_string(), 8000.to_owned())); toreturn.push(("127.0.0.5".to_string(), 8000.to_owned()));
toreturn.push(("127.0.0.6".to_string(), 8000.to_owned())); toreturn.push(("127.0.0.6".to_string(), 8000.to_owned()));
upstreams_map.insert("myip.netangels.net".to_string(), (toreturn, AtomicUsize::new(0))); upstreams.insert("myip.netangels.net".to_string(), (toreturn, AtomicUsize::new(0)));
let mut toreturn = vec![]; let mut toreturn = vec![];
toreturn.push(("192.168.1.1".to_string(), 8000.to_owned())); toreturn.push(("192.168.1.1".to_string(), 8000.to_owned()));
toreturn.push(("192.168.1.10".to_string(), 8000.to_owned())); toreturn.push(("192.168.1.10".to_string(), 8000.to_owned()));
upstreams_map.insert("polo.netangels.net".to_string(), (toreturn, AtomicUsize::new(0))); upstreams.insert("polo.netangels.net".to_string(), (toreturn, AtomicUsize::new(0)));
let mut toreturn = vec![]; let mut toreturn = vec![];
toreturn.push(("192.168.1.20".to_string(), 8000.to_owned())); toreturn.push(("192.168.1.20".to_string(), 8000.to_owned()));
upstreams_map.insert("glop.netangels.net".to_string(), (toreturn, AtomicUsize::new(0))); upstreams.insert("glop.netangels.net".to_string(), (toreturn, AtomicUsize::new(0)));
upstreams_map upstreams
} }
#[async_trait] #[async_trait]
@@ -80,7 +83,7 @@ pub trait GetHost {
#[async_trait] #[async_trait]
impl GetHost for LB { impl GetHost for LB {
async fn get_host(&self, peer: &str) -> Option<(String, u16)> { async fn get_host(&self, peer: &str) -> Option<(String, u16)> {
let map_read = self.upstreams_map.read().await; let map_read = self.upstreams.read().await;
let x = if let Some(entry) = map_read.get(peer) { let x = if let Some(entry) = map_read.get(peer) {
let (servers, index) = entry.value(); // No clone here let (servers, index) = entry.value(); // No clone here
@@ -89,7 +92,7 @@ impl GetHost for LB {
} }
let idx = index.fetch_add(1, Ordering::Relaxed) % servers.len(); let idx = index.fetch_add(1, Ordering::Relaxed) % servers.len();
println!("{} {:?} => len: {}, idx: {}", peer, servers[idx], servers.len(), idx); println!("{} {:?} => len: {}, idx: {}", peer, servers[idx], servers.len(), idx);
Some(servers[idx].clone()) // Clone the server address Some(servers[idx].clone())
} else { } else {
None None
}; };

View File

@@ -6,28 +6,6 @@ use std::sync::atomic::AtomicUsize;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::RwLock; use tokio::sync::RwLock;
/*
pub fn run1() {
let mut upstreams = LoadBalancer::try_from_iter(vec!["192.168.1.10:8000", "192.168.1.1:8000", "127.0.0.1:8000"]).unwrap();
env_logger::init();
let hc = TcpHealthCheck::new();
upstreams.set_health_check(hc);
upstreams.health_check_frequency = Some(Duration::from_secs(1));
let background = background_service("health check", upstreams);
let upstreams = background.task();
let mut server = Server::new(None).unwrap();
server.bootstrap();
let mut proxy = pingora_proxy::http_proxy_service(&server.configuration, proxyhttp::LB(upstreams));
proxy.add_tcp("0.0.0.0:6193");
server.add_service(background);
server.add_service(proxy);
server.run_forever();
}
*/
pub fn run() { pub fn run() {
env_logger::init(); env_logger::init();
@@ -35,13 +13,18 @@ pub fn run() {
server.bootstrap(); server.bootstrap();
let upstreams_map: DashMap<String, (Vec<(String, u16)>, AtomicUsize)> = DashMap::new(); let upstreams_map: DashMap<String, (Vec<(String, u16)>, AtomicUsize)> = DashMap::new();
let config = Arc::new(RwLock::new(upstreams_map)); // Wrap in Arc<RwLock<...>> let config = Arc::new(RwLock::new(upstreams_map));
let lb = LB { upstreams_map: config.clone() }; // Share the Arc<RwLock<...>> let lb = LB {
let bg_service = BGService { upstreams_map: config.clone() }; // Share the Arc<RwLock<...>> upstreams: config.clone(),
// umap_full: config.clone(),
};
let bg_service = BGService {
upstreams: config.clone(),
// umap_full: config.clone(),
};
let bg_srvc = background_service("bgsrvc", bg_service); let bg_srvc = background_service("bgsrvc", bg_service);
bg_srvc.task();
let mut proxy = pingora_proxy::http_proxy_service(&server.configuration, lb); let mut proxy = pingora_proxy::http_proxy_service(&server.configuration, lb);
proxy.add_tcp("0.0.0.0:6193"); proxy.add_tcp("0.0.0.0:6193");