diff --git a/Cargo.lock b/Cargo.lock index 20080b0..71052b9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -194,6 +194,7 @@ dependencies = [ "hyper-util", "log", "mini-redis", + "notify", "pingora", "pingora-core", "pingora-http", @@ -546,6 +547,18 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" +[[package]] +name = "filetime" +version = "0.2.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35c0522e981e68cbfa8c3f978441a5f34b30b96e146b33cd3359176b50fe8586" +dependencies = [ + "cfg-if", + "libc", + "libredox", + "windows-sys 0.59.0", +] + [[package]] name = "flate2" version = "1.0.35" @@ -569,6 +582,15 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a0d2fde1f7b3d48b8395d5f2de76c18a528bd6a9cdde438df747bfcba3e05d6f" +[[package]] +name = "fsevent-sys" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2" +dependencies = [ + "libc", +] + [[package]] name = "futures" version = "0.3.31" @@ -985,6 +1007,26 @@ dependencies = [ "hashbrown 0.15.2", ] +[[package]] +name = "inotify" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f37dccff2791ab604f9babef0ba14fbe0be30bd368dc541e2b08d07c8aa908f3" +dependencies = [ + "bitflags 2.8.0", + "inotify-sys", + "libc", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.1" @@ -1016,6 +1058,26 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "kqueue" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7447f1ca1b7b563588a205fe93dea8df60fd981423a768bc1c0ded35ed147d0c" +dependencies = [ + "kqueue-sys", + "libc", +] + +[[package]] +name = "kqueue-sys" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b" +dependencies = [ + "bitflags 1.3.2", + "libc", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -1028,6 +1090,17 @@ version = "0.2.169" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5aba8db14291edd000dfcc4d620c7ebfb122c613afb886ca8803fa4e128a20a" +[[package]] +name = "libredox" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d" +dependencies = [ + "bitflags 2.8.0", + "libc", + "redox_syscall", +] + [[package]] name = "libz-ng-sys" version = "1.1.21" @@ -1132,6 +1205,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd" dependencies = [ "libc", + "log", "wasi", "windows-sys 0.52.0", ] @@ -1148,6 +1222,31 @@ dependencies = [ "memoffset", ] +[[package]] +name = "notify" +version = "8.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fee8403b3d66ac7b26aee6e40a897d85dc5ce26f44da36b8b73e987cc52e943" +dependencies = [ + "bitflags 2.8.0", + "filetime", + "fsevent-sys", + "inotify", + "kqueue", + "libc", + "log", + "mio", + "notify-types", + "walkdir", + "windows-sys 0.59.0", +] + +[[package]] +name = "notify-types" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e0826a989adedc2a244799e823aece04662b66609d96af8dff7ac6df9a8925d" + [[package]] name = "num-traits" version = "0.2.19" @@ -1725,6 +1824,15 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -2231,6 +2339,16 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "walkdir" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "want" version = "0.3.1" diff --git a/Cargo.toml b/Cargo.toml index e8ae627..5438cf4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,3 +24,5 @@ async-trait = { version = "0.1.85" } env_logger = { version = "0.11.6" } log = "0.4.25" futures = "0.3.31" +notify = "8.0.0" + diff --git a/src/utils/discovery.rs b/src/utils/discovery.rs index 9e98e9d..7d33c52 100644 --- a/src/utils/discovery.rs +++ b/src/utils/discovery.rs @@ -3,25 +3,87 @@ use futures::channel::mpsc::Sender; use futures::SinkExt; use std::fs; use std::sync::atomic::AtomicUsize; -use std::time::Duration; +use std::time::{Duration, Instant}; -// pub fn discover() -> DashMap, AtomicUsize)> { -// read_upstreams_from_file() -// } +use async_trait::async_trait; +use notify::event::ModifyKind; +use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; +use std::path::Path; +use tokio::sync::mpsc; +use tokio::task; -pub async fn dsc(mut tx: Sender, AtomicUsize)>>) { - loop { - let snd = read_upstreams_from_file(); - let _ = tx.send(snd).await.unwrap(); - tokio::time::sleep(Duration::from_secs(2)).await; +pub struct DSC; +#[async_trait] +pub trait Discovery { + async fn discover(&self, tx: Sender, AtomicUsize)>>); +} + +#[async_trait] +impl Discovery for DSC { + async fn discover(&self, tx: Sender, AtomicUsize)>>) { + let file_path = "etc/upstreams.conf"; + tokio::spawn(watch_file(file_path, tx)); } } -fn read_upstreams_from_file() -> DashMap, AtomicUsize)> { - let upstreams = DashMap::new(); +// pub async fn dsc(tx: Sender, AtomicUsize)>>) { +// let file_path = "etc/upstreams.conf"; +// tokio::spawn(watch_file(file_path, tx)); +// } - // Read file contents - let contents = match fs::read_to_string("etc/upstreams.txt") { +pub async fn watch_file(file_path: &str, mut toreturn: Sender, AtomicUsize)>>) { + let parent_dir = Path::new(file_path).parent().unwrap(); // Watch directory, not file + let (tx, mut rx) = mpsc::channel::>(10); + + println!("Watching for changes in {:?}", parent_dir); + let paths = fs::read_dir(parent_dir).unwrap(); + for path in paths { + println!(" {}", path.unwrap().path().display()) + } + + let snd = read_upstreams_from_file(file_path); + let _ = toreturn.send(snd).await.unwrap(); + + let _watcher_handle = task::spawn_blocking({ + let parent_dir = parent_dir.to_path_buf(); // Move directory path into the closure + move || { + let mut watcher = RecommendedWatcher::new( + move |res| { + let _ = tx.blocking_send(res); + }, + Config::default(), + ) + .unwrap(); + watcher.watch(&parent_dir, RecursiveMode::Recursive).unwrap(); + + loop { + std::thread::sleep(Duration::from_secs(50)); + } + } + }); + let mut start = Instant::now(); + while let Some(event) = rx.recv().await { + match event { + Ok(e) => match e.kind { + EventKind::Modify(ModifyKind::Data(_)) | EventKind::Create(..) | EventKind::Remove(..) => { + if e.paths[0].to_str().unwrap().ends_with("conf") { + if start.elapsed() > Duration::from_secs(10) { + start = Instant::now(); + println!("Config File changed :=> {:?}", e); + let snd = read_upstreams_from_file(file_path); + let _ = toreturn.send(snd).await.unwrap(); + } + } + } + _ => (), + }, + Err(e) => println!("Watch error: {:?}", e), + } + } +} +fn read_upstreams_from_file(path: &str) -> DashMap, AtomicUsize)> { + let upstreams = DashMap::new(); + let contents = match fs::read_to_string(path) { Ok(data) => data, Err(e) => { eprintln!("Error reading file: {:?}", e); @@ -29,7 +91,6 @@ fn read_upstreams_from_file() -> DashMap, AtomicUsiz } }; - // Process each non-empty line for line in contents.lines().filter(|line| !line.trim().is_empty()) { let mut parts = line.split_whitespace(); @@ -51,8 +112,6 @@ fn read_upstreams_from_file() -> DashMap, AtomicUsiz let Ok(port) = port_str.parse::() else { continue; }; - // println!("Hostname {}, Address: {}, Port: {}", hostname, ip, port); - // Insert into DashMap using `entry()` for efficiency upstreams .entry(hostname.to_string()) // Step 1: Find or create entry .or_insert_with(|| (Vec::new(), AtomicUsize::new(0))) // Step 2: Insert if missing @@ -62,35 +121,3 @@ fn read_upstreams_from_file() -> DashMap, AtomicUsiz upstreams } - -/* -fn read_upstreams_from_file1() -> DashMap, AtomicUsize)> { - let contents = std::fs::read_to_string("etc/upstreams.txt"); - let upstreams: DashMap, AtomicUsize)> = DashMap::new(); - match contents { - Ok(contents) => { - let t = contents.lines().filter(|line| !line.trim().is_empty()).map(|x| x.to_string()).collect::>(); - for x in t { - let vc = x.split(" ").map(|x| x.to_string()).collect::>(); - let hostname = vc[0].trim().to_string(); - let contents = vc[1].clone().split(":").map(|x| x.to_string()).collect::>(); - let ip = contents[0].trim().to_string(); - let port = contents[1].trim().parse::().unwrap().to_owned(); - - if upstreams.contains_key(&hostname) { - let mut upstream = upstreams.get_mut(&hostname).unwrap(); - upstream.0.push((ip, port)); - } else { - let mut second = vec![]; - second.push((ip, port)); - upstreams.insert(hostname, (second.clone(), AtomicUsize::new(0))); - } - } - } - Err(e) => { - println!("{:?}", e) - } - }; - upstreams -} -*/ diff --git a/src/web/proxyhttp.rs b/src/web/proxyhttp.rs index 8a007cb..af112fa 100644 --- a/src/web/proxyhttp.rs +++ b/src/web/proxyhttp.rs @@ -1,5 +1,6 @@ // use crate::utils::compare; // use crate::utils::discovery; +use crate::utils::discovery::Discovery; use crate::utils::*; use async_trait::async_trait; use dashmap::DashMap; @@ -31,7 +32,8 @@ impl BackgroundService for LB { tokio::spawn(healthcheck::hc(self.upstreams.clone(), self.umap_full.clone())); println!("Starting example background service"); let (tx, mut rx) = mpsc::channel::, AtomicUsize)>>(0); - let _ = tokio::spawn(async move { discovery::dsc(tx.clone()).await }); + // let _ = tokio::spawn(async move { discovery::dsc(tx.clone()).await }); + let _ = tokio::spawn(async move { discovery::DSC.discover(tx.clone()).await }); loop { tokio::select! {