From b2823508b70f4cdce9ae7dc6e2aa4f2fdd79b24c Mon Sep 17 00:00:00 2001 From: Ara Sadoyan Date: Fri, 4 Apr 2025 17:38:18 +0200 Subject: [PATCH] Added whitelisted or all services support t Consul --- etc/upstreams.yaml | 16 +++--- src/utils/consul.rs | 114 ++++++++++++++++++++++++----------------- src/utils/filewatch.rs | 4 +- src/utils/parceyaml.rs | 62 +++++++++++----------- src/web/proxyhttp.rs | 3 +- src/web/webserver.rs | 2 +- 6 files changed, 111 insertions(+), 90 deletions(-) diff --git a/etc/upstreams.yaml b/etc/upstreams.yaml index 1152f08..eca7e19 100644 --- a/etc/upstreams.yaml +++ b/etc/upstreams.yaml @@ -1,4 +1,4 @@ -provider: "consul" +provider: "file" globals: headers: - "Access-Control-Allow-Origin:*" @@ -6,13 +6,13 @@ globals: - "Access-Control-Max-Age:86400" consul: servers: - - "master1:8500" - - "192.168.22.1:8500" - - "master1.digitai.local:8500" - services: - - "_frontend-dev-frontend-srv._tcp.service.consul." - - "_gateway-test-gateway-srv._tcp.service.consul" - - "_gateway-stage-gateway-srv._tcp.service.consul" + - "http://master1:8500" + - "http://192.168.22.1:8500" + - "http://master1.digitai.local:8500" + whitelist: + - "frontend-dev-frontend-srv" + - "gateway-test-gateway-srv" + - "gateway-stage-gateway-srv" upstreams: myip.netangels.net: paths: diff --git a/src/utils/consul.rs b/src/utils/consul.rs index 90ff394..2c5312e 100644 --- a/src/utils/consul.rs +++ b/src/utils/consul.rs @@ -1,10 +1,7 @@ -use crate::utils::tools::{Headers, UpstreamsDashMap}; -use futures::channel::mpsc::Sender; -use std::collections::HashMap; -use std::time::Duration; - use crate::utils::parceyaml::load_configuration; +use crate::utils::tools::{clone_dashmap_into, compare_dashmaps, Headers, UpstreamsDashMap}; use dashmap::DashMap; +use futures::channel::mpsc::Sender; use futures::SinkExt; use hickory_client::client::{Client, ClientHandle}; use hickory_client::proto::rr::{DNSClass, Name, RecordType}; @@ -14,8 +11,10 @@ use log::info; use pingora::prelude::sleep; use rand::Rng; use serde::Deserialize; +use std::collections::HashMap; use std::str::FromStr; use std::sync::atomic::AtomicUsize; +use std::time::Duration; #[derive(Debug, Deserialize)] struct Service { @@ -36,53 +35,77 @@ struct TaggedAddress { pub async fn start(fp: String, mut toreturn: Sender<(UpstreamsDashMap, Headers)>) { let config = load_configuration(fp.as_str(), "filepath"); let headers = DashMap::new(); - // println!("{:?}", config); + match config { Some(config) => { - let conf: Vec<&str> = config.2.split_whitespace().collect(); - let y = conf.get(0).unwrap(); - if y.to_string() != "consul" { - info!("Not running Consul discovery, requested type is: {}", config.2); + if config.typecfg.to_string() != "consul" { + info!("Not running Consul discovery, requested type is: {}", config.typecfg); return; } - info!("Consul Discovery is enabled : {}", config.2); - let end = conf.len(); - loop { - let num = rand::thread_rng().gen_range(1..end); - sleep(Duration::from_secs(5)).await; - headers.clear(); - for (k, v) in config.1.clone() { - headers.insert(k.to_string(), v); - } - let consul = "http://".to_string() + conf.get(num).unwrap(); - let upstreams = http_request(consul, "GET"); - match upstreams.await { - Some(upstreams) => { - toreturn.send((upstreams, headers.clone())).await.unwrap(); + + info!("Consul Discovery is enabled : {}", config.typecfg); + let consul = config.consul; + let prev_upstreams = UpstreamsDashMap::new(); + match consul { + Some(consul) => { + let servers = consul.servers.unwrap(); + info!("Consul Servers => {:?}", servers); + let end = servers.len(); + loop { + let num = rand::thread_rng().gen_range(1..end); + headers.clear(); + for (k, v) in config.headers.clone() { + headers.insert(k.to_string(), v); + } + let consul_data = servers.get(num).unwrap().to_string(); + let upstreams = http_request(consul_data, consul.whitelist.clone()); + match upstreams.await { + Some(upstreams) => { + if !compare_dashmaps(&upstreams, &prev_upstreams) { + clone_dashmap_into(&upstreams, &prev_upstreams); + toreturn.send((upstreams, headers.clone())).await.unwrap(); + } + } + None => {} + } + sleep(Duration::from_secs(5)).await; } - None => {} } + None => {} } } None => {} } } -async fn http_request(url: String, method: &str) -> Option { +async fn http_request(url: String, whitelist: Option>) -> Option { let client = reqwest::Client::new(); let to = Duration::from_secs(1); let upstreams = UpstreamsDashMap::new(); let excludes = vec!["consul", "nomad", "nomad-client"]; - match method { - "GET" => { - let ss = url.clone() + "/v1/catalog/service"; - let response = client.get(ss.clone() + "s").timeout(to).send().await; - match response { - Ok(r) => { - let json = r.json::>>().await; - match json { - Ok(_j) => { - for (k, _v) in _j { + let ss = url.clone() + "/v1/catalog/service"; + let response = client.get(ss.clone() + "s").timeout(to).send().await; + match response { + Ok(r) => { + let json = r.json::>>().await; + match json { + Ok(_j) => { + for (k, _v) in _j { + match whitelist.clone() { + Some(whitelist) => { + if whitelist.iter().any(|i| *i == k) { + let mut pref: String = ss.clone() + "/"; + pref.push_str(&k); + let list = get_by_http(pref).await; + match list { + Some(list) => { + upstreams.insert(k.to_string(), list); + } + None => {} + } + } + } + None => { if !excludes.iter().any(|&i| i == k) { let mut pref: String = ss.clone() + "/"; pref.push_str(&k); @@ -95,19 +118,18 @@ async fn http_request(url: String, method: &str) -> Option { } } } - // print_upstreams(&upstreams); - Some(upstreams) } - Err(_) => None, } + // print_upstreams(&upstreams); + Some(upstreams) } - Err(e) => { - println!("Error: {:?}", e); - None - } + Err(_) => None, } } - _ => None, + Err(e) => { + println!("Error: {:?}", e); + None + } } } @@ -121,8 +143,8 @@ async fn get_by_http(url: String) -> Option { let jason = r.json::>().await; match jason { - Ok(services) => { - for service in services { + Ok(whitelist) => { + for service in whitelist { let addr = service.tagged_addresses.get("lan_ipv4").unwrap().address.clone(); let prt = service.tagged_addresses.get("lan_ipv4").unwrap().port.clone(); let to_add = (addr, prt, false); diff --git a/src/utils/filewatch.rs b/src/utils/filewatch.rs index 92f4a2a..5b5c0ff 100644 --- a/src/utils/filewatch.rs +++ b/src/utils/filewatch.rs @@ -20,7 +20,7 @@ pub async fn start(fp: String, mut toreturn: Sender<(UpstreamsDashMap, Headers)> match snd { Some(snd) => { - toreturn.send((snd.0, snd.1)).await.unwrap(); + toreturn.send((snd.upstreams, snd.headers)).await.unwrap(); } None => {} } @@ -53,7 +53,7 @@ pub async fn start(fp: String, mut toreturn: Sender<(UpstreamsDashMap, Headers)> let snd = load_configuration(file_path, "filepath"); match snd { Some(snd) => { - toreturn.send((snd.0, snd.1)).await.unwrap(); + toreturn.send((snd.upstreams, snd.headers)).await.unwrap(); } None => {} } diff --git a/src/utils/parceyaml.rs b/src/utils/parceyaml.rs index e3701ca..dd8c0fe 100644 --- a/src/utils/parceyaml.rs +++ b/src/utils/parceyaml.rs @@ -8,9 +8,9 @@ use std::fs; use std::sync::atomic::AtomicUsize; #[derive(Debug, Serialize, Deserialize)] -struct Consul { - servers: Option>, - services: Option>, +pub struct Consul { + pub servers: Option>, + pub whitelist: Option>, } #[derive(Debug, Serialize, Deserialize)] struct Config { @@ -32,18 +32,27 @@ struct PathConfig { headers: Option>, } -// #[derive(Debug, Serialize, Deserialize)] -// pub struct Allconfig { -// pub upstreams: Option, -// pub headers: Option, -// pub consul: Option, -// pub typecfg: String, -// } +pub struct Configuration { + pub upstreams: UpstreamsDashMap, + pub headers: Headers, + pub consul: Option, + pub typecfg: String, +} // pub fn load_configuration(d: &str, kind: &str) -> Option<(UpstreamsDashMap, Headers, String)> { -pub fn load_configuration(d: &str, kind: &str) -> Option<(UpstreamsDashMap, Headers, String)> { - let upstreamsmap = UpstreamsDashMap::new(); - let headersmap = DashMap::new(); +pub fn load_configuration(d: &str, kind: &str) -> Option { + let mut toreturn: Configuration = Configuration { + upstreams: Default::default(), + headers: Default::default(), + consul: None, + typecfg: "".to_string(), + }; + toreturn.upstreams = UpstreamsDashMap::new(); + toreturn.headers = Headers::new(); + + // let upstreamsmap = UpstreamsDashMap::new(); + // let headersmap = DashMap::new(); + let mut yaml_data = d.to_string(); match kind { "filepath" => { @@ -79,11 +88,11 @@ pub fn load_configuration(d: &str, kind: &str) -> Option<(UpstreamsDashMap, Head } } global_headers.insert("/".to_string(), hl); - headersmap.insert("GLOBAL_HEADERS".to_string(), global_headers); + toreturn.headers.insert("GLOBAL_HEADERS".to_string(), global_headers); } - match parsed.provider.as_str() { "file" => { + toreturn.typecfg = "file".to_string(); if let Some(upstream) = parsed.upstreams { for (hostname, host_config) in upstream { let path_map = DashMap::new(); @@ -119,33 +128,22 @@ pub fn load_configuration(d: &str, kind: &str) -> Option<(UpstreamsDashMap, Head } path_map.insert(path, (server_list, AtomicUsize::new(0))); } - headersmap.insert(hostname.clone(), header_list); - upstreamsmap.insert(hostname, path_map); + toreturn.headers.insert(hostname.clone(), header_list); + toreturn.upstreams.insert(hostname, path_map); } } - Some((upstreamsmap, headersmap, String::from("file"))) + Some(toreturn) } "consul" => { + toreturn.typecfg = "consul".to_string(); let consul = parsed.consul; match consul { Some(consul) => { - // println!("{:?}", consul.services); - if let Some(srv) = consul.servers { - let joined = srv.join(" "); - Some((upstreamsmap, headersmap, String::from("consul ") + &*joined)) - } else { - None - } + toreturn.consul = Some(consul); + Some(toreturn) } None => None, } - // if let Some(srv) = parsed.consul?.servers { - // let joined = srv.join(" "); - // Some((upstreamsmap, headersmap, String::from("consul ") + &*joined)) - // } else { - // None - // } - // Some((upstreamsmap, headersmap, String::from("consul "))) } "kubernetes" => None, _ => { diff --git a/src/web/proxyhttp.rs b/src/web/proxyhttp.rs index 6e56d5e..56dc2c5 100644 --- a/src/web/proxyhttp.rs +++ b/src/web/proxyhttp.rs @@ -95,7 +95,8 @@ impl BackgroundService for LB { } } } - // print_upstreams(&self.ump_full); + info!("Upstreams list is changed, updating to:"); + print_upstreams(&self.ump_full); } None => {} } diff --git a/src/web/webserver.rs b/src/web/webserver.rs index 754bfd7..c14b4c9 100644 --- a/src/web/webserver.rs +++ b/src/web/webserver.rs @@ -25,7 +25,7 @@ pub async fn run_server(bindaddress: String, mut toreturn: Sender<(UpstreamsDash match serverlist { Some(serverlist) => { - let _ = tr.send((serverlist.0, serverlist.1)).await.unwrap(); + let _ = tr.send((serverlist.upstreams, serverlist.headers)).await.unwrap(); Response::builder().status(StatusCode::CREATED).body(Body::from("Config, conf file, updated!\n")).unwrap() } None => Response::builder()