From 3b37582f593ccce7b50004653ab58fc4f4fd401a Mon Sep 17 00:00:00 2001 From: Ara Sadoyan Date: Tue, 18 Feb 2025 14:54:48 +0100 Subject: [PATCH] Structure, dummy API config --- src/utils/discovery.rs | 58 +++++++++++++++++++++++++++++------------- src/web/proxyhttp.rs | 19 +++++++++++--- 2 files changed, 56 insertions(+), 21 deletions(-) diff --git a/src/utils/discovery.rs b/src/utils/discovery.rs index 7d33c52..039352b 100644 --- a/src/utils/discovery.rs +++ b/src/utils/discovery.rs @@ -9,31 +9,46 @@ use async_trait::async_trait; use notify::event::ModifyKind; use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; use std::path::Path; -use tokio::sync::mpsc; use tokio::task; -pub struct DSC; +pub struct FromFileProvider { + pub path: String, +} +pub struct APIUpstreamProvider { + pub api_url: String, +} #[async_trait] pub trait Discovery { - async fn discover(&self, tx: Sender, AtomicUsize)>>); + async fn run(&self, tx: Sender, AtomicUsize)>>); } #[async_trait] -impl Discovery for DSC { - async fn discover(&self, tx: Sender, AtomicUsize)>>) { - let file_path = "etc/upstreams.conf"; - tokio::spawn(watch_file(file_path, tx)); +impl Discovery for APIUpstreamProvider { + async fn run(&self, mut toreturn: Sender, AtomicUsize)>>) { + loop { + let dm: DashMap, AtomicUsize)> = DashMap::new(); + dm.insert( + self.api_url.to_string(), + (vec![("192.168.1.1".parse().unwrap(), 8000), ("192.168.1.10".parse().unwrap(), 8000)], AtomicUsize::new(0)), + ); + println!("= = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = "); + let _ = toreturn.send(dm).await.unwrap(); + println!("= = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = "); + tokio::time::sleep(Duration::from_secs(20)).await; + } } } -// pub async fn dsc(tx: Sender, AtomicUsize)>>) { -// let file_path = "etc/upstreams.conf"; -// tokio::spawn(watch_file(file_path, tx)); -// } - -pub async fn watch_file(file_path: &str, mut toreturn: Sender, AtomicUsize)>>) { +#[async_trait] +impl Discovery for FromFileProvider { + async fn run(&self, tx: Sender, AtomicUsize)>>) { + tokio::spawn(watch_file(self.path.clone(), tx.clone())); + } +} +pub async fn watch_file(fp: String, mut toreturn: Sender, AtomicUsize)>>) { + let file_path = fp.as_str(); let parent_dir = Path::new(file_path).parent().unwrap(); // Watch directory, not file - let (tx, mut rx) = mpsc::channel::>(10); + let (local_tx, mut local_rx) = tokio::sync::mpsc::channel::>(1); println!("Watching for changes in {:?}", parent_dir); let paths = fs::read_dir(parent_dir).unwrap(); @@ -49,7 +64,7 @@ pub async fn watch_file(file_path: &str, mut toreturn: Sender match e.kind { EventKind::Modify(ModifyKind::Data(_)) | EventKind::Create(..) | EventKind::Remove(..) => { if e.paths[0].to_str().unwrap().ends_with("conf") { - if start.elapsed() > Duration::from_secs(10) { + // if start.elapsed() > Duration::from_secs(10) { + if start.elapsed() > Duration::from_secs(2) { start = Instant::now(); println!("Config File changed :=> {:?}", e); + let snd = read_upstreams_from_file(file_path); let _ = toreturn.send(snd).await.unwrap(); } } } - _ => (), + _ => (), //println!("* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *"), }, Err(e) => println!("Watch error: {:?}", e), } diff --git a/src/web/proxyhttp.rs b/src/web/proxyhttp.rs index af112fa..002a393 100644 --- a/src/web/proxyhttp.rs +++ b/src/web/proxyhttp.rs @@ -1,6 +1,6 @@ // use crate::utils::compare; // use crate::utils::discovery; -use crate::utils::discovery::Discovery; +use crate::utils::discovery::{APIUpstreamProvider, Discovery, FromFileProvider}; use crate::utils::*; use async_trait::async_trait; use dashmap::DashMap; @@ -31,9 +31,21 @@ impl BackgroundService for LB { async fn start(&self, mut shutdown: ShutdownWatch) { tokio::spawn(healthcheck::hc(self.upstreams.clone(), self.umap_full.clone())); println!("Starting example background service"); + // let (tra, mut rec) = broadcast::channel::, AtomicUsize)>>(16); + let (tx, mut rx) = mpsc::channel::, AtomicUsize)>>(0); - // let _ = tokio::spawn(async move { discovery::dsc(tx.clone()).await }); - let _ = tokio::spawn(async move { discovery::DSC.discover(tx.clone()).await }); + let file_load = FromFileProvider { + path: "etc/upstreams.conf".to_string(), + }; + + let api_load = APIUpstreamProvider { + api_url: "myip.netangels.net".to_string(), + }; + + let tx_file = tx.clone(); + let tx_api = tx.clone(); + let _ = tokio::spawn(async move { api_load.run(tx_api).await }); + let _ = tokio::spawn(async move { file_load.run(tx_file).await }); loop { tokio::select! { @@ -44,6 +56,7 @@ impl BackgroundService for LB { val = rx.next() => { match val { Some(newmap) => { + println!("{:?}", newmap); let umap_work = self.upstreams.write().await; let umap_full = self.umap_full.write().await; if !compare::dashmaps(&umap_full, &newmap) {