Added whitelisted or all services support t Consul

This commit is contained in:
Ara Sadoyan
2025-04-04 17:38:18 +02:00
parent 23dae51e22
commit b2823508b7
6 changed files with 111 additions and 90 deletions

View File

@@ -1,4 +1,4 @@
provider: "consul" provider: "file"
globals: globals:
headers: headers:
- "Access-Control-Allow-Origin:*" - "Access-Control-Allow-Origin:*"
@@ -6,13 +6,13 @@ globals:
- "Access-Control-Max-Age:86400" - "Access-Control-Max-Age:86400"
consul: consul:
servers: servers:
- "master1:8500" - "http://master1:8500"
- "192.168.22.1:8500" - "http://192.168.22.1:8500"
- "master1.digitai.local:8500" - "http://master1.digitai.local:8500"
services: whitelist:
- "_frontend-dev-frontend-srv._tcp.service.consul." - "frontend-dev-frontend-srv"
- "_gateway-test-gateway-srv._tcp.service.consul" - "gateway-test-gateway-srv"
- "_gateway-stage-gateway-srv._tcp.service.consul" - "gateway-stage-gateway-srv"
upstreams: upstreams:
myip.netangels.net: myip.netangels.net:
paths: paths:

View File

@@ -1,10 +1,7 @@
use crate::utils::tools::{Headers, UpstreamsDashMap};
use futures::channel::mpsc::Sender;
use std::collections::HashMap;
use std::time::Duration;
use crate::utils::parceyaml::load_configuration; use crate::utils::parceyaml::load_configuration;
use crate::utils::tools::{clone_dashmap_into, compare_dashmaps, Headers, UpstreamsDashMap};
use dashmap::DashMap; use dashmap::DashMap;
use futures::channel::mpsc::Sender;
use futures::SinkExt; use futures::SinkExt;
use hickory_client::client::{Client, ClientHandle}; use hickory_client::client::{Client, ClientHandle};
use hickory_client::proto::rr::{DNSClass, Name, RecordType}; use hickory_client::proto::rr::{DNSClass, Name, RecordType};
@@ -14,8 +11,10 @@ use log::info;
use pingora::prelude::sleep; use pingora::prelude::sleep;
use rand::Rng; use rand::Rng;
use serde::Deserialize; use serde::Deserialize;
use std::collections::HashMap;
use std::str::FromStr; use std::str::FromStr;
use std::sync::atomic::AtomicUsize; use std::sync::atomic::AtomicUsize;
use std::time::Duration;
#[derive(Debug, Deserialize)] #[derive(Debug, Deserialize)]
struct Service { struct Service {
@@ -36,53 +35,77 @@ struct TaggedAddress {
pub async fn start(fp: String, mut toreturn: Sender<(UpstreamsDashMap, Headers)>) { pub async fn start(fp: String, mut toreturn: Sender<(UpstreamsDashMap, Headers)>) {
let config = load_configuration(fp.as_str(), "filepath"); let config = load_configuration(fp.as_str(), "filepath");
let headers = DashMap::new(); let headers = DashMap::new();
// println!("{:?}", config);
match config { match config {
Some(config) => { Some(config) => {
let conf: Vec<&str> = config.2.split_whitespace().collect(); if config.typecfg.to_string() != "consul" {
let y = conf.get(0).unwrap(); info!("Not running Consul discovery, requested type is: {}", config.typecfg);
if y.to_string() != "consul" {
info!("Not running Consul discovery, requested type is: {}", config.2);
return; return;
} }
info!("Consul Discovery is enabled : {}", config.2);
let end = conf.len(); info!("Consul Discovery is enabled : {}", config.typecfg);
loop { let consul = config.consul;
let num = rand::thread_rng().gen_range(1..end); let prev_upstreams = UpstreamsDashMap::new();
sleep(Duration::from_secs(5)).await; match consul {
headers.clear(); Some(consul) => {
for (k, v) in config.1.clone() { let servers = consul.servers.unwrap();
headers.insert(k.to_string(), v); info!("Consul Servers => {:?}", servers);
} let end = servers.len();
let consul = "http://".to_string() + conf.get(num).unwrap(); loop {
let upstreams = http_request(consul, "GET"); let num = rand::thread_rng().gen_range(1..end);
match upstreams.await { headers.clear();
Some(upstreams) => { for (k, v) in config.headers.clone() {
toreturn.send((upstreams, headers.clone())).await.unwrap(); headers.insert(k.to_string(), v);
}
let consul_data = servers.get(num).unwrap().to_string();
let upstreams = http_request(consul_data, consul.whitelist.clone());
match upstreams.await {
Some(upstreams) => {
if !compare_dashmaps(&upstreams, &prev_upstreams) {
clone_dashmap_into(&upstreams, &prev_upstreams);
toreturn.send((upstreams, headers.clone())).await.unwrap();
}
}
None => {}
}
sleep(Duration::from_secs(5)).await;
} }
None => {}
} }
None => {}
} }
} }
None => {} None => {}
} }
} }
async fn http_request(url: String, method: &str) -> Option<UpstreamsDashMap> { async fn http_request(url: String, whitelist: Option<Vec<String>>) -> Option<UpstreamsDashMap> {
let client = reqwest::Client::new(); let client = reqwest::Client::new();
let to = Duration::from_secs(1); let to = Duration::from_secs(1);
let upstreams = UpstreamsDashMap::new(); let upstreams = UpstreamsDashMap::new();
let excludes = vec!["consul", "nomad", "nomad-client"]; let excludes = vec!["consul", "nomad", "nomad-client"];
match method { let ss = url.clone() + "/v1/catalog/service";
"GET" => { let response = client.get(ss.clone() + "s").timeout(to).send().await;
let ss = url.clone() + "/v1/catalog/service"; match response {
let response = client.get(ss.clone() + "s").timeout(to).send().await; Ok(r) => {
match response { let json = r.json::<HashMap<String, Vec<String>>>().await;
Ok(r) => { match json {
let json = r.json::<HashMap<String, Vec<String>>>().await; Ok(_j) => {
match json { for (k, _v) in _j {
Ok(_j) => { match whitelist.clone() {
for (k, _v) in _j { Some(whitelist) => {
if whitelist.iter().any(|i| *i == k) {
let mut pref: String = ss.clone() + "/";
pref.push_str(&k);
let list = get_by_http(pref).await;
match list {
Some(list) => {
upstreams.insert(k.to_string(), list);
}
None => {}
}
}
}
None => {
if !excludes.iter().any(|&i| i == k) { if !excludes.iter().any(|&i| i == k) {
let mut pref: String = ss.clone() + "/"; let mut pref: String = ss.clone() + "/";
pref.push_str(&k); pref.push_str(&k);
@@ -95,19 +118,18 @@ async fn http_request(url: String, method: &str) -> Option<UpstreamsDashMap> {
} }
} }
} }
// print_upstreams(&upstreams);
Some(upstreams)
} }
Err(_) => None,
} }
// print_upstreams(&upstreams);
Some(upstreams)
} }
Err(e) => { Err(_) => None,
println!("Error: {:?}", e);
None
}
} }
} }
_ => None, Err(e) => {
println!("Error: {:?}", e);
None
}
} }
} }
@@ -121,8 +143,8 @@ async fn get_by_http(url: String) -> Option<DashMap<String, (Vec<(String, u16, b
Ok(r) => { Ok(r) => {
let jason = r.json::<Vec<Service>>().await; let jason = r.json::<Vec<Service>>().await;
match jason { match jason {
Ok(services) => { Ok(whitelist) => {
for service in services { for service in whitelist {
let addr = service.tagged_addresses.get("lan_ipv4").unwrap().address.clone(); let addr = service.tagged_addresses.get("lan_ipv4").unwrap().address.clone();
let prt = service.tagged_addresses.get("lan_ipv4").unwrap().port.clone(); let prt = service.tagged_addresses.get("lan_ipv4").unwrap().port.clone();
let to_add = (addr, prt, false); let to_add = (addr, prt, false);

View File

@@ -20,7 +20,7 @@ pub async fn start(fp: String, mut toreturn: Sender<(UpstreamsDashMap, Headers)>
match snd { match snd {
Some(snd) => { Some(snd) => {
toreturn.send((snd.0, snd.1)).await.unwrap(); toreturn.send((snd.upstreams, snd.headers)).await.unwrap();
} }
None => {} None => {}
} }
@@ -53,7 +53,7 @@ pub async fn start(fp: String, mut toreturn: Sender<(UpstreamsDashMap, Headers)>
let snd = load_configuration(file_path, "filepath"); let snd = load_configuration(file_path, "filepath");
match snd { match snd {
Some(snd) => { Some(snd) => {
toreturn.send((snd.0, snd.1)).await.unwrap(); toreturn.send((snd.upstreams, snd.headers)).await.unwrap();
} }
None => {} None => {}
} }

View File

@@ -8,9 +8,9 @@ use std::fs;
use std::sync::atomic::AtomicUsize; use std::sync::atomic::AtomicUsize;
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
struct Consul { pub struct Consul {
servers: Option<Vec<String>>, pub servers: Option<Vec<String>>,
services: Option<Vec<String>>, pub whitelist: Option<Vec<String>>,
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
struct Config { struct Config {
@@ -32,18 +32,27 @@ struct PathConfig {
headers: Option<Vec<String>>, headers: Option<Vec<String>>,
} }
// #[derive(Debug, Serialize, Deserialize)] pub struct Configuration {
// pub struct Allconfig { pub upstreams: UpstreamsDashMap,
// pub upstreams: Option<UpstreamsDashMap>, pub headers: Headers,
// pub headers: Option<Headers>, pub consul: Option<Consul>,
// pub consul: Option<Consul>, pub typecfg: String,
// pub typecfg: String, }
// }
// pub fn load_configuration(d: &str, kind: &str) -> Option<(UpstreamsDashMap, Headers, String)> { // pub fn load_configuration(d: &str, kind: &str) -> Option<(UpstreamsDashMap, Headers, String)> {
pub fn load_configuration(d: &str, kind: &str) -> Option<(UpstreamsDashMap, Headers, String)> { pub fn load_configuration(d: &str, kind: &str) -> Option<Configuration> {
let upstreamsmap = UpstreamsDashMap::new(); let mut toreturn: Configuration = Configuration {
let headersmap = DashMap::new(); upstreams: Default::default(),
headers: Default::default(),
consul: None,
typecfg: "".to_string(),
};
toreturn.upstreams = UpstreamsDashMap::new();
toreturn.headers = Headers::new();
// let upstreamsmap = UpstreamsDashMap::new();
// let headersmap = DashMap::new();
let mut yaml_data = d.to_string(); let mut yaml_data = d.to_string();
match kind { match kind {
"filepath" => { "filepath" => {
@@ -79,11 +88,11 @@ pub fn load_configuration(d: &str, kind: &str) -> Option<(UpstreamsDashMap, Head
} }
} }
global_headers.insert("/".to_string(), hl); global_headers.insert("/".to_string(), hl);
headersmap.insert("GLOBAL_HEADERS".to_string(), global_headers); toreturn.headers.insert("GLOBAL_HEADERS".to_string(), global_headers);
} }
match parsed.provider.as_str() { match parsed.provider.as_str() {
"file" => { "file" => {
toreturn.typecfg = "file".to_string();
if let Some(upstream) = parsed.upstreams { if let Some(upstream) = parsed.upstreams {
for (hostname, host_config) in upstream { for (hostname, host_config) in upstream {
let path_map = DashMap::new(); let path_map = DashMap::new();
@@ -119,33 +128,22 @@ pub fn load_configuration(d: &str, kind: &str) -> Option<(UpstreamsDashMap, Head
} }
path_map.insert(path, (server_list, AtomicUsize::new(0))); path_map.insert(path, (server_list, AtomicUsize::new(0)));
} }
headersmap.insert(hostname.clone(), header_list); toreturn.headers.insert(hostname.clone(), header_list);
upstreamsmap.insert(hostname, path_map); toreturn.upstreams.insert(hostname, path_map);
} }
} }
Some((upstreamsmap, headersmap, String::from("file"))) Some(toreturn)
} }
"consul" => { "consul" => {
toreturn.typecfg = "consul".to_string();
let consul = parsed.consul; let consul = parsed.consul;
match consul { match consul {
Some(consul) => { Some(consul) => {
// println!("{:?}", consul.services); toreturn.consul = Some(consul);
if let Some(srv) = consul.servers { Some(toreturn)
let joined = srv.join(" ");
Some((upstreamsmap, headersmap, String::from("consul ") + &*joined))
} else {
None
}
} }
None => None, None => None,
} }
// if let Some(srv) = parsed.consul?.servers {
// let joined = srv.join(" ");
// Some((upstreamsmap, headersmap, String::from("consul ") + &*joined))
// } else {
// None
// }
// Some((upstreamsmap, headersmap, String::from("consul ")))
} }
"kubernetes" => None, "kubernetes" => None,
_ => { _ => {

View File

@@ -95,7 +95,8 @@ impl BackgroundService for LB {
} }
} }
} }
// print_upstreams(&self.ump_full); info!("Upstreams list is changed, updating to:");
print_upstreams(&self.ump_full);
} }
None => {} None => {}
} }

View File

@@ -25,7 +25,7 @@ pub async fn run_server(bindaddress: String, mut toreturn: Sender<(UpstreamsDash
match serverlist { match serverlist {
Some(serverlist) => { Some(serverlist) => {
let _ = tr.send((serverlist.0, serverlist.1)).await.unwrap(); let _ = tr.send((serverlist.upstreams, serverlist.headers)).await.unwrap();
Response::builder().status(StatusCode::CREATED).body(Body::from("Config, conf file, updated!\n")).unwrap() Response::builder().status(StatusCode::CREATED).body(Body::from("Config, conf file, updated!\n")).unwrap()
} }
None => Response::builder() None => Response::builder()