Stabilized Consul integration with HTTP api

This commit is contained in:
Ara Sadoyan
2025-04-08 10:20:02 +02:00
parent b2823508b7
commit 788c618a3d
6 changed files with 94 additions and 35 deletions

1
Cargo.lock generated
View File

@@ -185,7 +185,6 @@ dependencies = [
"rand 0.8.5", "rand 0.8.5",
"reqwest", "reqwest",
"serde", "serde",
"serde_json",
"serde_yaml", "serde_yaml",
"tokio", "tokio",
] ]

View File

@@ -20,6 +20,5 @@ axum = "0.8.1"
reqwest = { version = "0.12.15", features = ["json"] } reqwest = { version = "0.12.15", features = ["json"] }
serde_yaml = "0.8.26" serde_yaml = "0.8.26"
hickory-client = "0.25.1" hickory-client = "0.25.1"
serde_json = "1.0.137"
rand = "0.8.5" rand = "0.8.5"

13
etc/main.yaml Normal file
View File

@@ -0,0 +1,13 @@
version: 1
threads: 8
upstream_keepalive_pool_size: 100
pid_file: /tmp/load_balancer.pid
error_log: /tmp/load_balancer_err.log
upgrade_sock: /tmp/load_balancer.sock
proxy_address_http: 0.0.0.0:6193
proxy_address_tls: 0.0.0.0:6194
config_address: 0.0.0.0:3000
upstreams_conf: etc/upstreams.yaml
#idle_timeout: 1000
log_level: info # info, warn, error, debug, trace, off

View File

@@ -1,18 +1,27 @@
provider: "file" provider: "consul"
globals: globals:
headers: headers:
- "Access-Control-Allow-Origin:*" - "Access-Control-Allow-Origin:*"
- "Access-Control-Allow-Methods:POST, GET, OPTIONS" - "Access-Control-Allow-Methods:POST, GET, OPTIONS"
- "Access-Control-Max-Age:86400" - "Access-Control-Max-Age:86400"
- "Hujukulu-Header-HoHoHo:Me No Ho Ho Ho"
consul: consul:
servers: servers:
- "http://master1:8500" - "http://master1:8500"
- "http://192.168.22.1:8500" - "http://192.168.22.1:8500"
- "http://master1.digitai.local:8500" - "http://master1.digitai.local:8500"
whitelist: services:
- "frontend-dev-frontend-srv" - proxy: "proxy-frontend-dev-frontend-srv"
- "gateway-test-gateway-srv" real: "frontend-dev-frontend-srv"
- "gateway-stage-gateway-srv" - proxy: "proxy-gateway-test-gateway-srv"
real: "gateway-test-gateway-srv"
- proxy: "proxy-backoffice-dev-backoffice-srv"
real: "backoffice-dev-backoffice-srv"
token: "8e2db809-845b-45e1-8b47-2c8356a09da0-a4370955-18c2-4d6e-a8f8-ffcc0b47be81"
# whitelist:
# - "frontend-dev-frontend-srv"
# - "gateway-test-gateway-srv"
# - "gateway-stage-gateway-srv"
upstreams: upstreams:
myip.netangels.net: myip.netangels.net:
paths: paths:

View File

@@ -1,18 +1,19 @@
use crate::utils::parceyaml::load_configuration; use crate::utils::parceyaml::{load_configuration, ServiceMapping};
use crate::utils::tools::{clone_dashmap_into, compare_dashmaps, Headers, UpstreamsDashMap}; use crate::utils::tools::{clone_dashmap_into, compare_dashmaps, Headers, UpstreamsDashMap};
use dashmap::DashMap; use dashmap::DashMap;
use futures::channel::mpsc::Sender; use futures::channel::mpsc::Sender;
use futures::SinkExt; use futures::SinkExt;
use hickory_client::client::{Client, ClientHandle}; // use hickory_client::client::{Client, ClientHandle};
use hickory_client::proto::rr::{DNSClass, Name, RecordType}; // use hickory_client::proto::rr::{DNSClass, Name, RecordType};
use hickory_client::proto::runtime::TokioRuntimeProvider; // use hickory_client::proto::runtime::TokioRuntimeProvider;
use hickory_client::proto::tcp::TcpClientStream; // use hickory_client::proto::tcp::TcpClientStream;
use log::info; use log::{info, warn};
use pingora::prelude::sleep; use pingora::prelude::sleep;
use rand::Rng; use rand::Rng;
// use std::str::FromStr;
use reqwest::header::{HeaderMap, HeaderValue};
use serde::Deserialize; use serde::Deserialize;
use std::collections::HashMap; use std::collections::HashMap;
use std::str::FromStr;
use std::sync::atomic::AtomicUsize; use std::sync::atomic::AtomicUsize;
use std::time::Duration; use std::time::Duration;
@@ -51,14 +52,17 @@ pub async fn start(fp: String, mut toreturn: Sender<(UpstreamsDashMap, Headers)>
let servers = consul.servers.unwrap(); let servers = consul.servers.unwrap();
info!("Consul Servers => {:?}", servers); info!("Consul Servers => {:?}", servers);
let end = servers.len(); let end = servers.len();
loop { loop {
// println!(" ==> {:?}", consul.services);
let num = rand::thread_rng().gen_range(1..end); let num = rand::thread_rng().gen_range(1..end);
headers.clear(); headers.clear();
for (k, v) in config.headers.clone() { for (k, v) in config.headers.clone() {
headers.insert(k.to_string(), v); headers.insert(k.to_string(), v);
} }
let consul_data = servers.get(num).unwrap().to_string(); let consul_data = servers.get(num).unwrap().to_string();
let upstreams = http_request(consul_data, consul.whitelist.clone()); // let upstreams = http_request(consul_data, consul.whitelist.clone());
let upstreams = consul_request(consul_data, consul.services.clone(), consul.token.clone());
match upstreams.await { match upstreams.await {
Some(upstreams) => { Some(upstreams) => {
if !compare_dashmaps(&upstreams, &prev_upstreams) { if !compare_dashmaps(&upstreams, &prev_upstreams) {
@@ -78,6 +82,7 @@ pub async fn start(fp: String, mut toreturn: Sender<(UpstreamsDashMap, Headers)>
} }
} }
/*
async fn http_request(url: String, whitelist: Option<Vec<String>>) -> Option<UpstreamsDashMap> { async fn http_request(url: String, whitelist: Option<Vec<String>>) -> Option<UpstreamsDashMap> {
let client = reqwest::Client::new(); let client = reqwest::Client::new();
let to = Duration::from_secs(1); let to = Duration::from_secs(1);
@@ -132,11 +137,38 @@ async fn http_request(url: String, whitelist: Option<Vec<String>>) -> Option<Ups
} }
} }
} }
*/
async fn consul_request(url: String, whitelist: Option<Vec<ServiceMapping>>, token: Option<String>) -> Option<UpstreamsDashMap> {
let upstreams = UpstreamsDashMap::new();
let ss = url.clone() + "/v1/catalog/service/";
match whitelist {
Some(whitelist) => {
for k in whitelist.iter() {
let pref: String = ss.clone() + &k.real;
let list = get_by_http(pref.clone(), token.clone()).await;
match list {
Some(list) => {
upstreams.insert(k.proxy.clone(), list);
}
None => {
warn!("Whitelist not found for {}", k.proxy);
}
}
}
}
None => {}
}
Some(upstreams)
}
async fn get_by_http(url: String) -> Option<DashMap<String, (Vec<(String, u16, bool)>, AtomicUsize)>> { async fn get_by_http(url: String, token: Option<String>) -> Option<DashMap<String, (Vec<(String, u16, bool)>, AtomicUsize)>> {
let client = reqwest::Client::new(); let client = reqwest::Client::new();
let mut headers = HeaderMap::new();
if let Some(token) = token {
headers.insert("X-Consul-Token", HeaderValue::from_str(&token).unwrap());
}
let to = Duration::from_secs(1); let to = Duration::from_secs(1);
let u = client.get(url.clone()).timeout(to).send(); let u = client.get(url).timeout(to).send();
let mut values = Vec::new(); let mut values = Vec::new();
let upstreams: DashMap<String, (Vec<(String, u16, bool)>, AtomicUsize)> = DashMap::new(); let upstreams: DashMap<String, (Vec<(String, u16, bool)>, AtomicUsize)> = DashMap::new();
match u.await { match u.await {
@@ -160,19 +192,19 @@ async fn get_by_http(url: String) -> Option<DashMap<String, (Vec<(String, u16, b
Some(upstreams) Some(upstreams)
} }
#[allow(dead_code)] // #[allow(dead_code)]
async fn get_by_dns() { // async fn get_by_dns() {
let (stream, sender) = TcpClientStream::new(([192, 168, 22, 1], 53).into(), None, None, TokioRuntimeProvider::new()); // let (stream, sender) = TcpClientStream::new(([192, 168, 22, 1], 53).into(), None, None, TokioRuntimeProvider::new());
let client = Client::new(stream, sender, None); // let client = Client::new(stream, sender, None);
let (mut client, bg) = client.await.expect("connection failed"); // let (mut client, bg) = client.await.expect("connection failed");
tokio::spawn(bg); // tokio::spawn(bg);
let query = client.query(Name::from_str("_frontend-dev-frontend-srv._tcp.service.consul.").unwrap(), DNSClass::IN, RecordType::SRV); // let query = client.query(Name::from_str("_frontend-dev-frontend-srv._tcp.service.consul.").unwrap(), DNSClass::IN, RecordType::SRV);
// let query = client.query(Name::from_str("matyan.org.").unwrap(), DNSClass::IN, RecordType::A); // // let query = client.query(Name::from_str("matyan.org.").unwrap(), DNSClass::IN, RecordType::A);
let response = query.await.unwrap(); // let response = query.await.unwrap();
//
for t in response.answers().iter() { // for t in response.answers().iter() {
for y in t.data().as_srv().iter() { // for y in t.data().as_srv().iter() {
println!(" DNS ==> {:?} : {:?}", y.target().to_utf8(), y.port()); // println!(" DNS ==> {:?} : {:?}", y.target().to_utf8(), y.port());
} // }
} // }
} // }

View File

@@ -7,10 +7,17 @@ use std::collections::HashMap;
use std::fs; use std::fs;
use std::sync::atomic::AtomicUsize; use std::sync::atomic::AtomicUsize;
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceMapping {
pub proxy: String,
pub real: String,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Consul { pub struct Consul {
pub servers: Option<Vec<String>>, pub servers: Option<Vec<String>>,
pub whitelist: Option<Vec<String>>, pub services: Option<Vec<ServiceMapping>>,
pub token: Option<String>,
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
struct Config { struct Config {