From 2545f328af02080b125c5da1f5460f91656127be Mon Sep 17 00:00:00 2001 From: Ara Sadoyan Date: Thu, 13 Feb 2025 16:37:42 +0100 Subject: [PATCH] Some structure --- src/utils/discovery.rs | 110 ++++++++++++++++++++++++++++++++------- src/utils/healthcheck.rs | 3 +- src/web/proxyhttp.rs | 50 ++++++++++-------- 3 files changed, 121 insertions(+), 42 deletions(-) diff --git a/src/utils/discovery.rs b/src/utils/discovery.rs index aef8cce..9e98e9d 100644 --- a/src/utils/discovery.rs +++ b/src/utils/discovery.rs @@ -1,24 +1,96 @@ use dashmap::DashMap; +use futures::channel::mpsc::Sender; +use futures::SinkExt; +use std::fs; use std::sync::atomic::AtomicUsize; +use std::time::Duration; + +// pub fn discover() -> DashMap, AtomicUsize)> { +// read_upstreams_from_file() +// } + +pub async fn dsc(mut tx: Sender, AtomicUsize)>>) { + loop { + let snd = read_upstreams_from_file(); + let _ = tx.send(snd).await.unwrap(); + tokio::time::sleep(Duration::from_secs(2)).await; + } +} + +fn read_upstreams_from_file() -> DashMap, AtomicUsize)> { + let upstreams = DashMap::new(); + + // Read file contents + let contents = match fs::read_to_string("etc/upstreams.txt") { + Ok(data) => data, + Err(e) => { + eprintln!("Error reading file: {:?}", e); + return upstreams; + } + }; + + // Process each non-empty line + for line in contents.lines().filter(|line| !line.trim().is_empty()) { + let mut parts = line.split_whitespace(); + + let Some(hostname) = parts.next() else { + continue; + }; + let Some(address) = parts.next() else { + continue; + }; + + let mut addr_parts = address.split(':'); + let Some(ip) = addr_parts.next() else { + continue; + }; + let Some(port_str) = addr_parts.next() else { + continue; + }; + + let Ok(port) = port_str.parse::() else { + continue; + }; + // println!("Hostname {}, Address: {}, Port: {}", hostname, ip, port); + // Insert into DashMap using `entry()` for efficiency + upstreams + .entry(hostname.to_string()) // Step 1: Find or create entry + .or_insert_with(|| (Vec::new(), AtomicUsize::new(0))) // Step 2: Insert if missing + .0 // Step 3: Access the Vec<(String, u16)> + .push((ip.to_string(), port)); // Step 4: Append new data + } -pub fn discover() -> DashMap, AtomicUsize)> { - let upstreams: DashMap, AtomicUsize)> = DashMap::new(); - let mut toreturn = vec![]; - toreturn.push(("192.168.1.1".to_string(), 8000.to_owned())); - toreturn.push(("192.168.1.10".to_string(), 8000.to_owned())); - toreturn.push(("127.0.0.1".to_string(), 8000.to_owned())); - toreturn.push(("127.0.0.2".to_string(), 8000.to_owned())); - toreturn.push(("127.0.0.3".to_string(), 8000.to_owned())); - toreturn.push(("127.0.0.4".to_string(), 8000.to_owned())); - toreturn.push(("127.0.0.5".to_string(), 8000.to_owned())); - toreturn.push(("127.0.0.6".to_string(), 8000.to_owned())); - upstreams.insert("myip.netangels.net".to_string(), (toreturn, AtomicUsize::new(0))); - let mut toreturn = vec![]; - toreturn.push(("192.168.1.1".to_string(), 8000.to_owned())); - toreturn.push(("192.168.1.10".to_string(), 8000.to_owned())); - upstreams.insert("polo.netangels.net".to_string(), (toreturn, AtomicUsize::new(0))); - let mut toreturn = vec![]; - toreturn.push(("192.168.1.20".to_string(), 8000.to_owned())); - upstreams.insert("glop.netangels.net".to_string(), (toreturn, AtomicUsize::new(0))); upstreams } + +/* +fn read_upstreams_from_file1() -> DashMap, AtomicUsize)> { + let contents = std::fs::read_to_string("etc/upstreams.txt"); + let upstreams: DashMap, AtomicUsize)> = DashMap::new(); + match contents { + Ok(contents) => { + let t = contents.lines().filter(|line| !line.trim().is_empty()).map(|x| x.to_string()).collect::>(); + for x in t { + let vc = x.split(" ").map(|x| x.to_string()).collect::>(); + let hostname = vc[0].trim().to_string(); + let contents = vc[1].clone().split(":").map(|x| x.to_string()).collect::>(); + let ip = contents[0].trim().to_string(); + let port = contents[1].trim().parse::().unwrap().to_owned(); + + if upstreams.contains_key(&hostname) { + let mut upstream = upstreams.get_mut(&hostname).unwrap(); + upstream.0.push((ip, port)); + } else { + let mut second = vec![]; + second.push((ip, port)); + upstreams.insert(hostname, (second.clone(), AtomicUsize::new(0))); + } + } + } + Err(e) => { + println!("{:?}", e) + } + }; + upstreams +} +*/ diff --git a/src/utils/healthcheck.rs b/src/utils/healthcheck.rs index 210d763..8f9a156 100644 --- a/src/utils/healthcheck.rs +++ b/src/utils/healthcheck.rs @@ -19,10 +19,9 @@ pub async fn hc(upslist: Arc, AtomicU "glop.netangels.net" => ups.remove("glop.netangels.net"), _ => ups.remove(""), }; - // println!("Iter full: {} -> {:?}", val.key(), val.value()); } - println!("UPS: {:?}", ups); + // println!("UPS: {:?}", ups); drop(ups); drop(full); } diff --git a/src/web/proxyhttp.rs b/src/web/proxyhttp.rs index 622a87f..8a007cb 100644 --- a/src/web/proxyhttp.rs +++ b/src/web/proxyhttp.rs @@ -3,6 +3,8 @@ use crate::utils::*; use async_trait::async_trait; use dashmap::DashMap; +use futures::channel::mpsc; +use futures::StreamExt; use log::{info, warn}; use pingora::prelude::*; use pingora_core::prelude::HttpPeer; @@ -12,9 +14,7 @@ use pingora_http::{RequestHeader, ResponseHeader}; use pingora_proxy::{ProxyHttp, Session}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; -use std::time::Duration; use tokio::sync::RwLock; -use tokio::time::interval; pub struct LB { pub upstreams: Arc, AtomicUsize)>>>, @@ -29,32 +29,40 @@ pub struct LB { impl BackgroundService for LB { async fn start(&self, mut shutdown: ShutdownWatch) { tokio::spawn(healthcheck::hc(self.upstreams.clone(), self.umap_full.clone())); - println!("Starting example background service"); - let mut period = interval(Duration::from_secs(10)); + let (tx, mut rx) = mpsc::channel::, AtomicUsize)>>(0); + let _ = tokio::spawn(async move { discovery::dsc(tx.clone()).await }); + loop { tokio::select! { _ = shutdown.changed() => { break; } - _ = period.tick() => { - let umap_work = self.upstreams.write().await; - let umap_full = self.umap_full.write().await; - let newmap = discovery::discover(); - if !compare::dashmaps(&umap_full, &newmap) { - println!("DashMaps are different. Syncing !!!!!"); - for (k,v) in newmap { - let mut o= Vec::new(); - println!("{} -> {:?}", k, v); - for k in v.0.clone() { - o.push(k); + // _ = period.tick() => { + val = rx.next() => { + match val { + Some(newmap) => { + let umap_work = self.upstreams.write().await; + let umap_full = self.umap_full.write().await; + if !compare::dashmaps(&umap_full, &newmap) { + println!("DashMaps are different. Syncing !!!!!"); + umap_work.clear(); + umap_full.clear(); + for (k,v) in newmap { + println!("Host: {}", k); + for vv in v.0.clone() { + println!(" Upstreams: {:?}", vv); + } + // println!("{} -> {:?}", k, v); + umap_work.insert(k.clone(), (v.0.clone(), AtomicUsize::new(0))); // No need for extra vec! + umap_full.insert(k, (v.0, AtomicUsize::new(0))); // Use `value.0` directly + } } - umap_work.insert(k.clone(),v); - umap_full.insert(k,(o,AtomicUsize::new(0))); + drop(umap_full); + drop(umap_work); } + None => {} } - drop(umap_full); - drop(umap_work); } } } @@ -70,7 +78,7 @@ impl GetHost for LB { async fn get_host(&self, peer: &str) -> Option<(String, u16)> { let map_read = self.upstreams.read().await; // let ful_read = self.umap_full.read().await; - println!("DN ==> {:?}", map_read); + // println!("DN ==> {:?}", map_read); // println!("FU ==> {:?}", ful_read); let x = if let Some(entry) = map_read.get(peer) { let (servers, index) = entry.value(); // No clone here @@ -79,7 +87,7 @@ impl GetHost for LB { return None; } let idx = index.fetch_add(1, Ordering::Relaxed) % servers.len(); - // println!("{} {:?} => len: {}, idx: {}", peer, servers[idx], servers.len(), idx); + println!("{} {:?} => len: {}, idx: {}", peer, servers[idx], servers.len(), idx); Some(servers[idx].clone()) } else { None