Some structure

This commit is contained in:
Ara Sadoyan
2025-02-12 12:17:15 +01:00
parent 4f998497bb
commit a2edbcfe73
7 changed files with 90 additions and 37 deletions

1
rustfmt.toml Normal file
View File

@@ -0,0 +1 @@
max_width = 180

View File

@@ -1,2 +1,4 @@
pub mod compare;
pub mod discovery;
pub mod healthcheck;
pub mod tools;

View File

@@ -1,8 +1,9 @@
use dashmap::DashMap;
use std::sync::atomic::AtomicUsize;
use tokio::sync::RwLockWriteGuard;
#[allow(dead_code)]
pub fn dashmaps(map1: &DashMap<String, (Vec<(String, u16)>, AtomicUsize)>, map2: &DashMap<String, (Vec<(String, u16)>, AtomicUsize)>) -> bool {
pub fn dashmaps(map1: &RwLockWriteGuard<DashMap<String, (Vec<(String, u16)>, AtomicUsize)>>, map2: &DashMap<String, (Vec<(String, u16)>, AtomicUsize)>) -> bool {
if map1.len() != map2.len() {
return false; // Different number of keys
}

24
src/utils/discovery.rs Normal file
View File

@@ -0,0 +1,24 @@
use dashmap::DashMap;
use std::sync::atomic::AtomicUsize;
pub fn discover() -> DashMap<String, (Vec<(String, u16)>, AtomicUsize)> {
let upstreams: DashMap<String, (Vec<(String, u16)>, AtomicUsize)> = DashMap::new();
let mut toreturn = vec![];
toreturn.push(("192.168.1.1".to_string(), 8000.to_owned()));
toreturn.push(("192.168.1.10".to_string(), 8000.to_owned()));
toreturn.push(("127.0.0.1".to_string(), 8000.to_owned()));
toreturn.push(("127.0.0.2".to_string(), 8000.to_owned()));
toreturn.push(("127.0.0.3".to_string(), 8000.to_owned()));
toreturn.push(("127.0.0.4".to_string(), 8000.to_owned()));
toreturn.push(("127.0.0.5".to_string(), 8000.to_owned()));
toreturn.push(("127.0.0.6".to_string(), 8000.to_owned()));
upstreams.insert("myip.netangels.net".to_string(), (toreturn, AtomicUsize::new(0)));
let mut toreturn = vec![];
toreturn.push(("192.168.1.1".to_string(), 8000.to_owned()));
toreturn.push(("192.168.1.10".to_string(), 8000.to_owned()));
upstreams.insert("polo.netangels.net".to_string(), (toreturn, AtomicUsize::new(0)));
let mut toreturn = vec![];
toreturn.push(("192.168.1.20".to_string(), 8000.to_owned()));
upstreams.insert("glop.netangels.net".to_string(), (toreturn, AtomicUsize::new(0)));
upstreams
}

31
src/utils/healthcheck.rs Normal file
View File

@@ -0,0 +1,31 @@
use dashmap::DashMap;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tokio::time::interval;
pub async fn hc(upslist: Arc<RwLock<DashMap<String, (Vec<(String, u16)>, AtomicUsize)>>>, fullist: Arc<RwLock<DashMap<String, (Vec<(String, u16)>, AtomicUsize)>>>) {
let mut period = interval(Duration::from_secs(20));
loop {
tokio::select! {
_ = period.tick() => {
let ups = upslist.write().await;
let full = fullist.write().await;
for val in full.iter_mut() {
// making some dummy ligic
match val.key().to_string().as_str() {
"polo.netangels.net" => ups.remove("polo.netangels.net"),
"glop.netangels.net" => ups.remove("glop.netangels.net"),
_ => ups.remove(""),
};
// println!("Iter full: {} -> {:?}", val.key(), val.value());
}
println!("UPS: {:?}", ups);
drop(ups);
drop(full);
}
}
}
}

View File

@@ -1,5 +1,6 @@
use crate::utils::compare;
// use crate::utils::tools::*;
// use crate::utils::compare;
// use crate::utils::discovery;
use crate::utils::*;
use async_trait::async_trait;
use dashmap::DashMap;
use log::{info, warn};
@@ -17,7 +18,7 @@ use tokio::time::interval;
pub struct LB {
pub upstreams: Arc<RwLock<DashMap<String, (Vec<(String, u16)>, AtomicUsize)>>>,
// pub umap_full: Arc<RwLock<DashMap<String, (Vec<(String, u16)>, AtomicUsize)>>>,
pub umap_full: Arc<RwLock<DashMap<String, (Vec<(String, u16)>, AtomicUsize)>>>,
}
// pub struct BGService {
// pub upstreams: Arc<RwLock<DashMap<String, (Vec<(String, u16)>, AtomicUsize)>>>,
@@ -27,6 +28,8 @@ pub struct LB {
#[async_trait]
impl BackgroundService for LB {
async fn start(&self, mut shutdown: ShutdownWatch) {
tokio::spawn(healthcheck::hc(self.upstreams.clone(), self.umap_full.clone()));
println!("Starting example background service");
let mut period = interval(Duration::from_secs(10));
loop {
@@ -35,47 +38,29 @@ impl BackgroundService for LB {
break;
}
_ = period.tick() => {
let map_write = self.upstreams.write().await;
// let newups : DashMap<String, (Vec<(String, u16)>, AtomicUsize)> = DashMap::new();
let newmap = discover();
if compare::dashmaps(&map_write, &newmap) {
println!("DashMaps are equal. Chilling out.");
} else {
let umap_work = self.upstreams.write().await;
let umap_full = self.umap_full.write().await;
let newmap = discovery::discover();
if !compare::dashmaps(&umap_full, &newmap) {
println!("DashMaps are different. Syncing !!!!!");
for (k,v) in newmap {
let mut o= Vec::new();
println!("{} -> {:?}", k, v);
map_write.insert(k,v);
for k in v.0.clone() {
o.push(k);
}
umap_work.insert(k.clone(),v);
umap_full.insert(k,(o,AtomicUsize::new(0)));
}
}
drop(map_write);
drop(umap_full);
drop(umap_work);
}
}
}
}
}
fn discover() -> DashMap<String, (Vec<(String, u16)>, AtomicUsize)> {
let upstreams: DashMap<String, (Vec<(String, u16)>, AtomicUsize)> = DashMap::new();
let mut toreturn = vec![];
toreturn.push(("192.168.1.1".to_string(), 8000.to_owned()));
toreturn.push(("192.168.1.10".to_string(), 8000.to_owned()));
toreturn.push(("127.0.0.1".to_string(), 8000.to_owned()));
toreturn.push(("127.0.0.2".to_string(), 8000.to_owned()));
toreturn.push(("127.0.0.3".to_string(), 8000.to_owned()));
toreturn.push(("127.0.0.4".to_string(), 8000.to_owned()));
toreturn.push(("127.0.0.5".to_string(), 8000.to_owned()));
toreturn.push(("127.0.0.6".to_string(), 8000.to_owned()));
upstreams.insert("myip.netangels.net".to_string(), (toreturn, AtomicUsize::new(0)));
let mut toreturn = vec![];
toreturn.push(("192.168.1.1".to_string(), 8000.to_owned()));
toreturn.push(("192.168.1.10".to_string(), 8000.to_owned()));
upstreams.insert("polo.netangels.net".to_string(), (toreturn, AtomicUsize::new(0)));
let mut toreturn = vec![];
toreturn.push(("192.168.1.20".to_string(), 8000.to_owned()));
upstreams.insert("glop.netangels.net".to_string(), (toreturn, AtomicUsize::new(0)));
upstreams
}
#[async_trait]
pub trait GetHost {
async fn get_host(&self, peer: &str) -> Option<(String, u16)>;
@@ -84,6 +69,9 @@ pub trait GetHost {
impl GetHost for LB {
async fn get_host(&self, peer: &str) -> Option<(String, u16)> {
let map_read = self.upstreams.read().await;
// let ful_read = self.umap_full.read().await;
println!("DN ==> {:?}", map_read);
// println!("FU ==> {:?}", ful_read);
let x = if let Some(entry) = map_read.get(peer) {
let (servers, index) = entry.value(); // No clone here
@@ -91,11 +79,12 @@ impl GetHost for LB {
return None;
}
let idx = index.fetch_add(1, Ordering::Relaxed) % servers.len();
println!("{} {:?} => len: {}, idx: {}", peer, servers[idx], servers.len(), idx);
// println!("{} {:?} => len: {}, idx: {}", peer, servers[idx], servers.len(), idx);
Some(servers[idx].clone())
} else {
None
};
drop(map_read);
x
}
}

View File

@@ -15,11 +15,16 @@ pub fn run() {
let upstreams_map: DashMap<String, (Vec<(String, u16)>, AtomicUsize)> = DashMap::new();
let config = Arc::new(RwLock::new(upstreams_map));
let umap_full: DashMap<String, (Vec<(String, u16)>, AtomicUsize)> = DashMap::new();
let fconfig = Arc::new(RwLock::new(umap_full));
let lb = LB {
upstreams: config.clone(), // umap_full: config.clone()
upstreams: config.clone(),
umap_full: fconfig.clone(),
};
let bg = LB {
upstreams: config.clone(), // umap_full: config.clone()
upstreams: config.clone(),
umap_full: fconfig.clone(),
};
let bg_srvc = background_service("bgsrvc", bg);