From cbbf45092bd220c3b9dfe79634ab3b396597a2ac Mon Sep 17 00:00:00 2001 From: Ara Sadoyan Date: Thu, 27 Mar 2025 13:03:45 +0100 Subject: [PATCH] Some structuration --- src/utils.rs | 1 + src/utils/discovery.rs | 68 ++---------------------------------------- src/utils/filewatch.rs | 68 ++++++++++++++++++++++++++++++++++++++++++ src/utils/parceyaml.rs | 14 ++++++--- src/web/webserver.rs | 2 +- 5 files changed, 82 insertions(+), 71 deletions(-) create mode 100644 src/utils/filewatch.rs diff --git a/src/utils.rs b/src/utils.rs index a188bcf..db4f48b 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,4 +1,5 @@ pub mod discovery; +mod filewatch; pub mod healthcheck; pub mod parceyaml; pub mod tools; diff --git a/src/utils/discovery.rs b/src/utils/discovery.rs index 271426f..6229871 100644 --- a/src/utils/discovery.rs +++ b/src/utils/discovery.rs @@ -1,16 +1,8 @@ -use crate::utils::parceyaml::load_yaml_to_dashmap; +use crate::utils::filewatch; use crate::utils::tools::*; use crate::web::webserver; use async_trait::async_trait; use futures::channel::mpsc::Sender; -use futures::SinkExt; -use log::{error, info}; -use notify::event::ModifyKind; -use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; -use pingora::prelude::sleep; -use std::path::Path; -use std::time::{Duration, Instant}; -use tokio::task; pub struct FromFileProvider { pub path: String, @@ -34,62 +26,6 @@ impl Discovery for APIUpstreamProvider { #[async_trait] impl Discovery for FromFileProvider { async fn start(&self, tx: Sender<(UpstreamsDashMap, Headers)>) { - tokio::spawn(watch_file(self.path.clone(), tx.clone())); - } -} -pub async fn watch_file(fp: String, mut toreturn: Sender<(UpstreamsDashMap, Headers)>) { - sleep(Duration::from_millis(50)).await; // For having nice logs :-) - let file_path = fp.as_str(); - let parent_dir = Path::new(file_path).parent().unwrap(); - let (local_tx, mut local_rx) = tokio::sync::mpsc::channel::>(1); - info!("Watching for changes in {:?}", parent_dir); - let snd = load_yaml_to_dashmap(file_path, "filepath"); - - match snd { - Some(snd) => { - toreturn.send(snd).await.unwrap(); - } - None => {} - } - - let _watcher_handle = task::spawn_blocking({ - let parent_dir = parent_dir.to_path_buf(); // Move directory path into the closure - move || { - let mut watcher = RecommendedWatcher::new( - move |res| { - let _ = local_tx.blocking_send(res); - }, - Config::default(), - ) - .unwrap(); - watcher.watch(&parent_dir, RecursiveMode::Recursive).unwrap(); - let (_rtx, mut rrx) = tokio::sync::mpsc::channel::(1); - let _ = rrx.blocking_recv(); - } - }); - let mut start = Instant::now(); - - while let Some(event) = local_rx.recv().await { - match event { - Ok(e) => match e.kind { - EventKind::Modify(ModifyKind::Data(_)) | EventKind::Create(..) | EventKind::Remove(..) => { - if e.paths[0].to_str().unwrap().ends_with("yaml") { - if start.elapsed() > Duration::from_secs(2) { - start = Instant::now(); - info!("Config File changed :=> {:?}", e); - let snd = load_yaml_to_dashmap(file_path, "filepath"); - match snd { - Some(snd) => { - toreturn.send(snd).await.unwrap(); - } - None => {} - } - } - } - } - _ => (), - }, - Err(e) => error!("Watch error: {:?}", e), - } + tokio::spawn(filewatch::start(self.path.clone(), tx.clone())); } } diff --git a/src/utils/filewatch.rs b/src/utils/filewatch.rs new file mode 100644 index 0000000..57a26bd --- /dev/null +++ b/src/utils/filewatch.rs @@ -0,0 +1,68 @@ +use crate::utils::parceyaml::load_configuration; +use crate::utils::tools::*; +use futures::channel::mpsc::Sender; +use futures::SinkExt; +use log::{error, info}; +use notify::event::ModifyKind; +use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; +use pingora::prelude::sleep; +use std::path::Path; +use std::time::{Duration, Instant}; +use tokio::task; + +pub async fn start(fp: String, mut toreturn: Sender<(UpstreamsDashMap, Headers)>) { + sleep(Duration::from_millis(50)).await; // For having nice logs :-) + let file_path = fp.as_str(); + let parent_dir = Path::new(file_path).parent().unwrap(); + let (local_tx, mut local_rx) = tokio::sync::mpsc::channel::>(1); + info!("Watching for changes in {:?}", parent_dir); + let snd = load_configuration(file_path, "filepath"); + + match snd { + Some(snd) => { + toreturn.send(snd).await.unwrap(); + } + None => {} + } + + let _watcher_handle = task::spawn_blocking({ + let parent_dir = parent_dir.to_path_buf(); // Move directory path into the closure + move || { + let mut watcher = RecommendedWatcher::new( + move |res| { + let _ = local_tx.blocking_send(res); + }, + Config::default(), + ) + .unwrap(); + watcher.watch(&parent_dir, RecursiveMode::Recursive).unwrap(); + let (_rtx, mut rrx) = tokio::sync::mpsc::channel::(1); + let _ = rrx.blocking_recv(); + } + }); + let mut start = Instant::now(); + + while let Some(event) = local_rx.recv().await { + match event { + Ok(e) => match e.kind { + EventKind::Modify(ModifyKind::Data(_)) | EventKind::Create(..) | EventKind::Remove(..) => { + if e.paths[0].to_str().unwrap().ends_with("yaml") { + if start.elapsed() > Duration::from_secs(2) { + start = Instant::now(); + info!("Config File changed :=> {:?}", e); + let snd = load_configuration(file_path, "filepath"); + match snd { + Some(snd) => { + toreturn.send(snd).await.unwrap(); + } + None => {} + } + } + } + } + _ => (), + }, + Err(e) => error!("Watch error: {:?}", e), + } + } +} diff --git a/src/utils/parceyaml.rs b/src/utils/parceyaml.rs index 29aebf8..ed2d8ea 100644 --- a/src/utils/parceyaml.rs +++ b/src/utils/parceyaml.rs @@ -9,6 +9,7 @@ use std::sync::atomic::AtomicUsize; #[derive(Debug, Serialize, Deserialize)] struct Config { + provider: String, upstreams: Option>, globals: Option>>, } @@ -25,7 +26,7 @@ struct PathConfig { headers: Option>, } -pub fn load_yaml_to_dashmap(d: &str, kind: &str) -> Option<(UpstreamsDashMap, Headers)> { +pub fn load_configuration(d: &str, kind: &str) -> Option<(UpstreamsDashMap, Headers)> { let dashmap = UpstreamsDashMap::new(); let headerm = DashMap::new(); let mut yaml_data = d.to_string(); @@ -52,8 +53,14 @@ pub fn load_yaml_to_dashmap(d: &str, kind: &str) -> Option<(UpstreamsDashMap, He let p: Result = serde_yaml::from_str(&yaml_data); match p { Ok(parsed) => { - if let Some(headers) = parsed.upstreams { - for (hostname, host_config) in headers { + match parsed.provider.as_str() { + "file" => {} + "consul" => return None, + "kubernetes" => return None, + _ => warn!("Unknown provider {}", parsed.provider), + }; + if let Some(upstream) = parsed.upstreams { + for (hostname, host_config) in upstream { let path_map = DashMap::new(); let header_list = DashMap::new(); for (path, path_config) in host_config.paths { @@ -91,7 +98,6 @@ pub fn load_yaml_to_dashmap(d: &str, kind: &str) -> Option<(UpstreamsDashMap, He dashmap.insert(hostname, path_map); } } - Some((dashmap, headerm)) } Err(e) => { diff --git a/src/web/webserver.rs b/src/web/webserver.rs index 092c8cf..01c9d76 100644 --- a/src/web/webserver.rs +++ b/src/web/webserver.rs @@ -21,7 +21,7 @@ pub async fn run_server(bindaddress: String, mut toreturn: Sender<(UpstreamsDash .route( "/conf", post(|up: String| async move { - let serverlist = crate::utils::parceyaml::load_yaml_to_dashmap(up.as_str(), "content"); + let serverlist = crate::utils::parceyaml::load_configuration(up.as_str(), "content"); match serverlist { Some(serverlist) => {