diff --git a/Cargo.lock b/Cargo.lock index f37d6fd..7d98932 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -185,7 +185,6 @@ dependencies = [ "rand 0.8.5", "reqwest", "serde", - "serde_json", "serde_yaml", "tokio", ] diff --git a/Cargo.toml b/Cargo.toml index 17c5642..126c200 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,5 @@ axum = "0.8.1" reqwest = { version = "0.12.15", features = ["json"] } serde_yaml = "0.8.26" hickory-client = "0.25.1" -serde_json = "1.0.137" rand = "0.8.5" diff --git a/etc/main.yaml b/etc/main.yaml new file mode 100644 index 0000000..d189007 --- /dev/null +++ b/etc/main.yaml @@ -0,0 +1,13 @@ +version: 1 +threads: 8 +upstream_keepalive_pool_size: 100 +pid_file: /tmp/load_balancer.pid +error_log: /tmp/load_balancer_err.log +upgrade_sock: /tmp/load_balancer.sock +proxy_address_http: 0.0.0.0:6193 +proxy_address_tls: 0.0.0.0:6194 +config_address: 0.0.0.0:3000 +upstreams_conf: etc/upstreams.yaml +#idle_timeout: 1000 +log_level: info # info, warn, error, debug, trace, off + diff --git a/etc/upstreams.yaml b/etc/upstreams.yaml index eca7e19..a7a538b 100644 --- a/etc/upstreams.yaml +++ b/etc/upstreams.yaml @@ -1,18 +1,27 @@ -provider: "file" +provider: "consul" globals: headers: - "Access-Control-Allow-Origin:*" - "Access-Control-Allow-Methods:POST, GET, OPTIONS" - "Access-Control-Max-Age:86400" + - "Hujukulu-Header-HoHoHo:Me No Ho Ho Ho" consul: servers: - "http://master1:8500" - "http://192.168.22.1:8500" - "http://master1.digitai.local:8500" - whitelist: - - "frontend-dev-frontend-srv" - - "gateway-test-gateway-srv" - - "gateway-stage-gateway-srv" + services: + - proxy: "proxy-frontend-dev-frontend-srv" + real: "frontend-dev-frontend-srv" + - proxy: "proxy-gateway-test-gateway-srv" + real: "gateway-test-gateway-srv" + - proxy: "proxy-backoffice-dev-backoffice-srv" + real: "backoffice-dev-backoffice-srv" + token: "8e2db809-845b-45e1-8b47-2c8356a09da0-a4370955-18c2-4d6e-a8f8-ffcc0b47be81" +# whitelist: +# - "frontend-dev-frontend-srv" +# - "gateway-test-gateway-srv" +# - "gateway-stage-gateway-srv" upstreams: myip.netangels.net: paths: diff --git a/src/utils/consul.rs b/src/utils/consul.rs index 2c5312e..a217c62 100644 --- a/src/utils/consul.rs +++ b/src/utils/consul.rs @@ -1,18 +1,19 @@ -use crate::utils::parceyaml::load_configuration; +use crate::utils::parceyaml::{load_configuration, ServiceMapping}; use crate::utils::tools::{clone_dashmap_into, compare_dashmaps, Headers, UpstreamsDashMap}; use dashmap::DashMap; use futures::channel::mpsc::Sender; use futures::SinkExt; -use hickory_client::client::{Client, ClientHandle}; -use hickory_client::proto::rr::{DNSClass, Name, RecordType}; -use hickory_client::proto::runtime::TokioRuntimeProvider; -use hickory_client::proto::tcp::TcpClientStream; -use log::info; +// use hickory_client::client::{Client, ClientHandle}; +// use hickory_client::proto::rr::{DNSClass, Name, RecordType}; +// use hickory_client::proto::runtime::TokioRuntimeProvider; +// use hickory_client::proto::tcp::TcpClientStream; +use log::{info, warn}; use pingora::prelude::sleep; use rand::Rng; +// use std::str::FromStr; +use reqwest::header::{HeaderMap, HeaderValue}; use serde::Deserialize; use std::collections::HashMap; -use std::str::FromStr; use std::sync::atomic::AtomicUsize; use std::time::Duration; @@ -51,14 +52,17 @@ pub async fn start(fp: String, mut toreturn: Sender<(UpstreamsDashMap, Headers)> let servers = consul.servers.unwrap(); info!("Consul Servers => {:?}", servers); let end = servers.len(); + loop { + // println!(" ==> {:?}", consul.services); let num = rand::thread_rng().gen_range(1..end); headers.clear(); for (k, v) in config.headers.clone() { headers.insert(k.to_string(), v); } let consul_data = servers.get(num).unwrap().to_string(); - let upstreams = http_request(consul_data, consul.whitelist.clone()); + // let upstreams = http_request(consul_data, consul.whitelist.clone()); + let upstreams = consul_request(consul_data, consul.services.clone(), consul.token.clone()); match upstreams.await { Some(upstreams) => { if !compare_dashmaps(&upstreams, &prev_upstreams) { @@ -78,6 +82,7 @@ pub async fn start(fp: String, mut toreturn: Sender<(UpstreamsDashMap, Headers)> } } +/* async fn http_request(url: String, whitelist: Option>) -> Option { let client = reqwest::Client::new(); let to = Duration::from_secs(1); @@ -132,11 +137,38 @@ async fn http_request(url: String, whitelist: Option>) -> Option>, token: Option) -> Option { + let upstreams = UpstreamsDashMap::new(); + let ss = url.clone() + "/v1/catalog/service/"; + match whitelist { + Some(whitelist) => { + for k in whitelist.iter() { + let pref: String = ss.clone() + &k.real; + let list = get_by_http(pref.clone(), token.clone()).await; + match list { + Some(list) => { + upstreams.insert(k.proxy.clone(), list); + } + None => { + warn!("Whitelist not found for {}", k.proxy); + } + } + } + } + None => {} + } + Some(upstreams) +} -async fn get_by_http(url: String) -> Option, AtomicUsize)>> { +async fn get_by_http(url: String, token: Option) -> Option, AtomicUsize)>> { let client = reqwest::Client::new(); + let mut headers = HeaderMap::new(); + if let Some(token) = token { + headers.insert("X-Consul-Token", HeaderValue::from_str(&token).unwrap()); + } let to = Duration::from_secs(1); - let u = client.get(url.clone()).timeout(to).send(); + let u = client.get(url).timeout(to).send(); let mut values = Vec::new(); let upstreams: DashMap, AtomicUsize)> = DashMap::new(); match u.await { @@ -160,19 +192,19 @@ async fn get_by_http(url: String) -> Option {:?} : {:?}", y.target().to_utf8(), y.port()); - } - } -} +// #[allow(dead_code)] +// async fn get_by_dns() { +// let (stream, sender) = TcpClientStream::new(([192, 168, 22, 1], 53).into(), None, None, TokioRuntimeProvider::new()); +// let client = Client::new(stream, sender, None); +// let (mut client, bg) = client.await.expect("connection failed"); +// tokio::spawn(bg); +// let query = client.query(Name::from_str("_frontend-dev-frontend-srv._tcp.service.consul.").unwrap(), DNSClass::IN, RecordType::SRV); +// // let query = client.query(Name::from_str("matyan.org.").unwrap(), DNSClass::IN, RecordType::A); +// let response = query.await.unwrap(); +// +// for t in response.answers().iter() { +// for y in t.data().as_srv().iter() { +// println!(" DNS ==> {:?} : {:?}", y.target().to_utf8(), y.port()); +// } +// } +// } diff --git a/src/utils/parceyaml.rs b/src/utils/parceyaml.rs index dd8c0fe..c739008 100644 --- a/src/utils/parceyaml.rs +++ b/src/utils/parceyaml.rs @@ -7,10 +7,17 @@ use std::collections::HashMap; use std::fs; use std::sync::atomic::AtomicUsize; -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ServiceMapping { + pub proxy: String, + pub real: String, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct Consul { pub servers: Option>, - pub whitelist: Option>, + pub services: Option>, + pub token: Option, } #[derive(Debug, Serialize, Deserialize)] struct Config {