Early stage Consul http api support

This commit is contained in:
Ara Sadoyan
2025-04-02 18:58:46 +02:00
parent cbbf45092b
commit 23dae51e22
13 changed files with 536 additions and 74 deletions

View File

@@ -1,3 +1,4 @@
pub mod consul;
pub mod discovery;
mod filewatch;
pub mod healthcheck;

156
src/utils/consul.rs Normal file
View File

@@ -0,0 +1,156 @@
use crate::utils::tools::{Headers, UpstreamsDashMap};
use futures::channel::mpsc::Sender;
use std::collections::HashMap;
use std::time::Duration;
use crate::utils::parceyaml::load_configuration;
use dashmap::DashMap;
use futures::SinkExt;
use hickory_client::client::{Client, ClientHandle};
use hickory_client::proto::rr::{DNSClass, Name, RecordType};
use hickory_client::proto::runtime::TokioRuntimeProvider;
use hickory_client::proto::tcp::TcpClientStream;
use log::info;
use pingora::prelude::sleep;
use rand::Rng;
use serde::Deserialize;
use std::str::FromStr;
use std::sync::atomic::AtomicUsize;
#[derive(Debug, Deserialize)]
struct Service {
// #[serde(rename = "ServiceName")]
// service_name: String,
#[serde(rename = "ServiceTaggedAddresses")]
tagged_addresses: HashMap<String, TaggedAddress>,
}
#[derive(Debug, Deserialize)]
struct TaggedAddress {
#[serde(rename = "Address")]
address: String,
#[serde(rename = "Port")]
port: u16,
}
pub async fn start(fp: String, mut toreturn: Sender<(UpstreamsDashMap, Headers)>) {
let config = load_configuration(fp.as_str(), "filepath");
let headers = DashMap::new();
// println!("{:?}", config);
match config {
Some(config) => {
let conf: Vec<&str> = config.2.split_whitespace().collect();
let y = conf.get(0).unwrap();
if y.to_string() != "consul" {
info!("Not running Consul discovery, requested type is: {}", config.2);
return;
}
info!("Consul Discovery is enabled : {}", config.2);
let end = conf.len();
loop {
let num = rand::thread_rng().gen_range(1..end);
sleep(Duration::from_secs(5)).await;
headers.clear();
for (k, v) in config.1.clone() {
headers.insert(k.to_string(), v);
}
let consul = "http://".to_string() + conf.get(num).unwrap();
let upstreams = http_request(consul, "GET");
match upstreams.await {
Some(upstreams) => {
toreturn.send((upstreams, headers.clone())).await.unwrap();
}
None => {}
}
}
}
None => {}
}
}
async fn http_request(url: String, method: &str) -> Option<UpstreamsDashMap> {
let client = reqwest::Client::new();
let to = Duration::from_secs(1);
let upstreams = UpstreamsDashMap::new();
let excludes = vec!["consul", "nomad", "nomad-client"];
match method {
"GET" => {
let ss = url.clone() + "/v1/catalog/service";
let response = client.get(ss.clone() + "s").timeout(to).send().await;
match response {
Ok(r) => {
let json = r.json::<HashMap<String, Vec<String>>>().await;
match json {
Ok(_j) => {
for (k, _v) in _j {
if !excludes.iter().any(|&i| i == k) {
let mut pref: String = ss.clone() + "/";
pref.push_str(&k);
let list = get_by_http(pref).await;
match list {
Some(list) => {
upstreams.insert(k.to_string(), list);
}
None => {}
}
}
}
// print_upstreams(&upstreams);
Some(upstreams)
}
Err(_) => None,
}
}
Err(e) => {
println!("Error: {:?}", e);
None
}
}
}
_ => None,
}
}
async fn get_by_http(url: String) -> Option<DashMap<String, (Vec<(String, u16, bool)>, AtomicUsize)>> {
let client = reqwest::Client::new();
let to = Duration::from_secs(1);
let u = client.get(url.clone()).timeout(to).send();
let mut values = Vec::new();
let upstreams: DashMap<String, (Vec<(String, u16, bool)>, AtomicUsize)> = DashMap::new();
match u.await {
Ok(r) => {
let jason = r.json::<Vec<Service>>().await;
match jason {
Ok(services) => {
for service in services {
let addr = service.tagged_addresses.get("lan_ipv4").unwrap().address.clone();
let prt = service.tagged_addresses.get("lan_ipv4").unwrap().port.clone();
let to_add = (addr, prt, false);
values.push(to_add);
}
}
Err(_) => return None,
}
}
Err(_) => return None,
}
upstreams.insert("/".to_string(), (values, AtomicUsize::new(0)));
Some(upstreams)
}
#[allow(dead_code)]
async fn get_by_dns() {
let (stream, sender) = TcpClientStream::new(([192, 168, 22, 1], 53).into(), None, None, TokioRuntimeProvider::new());
let client = Client::new(stream, sender, None);
let (mut client, bg) = client.await.expect("connection failed");
tokio::spawn(bg);
let query = client.query(Name::from_str("_frontend-dev-frontend-srv._tcp.service.consul.").unwrap(), DNSClass::IN, RecordType::SRV);
// let query = client.query(Name::from_str("matyan.org.").unwrap(), DNSClass::IN, RecordType::A);
let response = query.await.unwrap();
for t in response.answers().iter() {
for y in t.data().as_srv().iter() {
println!(" DNS ==> {:?} : {:?}", y.target().to_utf8(), y.port());
}
}
}

View File

@@ -1,3 +1,4 @@
use crate::utils::consul;
use crate::utils::filewatch;
use crate::utils::tools::*;
use crate::web::webserver;
@@ -11,6 +12,10 @@ pub struct APIUpstreamProvider {
pub address: String,
}
pub struct ConsulProvider {
pub path: String,
}
#[async_trait]
pub trait Discovery {
async fn start(&self, tx: Sender<(UpstreamsDashMap, Headers)>);
@@ -29,3 +34,10 @@ impl Discovery for FromFileProvider {
tokio::spawn(filewatch::start(self.path.clone(), tx.clone()));
}
}
#[async_trait]
impl Discovery for ConsulProvider {
async fn start(&self, tx: Sender<(UpstreamsDashMap, Headers)>) {
tokio::spawn(consul::start(self.path.clone(), tx.clone()));
}
}

View File

@@ -20,7 +20,7 @@ pub async fn start(fp: String, mut toreturn: Sender<(UpstreamsDashMap, Headers)>
match snd {
Some(snd) => {
toreturn.send(snd).await.unwrap();
toreturn.send((snd.0, snd.1)).await.unwrap();
}
None => {}
}
@@ -49,11 +49,11 @@ pub async fn start(fp: String, mut toreturn: Sender<(UpstreamsDashMap, Headers)>
if e.paths[0].to_str().unwrap().ends_with("yaml") {
if start.elapsed() > Duration::from_secs(2) {
start = Instant::now();
info!("Config File changed :=> {:?}", e);
// info!("Config File changed :=> {:?}", e);
let snd = load_configuration(file_path, "filepath");
match snd {
Some(snd) => {
toreturn.send(snd).await.unwrap();
toreturn.send((snd.0, snd.1)).await.unwrap();
}
None => {}
}

View File

@@ -1,5 +1,6 @@
use crate::utils::tools::*;
use dashmap::DashMap;
use log::warn;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::time::Duration;
@@ -33,7 +34,7 @@ pub async fn hc2(upslist: Arc<UpstreamsDashMap>, fullist: Arc<UpstreamsDashMap>)
innervec.push(k.1.clone());
}
false => {
println!("Dead Upstream {}, Link: {}",k.0, link);
warn!("Dead Upstream : {}", link);
}
}
}
@@ -42,7 +43,7 @@ pub async fn hc2(upslist: Arc<UpstreamsDashMap>, fullist: Arc<UpstreamsDashMap>)
totest.insert(host.clone(), inner);
}
if ! compare_dashmaps(&totest, &upslist){
print_upstreams(&totest);
// print_upstreams(&totest);
clone_dashmap_into(&totest, &upslist);
}
}

View File

@@ -7,11 +7,17 @@ use std::collections::HashMap;
use std::fs;
use std::sync::atomic::AtomicUsize;
#[derive(Debug, Serialize, Deserialize)]
struct Consul {
servers: Option<Vec<String>>,
services: Option<Vec<String>>,
}
#[derive(Debug, Serialize, Deserialize)]
struct Config {
provider: String,
upstreams: Option<HashMap<String, HostConfig>>,
globals: Option<HashMap<String, Vec<String>>>,
consul: Option<Consul>,
}
#[derive(Debug, Serialize, Deserialize)]
@@ -26,9 +32,18 @@ struct PathConfig {
headers: Option<Vec<String>>,
}
pub fn load_configuration(d: &str, kind: &str) -> Option<(UpstreamsDashMap, Headers)> {
let dashmap = UpstreamsDashMap::new();
let headerm = DashMap::new();
// #[derive(Debug, Serialize, Deserialize)]
// pub struct Allconfig {
// pub upstreams: Option<UpstreamsDashMap>,
// pub headers: Option<Headers>,
// pub consul: Option<Consul>,
// pub typecfg: String,
// }
// pub fn load_configuration(d: &str, kind: &str) -> Option<(UpstreamsDashMap, Headers, String)> {
pub fn load_configuration(d: &str, kind: &str) -> Option<(UpstreamsDashMap, Headers, String)> {
let upstreamsmap = UpstreamsDashMap::new();
let headersmap = DashMap::new();
let mut yaml_data = d.to_string();
match kind {
"filepath" => {
@@ -53,52 +68,91 @@ pub fn load_configuration(d: &str, kind: &str) -> Option<(UpstreamsDashMap, Head
let p: Result<Config, Error> = serde_yaml::from_str(&yaml_data);
match p {
Ok(parsed) => {
let global_headers = DashMap::new();
let mut hl = Vec::new();
if let Some(globals) = &parsed.globals {
for headers in globals.get("headers").iter().by_ref() {
for header in headers.iter() {
if let Some((key, val)) = header.split_once(':') {
hl.push((key.to_string(), val.to_string()));
}
}
}
global_headers.insert("/".to_string(), hl);
headersmap.insert("GLOBAL_HEADERS".to_string(), global_headers);
}
match parsed.provider.as_str() {
"file" => {}
"consul" => return None,
"kubernetes" => return None,
_ => warn!("Unknown provider {}", parsed.provider),
};
if let Some(upstream) = parsed.upstreams {
for (hostname, host_config) in upstream {
let path_map = DashMap::new();
let header_list = DashMap::new();
for (path, path_config) in host_config.paths {
let mut server_list = Vec::new();
let mut hl = Vec::new();
// Set global headers
if let Some(globals) = &parsed.globals {
for headers in globals.get("headers").iter().by_ref() {
for header in headers.iter() {
if let Some((key, val)) = header.split_once(':') {
hl.push((key.to_string(), val.to_string()));
"file" => {
if let Some(upstream) = parsed.upstreams {
for (hostname, host_config) in upstream {
let path_map = DashMap::new();
let header_list = DashMap::new();
for (path, path_config) in host_config.paths {
let mut server_list = Vec::new();
let mut hl = Vec::new();
// Set global headers
// if let Some(globals) = &parsed.globals {
// for headers in globals.get("headers").iter().by_ref() {
// for header in headers.iter() {
// if let Some((key, val)) = header.split_once(':') {
// hl.push((key.to_string(), val.to_string()));
// }
// }
// }
// }
// Set per host/path headers
if let Some(headers) = &path_config.headers {
for header in headers.iter().by_ref() {
if let Some((key, val)) = header.split_once(':') {
hl.push((key.to_string(), val.to_string()));
}
}
}
}
}
// Set per host/path headers
if let Some(headers) = &path_config.headers {
for header in headers.iter().by_ref() {
if let Some((key, val)) = header.split_once(':') {
hl.push((key.to_string(), val.to_string()));
header_list.insert(path.clone(), hl);
for server in path_config.servers {
if let Some((ip, port_str)) = server.split_once(':') {
if let Ok(port) = port_str.parse::<u16>() {
server_list.push((ip.to_string(), port, path_config.ssl));
}
}
}
path_map.insert(path, (server_list, AtomicUsize::new(0)));
}
headersmap.insert(hostname.clone(), header_list);
upstreamsmap.insert(hostname, path_map);
}
header_list.insert(path.clone(), hl);
for server in path_config.servers {
if let Some((ip, port_str)) = server.split_once(':') {
if let Ok(port) = port_str.parse::<u16>() {
server_list.push((ip.to_string(), port, path_config.ssl));
}
}
}
path_map.insert(path, (server_list, AtomicUsize::new(0)));
}
headerm.insert(hostname.clone(), header_list);
dashmap.insert(hostname, path_map);
Some((upstreamsmap, headersmap, String::from("file")))
}
"consul" => {
let consul = parsed.consul;
match consul {
Some(consul) => {
// println!("{:?}", consul.services);
if let Some(srv) = consul.servers {
let joined = srv.join(" ");
Some((upstreamsmap, headersmap, String::from("consul ") + &*joined))
} else {
None
}
}
None => None,
}
// if let Some(srv) = parsed.consul?.servers {
// let joined = srv.join(" ");
// Some((upstreamsmap, headersmap, String::from("consul ") + &*joined))
// } else {
// None
// }
// Some((upstreamsmap, headersmap, String::from("consul ")))
}
"kubernetes" => None,
_ => {
warn!("Unknown provider {}", parsed.provider);
None
}
}
Some((dashmap, headerm))
}
Err(e) => {
error!("Failed to parse upstreams file: {}", e);

View File

@@ -117,3 +117,13 @@ pub fn compare_dashmaps(map1: &UpstreamsDashMap, map2: &UpstreamsDashMap) -> boo
}
true
}
#[allow(dead_code)]
pub fn merge_headers(target: &DashMap<String, Vec<(String, String)>>, source: &DashMap<String, Vec<(String, String)>>) {
for entry in source.iter() {
let global_key = entry.key().clone();
let global_values = entry.value().clone();
let mut target_entry = target.entry(global_key).or_insert_with(Vec::new);
target_entry.extend(global_values);
}
}

View File

@@ -1,4 +1,4 @@
use crate::utils::discovery::{APIUpstreamProvider, Discovery, FromFileProvider};
use crate::utils::discovery::{APIUpstreamProvider, ConsulProvider, Discovery, FromFileProvider};
use crate::utils::tools::*;
use crate::utils::*;
use async_trait::async_trait;
@@ -33,9 +33,14 @@ impl BackgroundService for LB {
let from_file = self.config.get("upstreams_conf");
match from_file {
Some(from_file) => {
let file_load = FromFileProvider { path: from_file.to_string() };
let tx_file = tx.clone();
let tx_consul = tx.clone();
let file_load = FromFileProvider { path: from_file.to_string() };
let consul_load = ConsulProvider { path: from_file.to_string() };
let _ = tokio::spawn(async move { file_load.start(tx_file).await });
let _ = tokio::spawn(async move { consul_load.start(tx_consul).await });
}
None => {
error!("Can't read config file");
@@ -68,16 +73,29 @@ impl BackgroundService for LB {
val = rx.next() => {
match val {
Some(ss) => {
// let foo = compare_dashmaps(&*self.ump_full, &ss.0);
// println!("{:?}", ss.1);
// if !foo {
clone_dashmap_into(&ss.0, &self.ump_full);
clone_dashmap_into(&ss.0, &self.ump_upst);
for (k,v) in ss.1 {
self.headers.insert(k,v);
self.headers.clear();
for entry in ss.0.iter() {
let global_key = entry.key().clone();
let global_values = DashMap::new();
let mut target_entry = ss.1.entry(global_key).or_insert_with(DashMap::new);
target_entry.extend(global_values);
self.headers.insert(target_entry.key().to_owned(), target_entry.value().to_owned());
}
print_upstreams(&self.ump_full);
// }
for path in ss.1.iter() {
let path_key = path.key().clone();
let path_headers = path.value().clone();
self.headers.insert(path_key.clone(), path_headers);
if let Some(global_headers) = ss.1.get("GLOBAL_HEADERS") {
if let Some(existing_headers) = self.headers.get_mut(&path_key) {
merge_headers(&existing_headers, &global_headers);
}
}
}
// print_upstreams(&self.ump_full);
}
None => {}
}
@@ -266,6 +284,6 @@ impl ProxyHttp for LB {
async fn logging(&self, session: &mut Session, _e: Option<&pingora::Error>, ctx: &mut Self::CTX) {
let response_code = session.response_written().map_or(0, |resp| resp.status.as_u16());
debug!("{}, response code: {response_code}", self.request_summary(session, ctx));
info!("{}, response code: {response_code}", self.request_summary(session, ctx));
// info!("{}, response code: {response_code}", self.request_summary(session, ctx));
}
}

View File

@@ -79,6 +79,8 @@ pub fn run() {
server.add_service(bg_srvc);
// info!("Starting Gazan server on {}, port : {} !", args.address, args.port);
// let mut prometheus_service_http = Service::prometheus_http_service();
// prometheus_service_http.add_tcp("0.0.0.0:1234");
// server.add_service(prometheus_service_http);
server.run_forever();
}

View File

@@ -25,7 +25,7 @@ pub async fn run_server(bindaddress: String, mut toreturn: Sender<(UpstreamsDash
match serverlist {
Some(serverlist) => {
let _ = tr.send(serverlist).await.unwrap();
let _ = tr.send((serverlist.0, serverlist.1)).await.unwrap();
Response::builder().status(StatusCode::CREATED).body(Body::from("Config, conf file, updated!\n")).unwrap()
}
None => Response::builder()